В прошлый раз мы делали AI-официанта с помощью OpenAI Assistants API и Vector Store в Telegram.
В этот раз воспользуемся другим инструментом Assistants API от OpenAI под названием Code Interpreter.
Что такое Code Interpreter?
Языковая модель генерирует текст, она не может проводить сколько-нибудь сложные математические вычисления или анализ данных, она просто не предназначена для этого. Однако, модель может генерировать код и очень хорошо. Что, если давать модели задание, для выполнения которого она сгенерирует программный код, он исполнится в изолированной среде разработки, и полученный результат модель уже использует для генерации ответа? Именно эту задачу и выполняет Code Interpreter.
Задача
В этой статье разберем следующий пример: у меня есть простой CSV файл со всеми платежами для фейкового онлайн-магазина. Формат данных следующий:
"id","user_id","amount","created_at" "4675","2251032","1837.00","2024-05-22 07:10:02" "4676","7472836","2849.00","2024-05-22 07:27:45" "4677","6271037","2999.00","2024-05-22 07:33:12" "4678","6815010","2877.00","2024-05-22 08:01:58" "4679","2565937","5000.00","2024-05-22 08:13:17" "4680","7074300","299.00","2024-05-22 08:16:49" "4681","7028029","5770.00","2024-05-22 08:33:42"
Я хочу задавать вопросы, требующие анализа этих данных. Например: сколько заработали в определенном месяце? Есть ли различие в поведении покупателей по дням недели? Результат я хочу получать в виде текста или графиков, где применимо.
Подготовка
Повторяем все шаги из оригинальной статьи. Единственная разница будет в коде для облачной функции.
requirements.txt
В редакторе сначала создадим новый файл, назовём его requirements.txt и положим туда следующий код:
openai~=1.33.0 boto3~=1.34.122 pyTelegramBotAPI~=4.19.1
Это список зависимостей для Python, которые облако автоматически подгрузит во время сборки так, чтобы мы могли пользоваться ими в коде самой функции.
config.py
Теперь создадим файл, в котором будет код, который отвечает за конфигурацию проекта. Назовём его config.py и запишем следующий код:
import json import os import boto3 import openai YANDEX_KEY_ID = os.environ.get("YANDEX_KEY_ID") YANDEX_KEY_SECRET = os.environ.get("YANDEX_KEY_SECRET") YANDEX_BUCKET = os.environ.get("YANDEX_BUCKET") PROXY_API_KEY = os.environ.get("PROXY_API_KEY") ASSISTANT_MODEL = os.environ.get("ASSISTANT_MODEL") TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN") TG_BOT_ADMIN = os.environ.get("TG_BOT_ADMIN") def get_s3_client(): session = boto3.session.Session( aws_access_key_id=YANDEX_KEY_ID, aws_secret_access_key=YANDEX_KEY_SECRET ) return session.client( service_name="s3", endpoint_url="https://storage.yandexcloud.net" ) def get_config() -> dict: s3client = get_s3_client() try: response = s3client.get_object(Bucket=YANDEX_BUCKET, Key="config.json") return json.loads(response["Body"].read()) except: return {} def save_config(new_config: dict): s3client = get_s3_client() s3client.put_object( Bucket=YANDEX_BUCKET, Key="config.json", Body=json.dumps(new_config) ) proxy_client = openai.Client( api_key=PROXY_API_KEY, base_url="https://api.proxyapi.ru/openai/v1", )
Здесь мы читаем все необходимые параметры из переменных окружения и прописываем функции для получения и сохранения дополнительных (динамических) параметров в бакете хранилища Яндекс Облака. Инициируем также клиент для ProxyAPI согласно документации, то есть переопределяем путь к API.
admin.py
Далее создаём файл admin.py и кладём туда весь код, который отвечает за разные административные задачи проекта.
from config import ASSISTANT_MODEL, get_config, proxy_client, save_config def create_assistant(name, instructions): assistant_id = get_assistant_id() if not assistant_id: new_assistant = proxy_client.beta.assistants.create( model=ASSISTANT_MODEL, instructions=instructions, name=name, tools=[ { "type": "code_interpreter", } ], ) config = get_config() config["assistant_id"] = new_assistant.id save_config(config) else: proxy_client.beta.assistants.update( assistant_id=assistant_id, instructions=instructions ) def get_assistant_id(): config = get_config() return config["assistant_id"] if "assistant_id" in config else None def upload_file(chat_id, filename, file): config = get_config() file_object = proxy_client.files.create(file=(filename, file), purpose="assistants") thread = proxy_client.beta.threads.create( messages=[ { "role": "user", "content": "Для всех вопросов, которые я буду задавать, используй эти данные для анализа", "attachments": [ {"file_id": file_object.id, "tools": [{"type": "code_interpreter"}]} ], } ] ) if not "threads" in config: config["threads"] = {} config["threads"][chat_id] = thread.id save_config(config) def get_thread_id(chat_id: str): config = get_config() if "threads" in config and chat_id in config["threads"]: return config["threads"][chat_id] return None
create_assistant
Создаёт или обновляет настройки для ассистента на основе имени и инструкций. Сразу привязываем ассистента к нашему векторному хранилищу. Сохраняем идентификатор ассистента в конфигурации.
get_assistant_id
Возвращает идентификатор ассистента из конфигурации.
upload_file
Метод, который загружает файл в файловое (не векторное!) хранилище API и инициирует новый тред для этого чата. Идея здесь такая: для каждого нового загруженного файла начинаем новый тред, чтобы пользователь мог опрашивать модель именно по данным из этого файла. Старый тред «забывается».
get_thread_id
Возвращает текущий идентификатор треда, если хоть один файл уже был загружен.
chat.py
В файле chat.py (тоже создаём его) будем хранить методы для работы с сообщениями пользователей.
import os from admin import get_assistant_id, get_thread_id from config import proxy_client def process_message(chat_id: str, message: str) -> list[dict]: assistant_id = get_assistant_id() thread_id = get_thread_id(chat_id) if not thread_id: raise ValueError("Thread not found") proxy_client.beta.threads.messages.create( thread_id=thread_id, content=message, role="user" ) run = proxy_client.beta.threads.runs.create_and_poll( thread_id=thread_id, assistant_id=assistant_id, ) answer = [] if run.status == "completed": messages = proxy_client.beta.threads.messages.list( thread_id=thread_id, run_id=run.id ) for message in messages: if message.role == "assistant": for block in message.content: if block.type == "text": answer.insert(0, {"type": "text", "text": block.text.value}) if block.text.annotations: for annotation in block.text.annotations: if annotation.type == "file_path": answer.insert( 0, { "type": "file", "file": download_file( annotation.file_path.file_id ), "filename": os.path.basename( annotation.text.split(":")[-1] ), }, ) elif block.type == "image_file": answer.insert( 0, { "type": "image", "file": download_file(block.image_file.file_id), }, ) return answer def download_file(file_id: str) -> str: file_content = proxy_client.files.content(file_id) content = file_content.read() with open(f"/tmp/{file_id}", "wb") as f: f.write(content) return f"/tmp/{file_id}"
process_message
Этот метод принимает сообщения пользователя, и отправляет его через ProxyAPI на обработку в Assistants API. Assistants API работает немного по-другому, в сравнении с обычной API. Здесь после добавления сообщения в тред ответ модели мы сразу не получим. Надо дополнительно запустить обработку всего треда с помощью метода run. Что мы и делаем, после чего получаем ответ, который может состоять из нескольких сообщений.
Code Interpreter, кроме текста, может еще генерировать другие файлы и изображения (например, графики). Так что мы дополнительно распознаем такие случаи с помощью block.type и annotations, загружаем содержимое таких файлов и добавляем в массив сообщений, которые вернул ассистент.
download_file
Загружает файл, сгенерированный ассистентом в локальную временную директорию. Он удалится автоматически, когда облачная функция закончит работу.
index.py
import json import logging import threading import time import telebot from admin import create_assistant, get_assistant_id, get_thread_id, upload_file from chat import process_message from config import TG_BOT_ADMIN, TG_BOT_TOKEN from telebot.types import InputFile logger = telebot.logger telebot.logger.setLevel(logging.INFO) bot = telebot.TeleBot(TG_BOT_TOKEN, threaded=False) is_typing = False def start_typing(chat_id): global is_typing is_typing = True typing_thread = threading.Thread(target=typing, args=(chat_id,)) typing_thread.start() def typing(chat_id): global is_typing while is_typing: bot.send_chat_action(chat_id, "typing") time.sleep(4) def stop_typing(): global is_typing is_typing = False def check_setup(message): if not get_assistant_id(): if message.from_user.username != TG_BOT_ADMIN: bot.send_message( message.chat.id, "Бот еще не настроен. Свяжитесь с администратором." ) else: bot.send_message( message.chat.id, "Бот еще не настроен. Используйте команду /create для создания ассистента.", ) return False return True def check_admin(message): if message.from_user.username != TG_BOT_ADMIN: bot.send_message(message.chat.id, "Доступ запрещен") return False return True @bot.message_handler(commands=["help", "start"]) def send_welcome(message): if not check_setup(message): return bot.send_message( message.chat.id, ( f"Привет! Я твой карманный дата-аналитик. Загрузи файл и задавай вопросы по нему. Я умею проводить анализ данных и строить графики." ), ) @bot.message_handler(commands=["create"]) def create_assistant_command(message): if not check_admin(message): return instructions = message.text.split("/create")[1].strip() if len(instructions) == 0: bot.send_message( message.chat.id, """ Введите подробные инструкции для работы ассистента после команды /create и пробела. Например: /create Ты - дата-аналитик. Пользователь загружает файл для анализа, а ты, используя инструменты, отвечаешь на вопросы и, если нужно, строишь графики. Если ассистент уже был ранее создан, инструкции будут обновлены. """, ) return name = bot.get_me().full_name create_assistant(name, instructions) bot.send_message( message.chat.id, "Ассистент успешно создан. Теперь вы можете добавлять документы в базу знаний с помощью команды /upload.", ) @bot.message_handler(commands=["upload"]) def upload_file_command(message): if not check_setup(message): return return bot.send_message(message.chat.id, "Загрузите файл с данными для анализа.") @bot.message_handler(content_types=["document"]) def upload_file_handler(message): if not check_setup(message): return file_info = bot.get_file(message.document.file_id) downloaded_file = bot.download_file(file_info.file_path) try: upload_file(message.chat.id, message.document.file_name, downloaded_file) except Exception as e: return bot.send_message(message.chat.id, f"Ошибка при загрузке файла: {e}") return bot.send_message( message.chat.id, "Файл успешно загружен и новая сессия анализа начата. Задавайте ваши вопросы.", ) @bot.message_handler(content_types=["text"]) def handle_message(message): if not check_setup(message): return if not get_thread_id(str(message.chat.id)): return bot.send_message( message.chat.id, "Для начала работы загрузите файл с данными для анализа с помощью команды /upload.", ) start_typing(message.chat.id) try: answers = process_message(str(message.chat.id), message.text) except Exception as e: bot.send_message(message.chat.id, f"Ошибка при обработке сообщения: {e}") return stop_typing() for answer in answers: if answer["type"] == "text": bot.send_message(message.chat.id, answer["text"]) elif answer["type"] == "image": bot.send_photo(message.chat.id, InputFile(answer["file"])) elif answer["type"] == "file": bot.send_document( message.chat.id, InputFile(answer["file"]), visible_file_name=answer["filename"], ) def handler(event, context): message = json.loads(event["body"]) update = telebot.types.Update.de_json(message) if update.message is not None: try: bot.process_new_updates([update]) except Exception as e: print(e) return { "statusCode": 200, "body": "ok", }
typing
Вспомогательная функция, которая посылает статус «набирает сообщение…», чтобы наши пользователи не скучали, пока модель готовит ответ.
check_setup
Вспомогательная функция, которая выводит ошибку, если бот ещё не настроен администратором.
check_admin
Вспомогательная функция, которая проверяет, является ли автор сообщения администратором бота.
send_welcome
Посылаем приветственное сообщение после команды /start.
create_assistant_command
Команда /create доступна только администратору, она создаёт или обновляет инструкции для ассистента.
upload_file_command
Команда /upload просто информирует пользователя, что для начала сессии анализа данных надо загрузить файл.
upload_file_handler
При загрузке файла сохраняем его на стороне API и сообщаем пользователю, что новая сессия открыта.
handle_message
Собственно обработчик входящих сообщений от пользователей. Здесь происходит общение с ассистентом.
handler
Точка входа для всей облачной функции. Сюда будут приходить все команды и сообщений от Telegram.
Для удобства я опубликовал весь исходный код функции на GitLab:
В будущем, если будут какие-то обновления, то они будут именно в репозитории.
Вернитесь к оригинальной статье и закончите остальные шаги, начиная с момента «Прописываем точку входа равной index.handler» и до конца, включая заполнение переменных окружения, создание шлюза API и установки Telegram WebHook.
Тест
Начнём с того, что настроим нашего ассистента. Для этого я запущу команду /create от имени администратора:

Отлично, ассистент настроен. Теперь загружу мой CSV файл с данными о продажах.

Теперь, наконец, попробую узнать нужную мне информацию.




Ура! Всё работает!
ссылка на оригинал статьи https://habr.com/ru/articles/829186/
Добавить комментарий