Делаем карманного аналитика данных с помощью OpenAI Assistants API и Code Interpreter в Telegram

от автора

В прошлый раз мы делали AI-официанта с помощью OpenAI Assistants API и Vector Store в Telegram.

В этот раз воспользуемся другим инструментом Assistants API от OpenAI под названием Code Interpreter. 

Что такое Code Interpreter?

Языковая модель генерирует текст, она не может проводить сколько-нибудь сложные математические вычисления или анализ данных, она просто не предназначена для этого. Однако, модель может генерировать код и очень хорошо. Что, если давать модели задание, для выполнения которого она сгенерирует программный код, он исполнится в изолированной среде разработки, и полученный результат модель уже использует для генерации ответа? Именно эту задачу и выполняет Code Interpreter.

Задача

В этой статье разберем следующий пример: у меня есть простой CSV файл со всеми платежами для фейкового онлайн-магазина. Формат данных следующий:

"id","user_id","amount","created_at" "4675","2251032","1837.00","2024-05-22 07:10:02" "4676","7472836","2849.00","2024-05-22 07:27:45" "4677","6271037","2999.00","2024-05-22 07:33:12" "4678","6815010","2877.00","2024-05-22 08:01:58" "4679","2565937","5000.00","2024-05-22 08:13:17" "4680","7074300","299.00","2024-05-22 08:16:49" "4681","7028029","5770.00","2024-05-22 08:33:42"

Я хочу задавать вопросы, требующие анализа этих данных. Например: сколько заработали в определенном месяце? Есть ли различие в поведении покупателей по дням недели? Результат я хочу получать в виде текста или графиков, где применимо.

Подготовка

Повторяем все шаги из оригинальной статьи. Единственная разница будет в коде для облачной функции.

requirements.txt

В редакторе сначала создадим новый файл, назовём его requirements.txt и положим туда следующий код:

openai~=1.33.0 boto3~=1.34.122 pyTelegramBotAPI~=4.19.1

Это список зависимостей для Python, которые облако автоматически подгрузит во время сборки так, чтобы мы могли пользоваться ими в коде самой функции.

config.py

Теперь создадим файл, в котором будет код, который отвечает за конфигурацию проекта. Назовём его config.py и запишем следующий код:

import json import os  import boto3 import openai  YANDEX_KEY_ID = os.environ.get("YANDEX_KEY_ID") YANDEX_KEY_SECRET = os.environ.get("YANDEX_KEY_SECRET") YANDEX_BUCKET = os.environ.get("YANDEX_BUCKET") PROXY_API_KEY = os.environ.get("PROXY_API_KEY") ASSISTANT_MODEL = os.environ.get("ASSISTANT_MODEL") TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN") TG_BOT_ADMIN = os.environ.get("TG_BOT_ADMIN")   def get_s3_client():     session = boto3.session.Session(         aws_access_key_id=YANDEX_KEY_ID, aws_secret_access_key=YANDEX_KEY_SECRET     )     return session.client(         service_name="s3", endpoint_url="https://storage.yandexcloud.net"     )   def get_config() -> dict:     s3client = get_s3_client()     try:         response = s3client.get_object(Bucket=YANDEX_BUCKET, Key="config.json")         return json.loads(response["Body"].read())     except:         return {}   def save_config(new_config: dict):     s3client = get_s3_client()     s3client.put_object(         Bucket=YANDEX_BUCKET, Key="config.json", Body=json.dumps(new_config)     )   proxy_client = openai.Client(     api_key=PROXY_API_KEY,     base_url="https://api.proxyapi.ru/openai/v1", ) 

Здесь мы читаем все необходимые параметры из переменных окружения и прописываем функции для получения и сохранения дополнительных (динамических) параметров в бакете хранилища Яндекс Облака. Инициируем также клиент для ProxyAPI согласно документации, то есть переопределяем путь к API.

admin.py

Далее создаём файл admin.py и кладём туда весь код, который отвечает за разные административные задачи проекта.

from config import ASSISTANT_MODEL, get_config, proxy_client, save_config   def create_assistant(name, instructions):     assistant_id = get_assistant_id()     if not assistant_id:         new_assistant = proxy_client.beta.assistants.create(             model=ASSISTANT_MODEL,             instructions=instructions,             name=name,             tools=[                 {                     "type": "code_interpreter",                 }             ],         )         config = get_config()         config["assistant_id"] = new_assistant.id         save_config(config)     else:         proxy_client.beta.assistants.update(             assistant_id=assistant_id, instructions=instructions         )   def get_assistant_id():     config = get_config()     return config["assistant_id"] if "assistant_id" in config else None   def upload_file(chat_id, filename, file):     config = get_config()     file_object = proxy_client.files.create(file=(filename, file), purpose="assistants")     thread = proxy_client.beta.threads.create(         messages=[             {                 "role": "user",                 "content": "Для всех вопросов, которые я буду задавать, используй эти данные для анализа",                 "attachments": [                     {"file_id": file_object.id, "tools": [{"type": "code_interpreter"}]}                 ],             }         ]     )     if not "threads" in config:         config["threads"] = {}      config["threads"][chat_id] = thread.id     save_config(config)   def get_thread_id(chat_id: str):     config = get_config()      if "threads" in config and chat_id in config["threads"]:         return config["threads"][chat_id]      return None 

create_assistant

Создаёт или обновляет настройки для ассистента на основе имени и инструкций. Сразу привязываем ассистента к нашему векторному хранилищу. Сохраняем идентификатор ассистента в конфигурации.

get_assistant_id

Возвращает идентификатор ассистента из конфигурации.

upload_file

Метод, который загружает файл в файловое (не векторное!) хранилище API и инициирует новый тред для этого чата. Идея здесь такая: для каждого нового загруженного файла начинаем новый тред, чтобы пользователь мог опрашивать модель именно по данным из этого файла. Старый тред «забывается».

get_thread_id

Возвращает текущий идентификатор треда, если хоть один файл уже был загружен.

chat.py

В файле chat.py (тоже создаём его) будем хранить методы для работы с сообщениями пользователей.

import os  from admin import get_assistant_id, get_thread_id from config import proxy_client   def process_message(chat_id: str, message: str) -> list[dict]:     assistant_id = get_assistant_id()     thread_id = get_thread_id(chat_id)     if not thread_id:         raise ValueError("Thread not found")      proxy_client.beta.threads.messages.create(         thread_id=thread_id, content=message, role="user"     )      run = proxy_client.beta.threads.runs.create_and_poll(         thread_id=thread_id,         assistant_id=assistant_id,     )      answer = []      if run.status == "completed":         messages = proxy_client.beta.threads.messages.list(             thread_id=thread_id, run_id=run.id         )         for message in messages:             if message.role == "assistant":                 for block in message.content:                     if block.type == "text":                         answer.insert(0, {"type": "text", "text": block.text.value})                         if block.text.annotations:                             for annotation in block.text.annotations:                                 if annotation.type == "file_path":                                     answer.insert(                                         0,                                         {                                             "type": "file",                                             "file": download_file(                                                 annotation.file_path.file_id                                             ),                                             "filename": os.path.basename(                                                 annotation.text.split(":")[-1]                                             ),                                         },                                     )                     elif block.type == "image_file":                         answer.insert(                             0,                             {                                 "type": "image",                                 "file": download_file(block.image_file.file_id),                             },                         )      return answer   def download_file(file_id: str) -> str:     file_content = proxy_client.files.content(file_id)     content = file_content.read()     with open(f"/tmp/{file_id}", "wb") as f:         f.write(content)     return f"/tmp/{file_id}" 

process_message

Этот метод принимает сообщения пользователя, и отправляет его через ProxyAPI на обработку в Assistants API. Assistants API работает немного по-другому, в сравнении с обычной API. Здесь после добавления сообщения в тред ответ модели мы сразу не получим. Надо дополнительно запустить обработку всего треда с помощью метода run. Что мы и делаем, после чего получаем ответ, который может состоять из нескольких сообщений.

Code Interpreter, кроме текста, может еще генерировать другие файлы и изображения (например, графики). Так что мы дополнительно распознаем такие случаи с помощью block.type и annotations, загружаем содержимое таких файлов и добавляем в массив сообщений, которые вернул ассистент.

download_file

Загружает файл, сгенерированный ассистентом в локальную временную директорию. Он удалится автоматически, когда облачная функция закончит работу.

index.py

import json import logging import threading import time  import telebot from admin import create_assistant, get_assistant_id, get_thread_id, upload_file from chat import process_message from config import TG_BOT_ADMIN, TG_BOT_TOKEN from telebot.types import InputFile  logger = telebot.logger telebot.logger.setLevel(logging.INFO)  bot = telebot.TeleBot(TG_BOT_TOKEN, threaded=False)   is_typing = False   def start_typing(chat_id):     global is_typing     is_typing = True     typing_thread = threading.Thread(target=typing, args=(chat_id,))     typing_thread.start()   def typing(chat_id):     global is_typing     while is_typing:         bot.send_chat_action(chat_id, "typing")         time.sleep(4)   def stop_typing():     global is_typing     is_typing = False   def check_setup(message):     if not get_assistant_id():         if message.from_user.username != TG_BOT_ADMIN:             bot.send_message(                 message.chat.id, "Бот еще не настроен. Свяжитесь с администратором."             )         else:             bot.send_message(                 message.chat.id,                 "Бот еще не настроен. Используйте команду /create для создания ассистента.",             )         return False     return True   def check_admin(message):     if message.from_user.username != TG_BOT_ADMIN:         bot.send_message(message.chat.id, "Доступ запрещен")         return False     return True   @bot.message_handler(commands=["help", "start"]) def send_welcome(message):     if not check_setup(message):         return      bot.send_message(         message.chat.id,         (             f"Привет! Я твой карманный дата-аналитик. Загрузи файл и задавай вопросы по нему. Я умею проводить анализ данных и строить графики."         ),     )   @bot.message_handler(commands=["create"]) def create_assistant_command(message):     if not check_admin(message):         return      instructions = message.text.split("/create")[1].strip()     if len(instructions) == 0:         bot.send_message(             message.chat.id,             """ Введите подробные инструкции для работы ассистента после команды /create и пробела.  Например:  /create Ты - дата-аналитик. Пользователь загружает файл для анализа, а ты, используя инструменты, отвечаешь на вопросы и, если нужно, строишь графики.  Если ассистент уже был ранее создан, инструкции будут обновлены.             """,         )         return      name = bot.get_me().full_name     create_assistant(name, instructions)      bot.send_message(         message.chat.id,         "Ассистент успешно создан. Теперь вы можете добавлять документы в базу знаний с помощью команды /upload.",     )   @bot.message_handler(commands=["upload"]) def upload_file_command(message):     if not check_setup(message):         return      return bot.send_message(message.chat.id, "Загрузите файл с данными для анализа.")   @bot.message_handler(content_types=["document"]) def upload_file_handler(message):     if not check_setup(message):         return      file_info = bot.get_file(message.document.file_id)     downloaded_file = bot.download_file(file_info.file_path)      try:         upload_file(message.chat.id, message.document.file_name, downloaded_file)     except Exception as e:         return bot.send_message(message.chat.id, f"Ошибка при загрузке файла: {e}")      return bot.send_message(         message.chat.id,         "Файл успешно загружен и новая сессия анализа начата. Задавайте ваши вопросы.",     )   @bot.message_handler(content_types=["text"]) def handle_message(message):     if not check_setup(message):         return      if not get_thread_id(str(message.chat.id)):         return bot.send_message(             message.chat.id,             "Для начала работы загрузите файл с данными для анализа с помощью команды /upload.",         )      start_typing(message.chat.id)      try:         answers = process_message(str(message.chat.id), message.text)     except Exception as e:         bot.send_message(message.chat.id, f"Ошибка при обработке сообщения: {e}")         return      stop_typing()      for answer in answers:         if answer["type"] == "text":             bot.send_message(message.chat.id, answer["text"])         elif answer["type"] == "image":             bot.send_photo(message.chat.id, InputFile(answer["file"]))         elif answer["type"] == "file":             bot.send_document(                 message.chat.id,                 InputFile(answer["file"]),                 visible_file_name=answer["filename"],             )   def handler(event, context):     message = json.loads(event["body"])     update = telebot.types.Update.de_json(message)      if update.message is not None:         try:             bot.process_new_updates([update])         except Exception as e:             print(e)      return {         "statusCode": 200,         "body": "ok",     } 

typing

Вспомогательная функция, которая посылает статус «набирает сообщение…», чтобы наши пользователи не скучали, пока модель готовит ответ.

check_setup

Вспомогательная функция, которая выводит ошибку, если бот ещё не настроен администратором.

check_admin

Вспомогательная функция, которая проверяет, является ли автор сообщения администратором бота.

send_welcome

Посылаем приветственное сообщение после команды /start.

create_assistant_command

Команда /create доступна только администратору, она создаёт или обновляет инструкции для ассистента.

upload_file_command

Команда /upload просто информирует пользователя, что для начала сессии анализа данных надо загрузить файл.

upload_file_handler

При загрузке файла сохраняем его на стороне API и сообщаем пользователю, что новая сессия открыта.

handle_message

Собственно обработчик входящих сообщений от пользователей. Здесь происходит общение с ассистентом.

handler

Точка входа для всей облачной функции. Сюда будут приходить все команды и сообщений от Telegram.


Для удобства я опубликовал весь исходный код функции на GitLab:

https://gitlab.com/evrovas/data-analyst-telegram-bot

В будущем, если будут какие-то обновления, то они будут именно в репозитории.


Вернитесь к оригинальной статье и закончите остальные шаги, начиная с момента «Прописываем точку входа равной index.handler» и до конца, включая заполнение переменных окружения, создание шлюза API и установки Telegram WebHook.

Тест

Начнём с того, что настроим нашего ассистента. Для этого я запущу команду /create от имени администратора:

Отлично, ассистент настроен. Теперь загружу мой CSV файл с данными о продажах.

Теперь, наконец, попробую узнать нужную мне информацию.

Ура! Всё работает!


ссылка на оригинал статьи https://habr.com/ru/articles/829186/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *