Всем привет.
Я работаю в IT, и кроме дейликов, общения с окружающими не так уж много. Но иногда бывают мысли после какого-нибудь разговора вне работы, что надо было ответить как-то по‑другому, или почему я промолчал, не знал как ответить, в особенности каверзные или провокационные вопросы. Из этого вырисовывается проблема — как ответить нужное в нужный момент. В моём восприятии это как карате: если удар не натренирован, то он с большой долей вероятности будет плохой. Или другое сравнение: туз никогда не выпадет из рукава, если его туда заранее не положить. Поэтому нужно потренироваться.
Задача: потренироваться, чтобы речь гладко звучала, и не было неловкого смущения, задержек, а всё получалось на автомате. Варианты решения:
-
Потренироваться на домашних, чтобы они задавали токсичные вопросы, рандомно их перебирая. Не мой вариант.
-
Сделать приложение для прогулки, идёшь куда-нибудь, а тебе задаются токсичные вопросы, ты на них отвечаешь после чего валидируется ответ. По результату даются либо подсказки, если ответ некорректный, или задаётся следующий вопрос.
Требования: всё должно быть open-source и после скачивания запускаться локально без внешних сервисов.
Проработка идеи (основная логика): Реализация логики самой идеи — это больше психология: для чего нам нужен open-source психолог. Я нашел одного который говорит
«Все мои материалы в открытом доступе»
Нашёл у него материалы на тему общения с токсичными людьми. По этим двум параметрам идеально подходит Альберт Сафин. Возьмём небольшой список токсичных фраз-крючков, при необходимости его можно будет дополнить, генерируя их через LLM. По теории, пример ответа будет состоять из амортизирующей фразы, фразы-стратегии и фразы-захвата.
Выбираем архитектуру
Хочу микросервисы, так как из этого проекта можно будет сделать другой, переписав конфиги, — а это могут быть идеи для реализации в будущем. Open-source здесь Kubernetes.
Выбираем стек технологий (предварительно):
-
Python — однозначно т. к. это мой любимый язык программирования.
-
Kubernetes нам поможет в микросервисной архитектуре. Его использование необязательно для экспериментов.
-
Linux — ну куда же без него.
Общее описание и проработка микросервисов:
Исходя из варианта 2 решения, нам нужно будет front на телефоне, то это будет kivy. Эх, танцев с бубном при сборке apk не избежать т. к. python плохо приспособлен под android, и есть проблемы с зависимостями при компилировании apk. Front будет запрашивать у бэка рандомную токсичную фразу и воспроизводить ее.
Для того, чтобы нам можно было используя интернет обращаться к домашнему хосту я использовал KeenDns — это бесплатный сервис доменных имён для роутеров Keenetic/Netcrize. В файле params указываем локально или нет будем связываться с хостом. Для доступа используя KeenDns, ставим галочку прямой доступ, выбираем доменное имя для маршрутизации и в пункте Удаленный доступ выбираем «Свободный доступ».
В приложении Kivy tсть 2 экрана и несколько кнопок: начать тренировку, завершить тренировку.
Если есть front, то должен быть и backend — так его и назовём.
Backend должен взять фразу из какого-нибудь источника — базы данных для перевода в аудио. Возьмём postgre, т.к. open-source фраза должна быть сгенерирована на каком-то микросервисе, т.е. текст должен быть переведён в голос Text-to-Speech, так и назовём микросервис TTS.
Использую модель Silero версия v5_5_ru результаты генерации аудио на v3_1_ru были хуже. Модель автоматически загрузится при первом запуске. В коде можно выбрать голос озвучивания, или передавать нужный голос при запросе. Но если постоянно генерировать, то будет большая нагрузка на этот микросервис, а фразы могут повторяться.
Надо будет где-то хранить фразы и выдавать их, чтобы генерировать по требованию. Как хранилище используем Redis т. к. он очень быстро отдаёт по ключу значение. Для уникальности ключей используем UUID.
Логирование я реализовал только в backend микросервисе, т. к. остальные микросервисы слишком простые чтобы туда добавлять ещё и логирования, все ошибки, приходящие от них, и время работы логируется в backend.
@router.get("/task/start")async def start_task(db: Session = Depends(get_session)): """Начать новое задание""" try: start = datetime.now() task_data = get_random_task(db) logger.info(f'create random task {datetime.now() - start}') return task_data except Exception as e: logger.error(f"❌ Ошибка: {e}") raise HTTPException(status_code=500, detail=str(e))@router.post("/task/{task_id}/check")async def check_answer(task_id: str, audio_file: UploadFile = File(...), db: Session = Depends(get_session)): """Проверить ответ пользователя""" try: task_info, task_uuid = get_task_by_id(db, task_id) if not task_info: raise HTTPException(status_code=404, detail=f"Задание {task_id} не найдено") logger.info(f"✅ Задание найдено: {task_info[:50]}...") audio_file = await audio_file.read() try: start = datetime.now() transcribed_text = audio_service.transcribe_bytes(audio_file) # audio_file.read() logger.info(f"Transcirbe action_time: {datetime.now()-start} result:'{transcribed_text}'") start = datetime.now() response = requests.post(f'http://{params.PHRASE_COMPARE_HOST}:{params.PHRASE_COMPARE_PORT}/search_phrases', json={'text': transcribed_text}) phrase_analis_result = response.json() logger.info(f"Phrase compare action_time: {datetime.now()-start} result:\n\t{transcribed_text} \n\t'{phrase_analis_result}'") start = datetime.now() audio_data, text_audio = utils.get_result(phrase_analis_result) logger.info(f"Get result action_time: {datetime.now()-start} result: {text_audio}") return { "task_id": task_uuid, "audio_data": base64.b64encode(audio_data).decode('utf-8'), "phrase_text": text_audio } finally: ... except HTTPException: logger.error(f"❌ Ошибка при проверке ответа: {e} {traceback.print_exc()}") raise except Exception as e: logger.error(f"❌ Ошибка при проверке ответа: {e} {traceback.print_exc()}") raise HTTPException(status_code=500, detail=f"Ошибка проверки: {str(e)} {traceback.print_exc()}")
Front воспроизводит аудио, записывает ответ и отправляет его на backend.
Backend должен принять аудио-сообщение, например, wav и расшифровать его — для расшифровки нужен отдельный микросервис, т.е. переводящий голос в текст если по-английски Speech-to-Text так и назовём STT. Нашёл интересное решение Whisper, и в проекте использовал его форк Faster Whisper что значительно ускорило работу микросервиса. По размеру модели остановил свой выбор на small, т. к. с более маленькими — быстрыми моделями качество было неприемлемым. Из базы нужно будет взять исходную фразу по ключу. Ключом возьмём UUID фразы, чтобы не повторялись. Сделаем UUID ключом для всех видов фраз.
@app.post("/transcribe", response_model=TranscribeResponse)async def transcribe_audio( audio_file: UploadFile = File(...), language: str = "ru", model: str = "small"): """Распознавание аудио из загруженного файла""" try: content = await audio_file.read() file_size = len(content) print(f"📦 Файл загружен в память: {file_size} bytes") result = await _transcribe_bytes(content, language) return result except Exception as e: print(f"❌ Ошибка распознавания: {e}") import traceback traceback.print_exc() raise HTTPException(status_code=500, detail=str(e))async def _transcribe_bytes(audio_bytes: bytes, language: str = "ru") -> dict: """Распознавание аудио из байтов (без сохранения на диск)""" start_time = time.time() model = get_whisper_model() if model is None: raise HTTPException(status_code=503, detail="Модель Faster Whisper недоступна") temp_file = None try: # Определяем расширение по сигнатуре файла suffix = ".wav" if audio_bytes[:4] == b'RIFF': suffix = ".wav" elif audio_bytes[:3] == b'ID3' or audio_bytes[:2] == b'\xff\xfb': suffix = ".mp3" elif audio_bytes[:4] == b'fLaC': suffix = ".flac" elif audio_bytes[:4] == b'OggS': suffix = ".ogg" with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp: tmp.write(audio_bytes) temp_file = tmp.name # Настройки транскрибации для Faster Whisper transcribe_options = { "language": language if language != "auto" else None, "task": "transcribe", "beam_size": 5, "best_of": 5, "vad_filter": True, # Фильтр голосовой активности "vad_parameters": dict( min_silence_duration_ms=500, threshold=0.5 ) } # Распознаем через Faster Whisper segments, info = model.transcribe(temp_file, **transcribe_options) # Собираем результат text_parts = [] confidences = [] for segment in segments: text_parts.append(segment.text) if segment.avg_logprob is not None: # Конвертируем logprob в confidence (0-1) confidences.append(round(segment.avg_logprob, 3)) text = " ".join(text_parts).strip() detected_language = info.language duration = time.time() - start_time # Вычисляем среднюю уверенность confidence = None if confidences: confidence = round(sum(confidences) / len(confidences), 3) print(f"✅ Faster Whisper распознал: '{text}'") print(f" Язык: {detected_language}, Уверенность: {confidence}, Время: {duration:.2f}с") print(f" Вероятность языка: {info.language_probability:.3f}") return { "text": text, "status": "success", "engine": "faster_whisper", "model": model_size, "language": detected_language, "confidence": confidence } finally: # Удаляем временный файл if temp_file and os.path.exists(temp_file): try: os.unlink(temp_file) except: pass
Backend должен сопоставить полученную фразу и возможные варианты ответа т. е. нужен микросервис, сопоставляющий фразы, а желательно логику фраз. Как-то читал на habr статью про llm FREEDA. Для русского языка, по-моему, это самый подходящий вариант. Назовём этот микросервис phrases_compare. В phrases_compare для повышения качества сверки, добавлена сверка с неточным поиском посредствам thefuzz в сочетании с Freeda качество сверки фраз начинает быть приемлемым. Есть коэффициенты для настройки прохождения искомых фраз в файле params.
@app.post("/synthesize")async def synthesize(request: TTSRequest): """ Синтез речи из текста через Silero""" print(f"🎵 Запрос синтеза: '{request.text[:50]}...' (speaker: {request.speaker})") if not request.text or not request.text.strip(): raise HTTPException(status_code=400, detail="Текст не может быть пустым") model = get_silero_model() if model is None: raise HTTPException(status_code=503, detail="Модель Silero недоступна") try: speaker = request.speaker if not speaker: raise HTTPException(status_code=503, detail="Нужно выбрать диктора") audio_tensor = model.apply_tts(text=request.text,speaker=speaker, sample_rate=request.sample_rate) audio_np = audio_tensor.cpu().numpy() audio_np = np.clip(audio_np * 32767, -32768, 32767).astype(np.int16) wav_buffer = io.BytesIO() with wave.open(wav_buffer, 'wb') as wav: wav.setnchannels(1) # Моно wav.setsampwidth(2) # 16-bit wav.setframerate(request.sample_rate) wav.writeframes(audio_np.tobytes()) wav_bytes = wav_buffer.getvalue() print(f"✅ Синтез завершен: {len(wav_bytes)} bytes, {len(audio_np)/request.sample_rate:.2f} сек") return Response( content=wav_bytes, media_type="audio/wav", headers={ "Content-Disposition": f"inline; filename=silero_{speaker}.wav", "X-Speaker": speaker, "X-Text-Length": str(len(request.text)), "X-Audio-Duration": str(len(audio_np) / request.sample_rate) } ) except Exception as e: print(f"❌ Ошибка синтеза: {e}") traceback.print_exc() raise HTTPException(status_code=500, detail=f"Ошибка синтеза: {str(e)}")
После сопоставления фраз backend должен отправить либо корректирующую фразу, либо отбивку, что всё ОК — а это тоже фраза. А значит, нам нужна будет ещё таблица в базе для стандартных фраз. Стандартные фразы и минимум фраз для работы приложения загружаются backend, если база пустая. В процессе проработки микросервисов и экспериментов по разделению фраз типа Крючок, Стратегия и Захват. Я добавлял ещё один микросервис между STT и phrases_compare основанный на qwen2.5, его результаты были лучшими по сравнению с ollama моделями gemma llama deepseek с различными числом параметров. В backend я оставил пример экспериментов с qwen2.5 модуль — PhrasSplitClient. Но по итогу лучшие результаты показал микросервис на основе сочетания FREEDA и thefuzz.
@app.post("/search_phrases")async def analyze_toxicity(request: PhraseOneRequest): request = request.model_dump() result = {} result['amortization'] = thefuzz_freeda_compare('amortization', request['text'], model) result['strategy'] = thefuzz_freeda_compare('strategy', request['text'], model) result['capture'] = thefuzz_freeda_compare('capture', request['text'], model) return result
def thefuzz_freeda_compare(find_type, source_text, model): queries = get_concret_query(find_type) matches = process.extract(source_text, queries, limit=1, scorer=fuzz.ratio) good_matches = [m for m in matches if m[1]>params.MINIMAL_FUZZ_SCORE] if not good_matches: return None good_matches = [m[0] for m in good_matches] doc_embedding = model.encode([f"search_document: {source_text}"])[0] query_texts = [f"search_query: {q}" for q in good_matches] query_embeddings = model.encode(query_texts) similarities = np.dot(query_embeddings, doc_embedding) / (np.linalg.norm(query_embeddings, axis=1) * np.linalg.norm(doc_embedding)) if similarities[0] and similarities [0] * 100 > params.MINIMAL_FREEDA_SCORE: # print(f'{find_type} RESULT {good_matches[0]}') return good_matches[0] return None
В итоге к исходному стеку технологий добавляются Postgre и Redis. В процессе отладки ещё прикрутил отправку логов в kafka из основного backend, и контейнеры-сайдкары для отправки логов.
Если захочется мониторить поды, то пригодятся еще Prometheus и grafana, но при запуске в связке minikube kubectl можно подключить аддоны. Т. к. у нас python, а не Java с миллионом параметров конфигурации и удобным подключением метрик через actuator, то вполне достаточно мониторить RAM и CPU, чтобы выбрать оптимальные размеры подов.
-
minikube addons enable metrics-server
-
minikube addons enable dashboard
Для получения списка адднонов: minikube addons list
Итоговый стек технологий
-
Python — все микросервисы. Базой будет FastApi uvicorn, для фронта Kivy.
-
Kubernetes — можно запуститься и без него для экспериментов.
-
Linux — ну кудаже без него.
-
Redis — хранение аудио.
-
Postgre — хранение текстов фраз.
-
Kafka — для хранения логов.
Для запуска проекта первым делом необходимо задать параметры postgre, redis и адрес хоста — они указываются в файлах params.py каждого микросервиса. Кроме этих параметров все остальные имеют значения по умолчанию. А для запуска Kubernetes передаются в ConfigMap находящемся в папке kuber. Для каждого микросервиса создан отдельный файл с необходимым набором абстракций. Файл rbac.yaml необходим для настройки сбора метрик из подов, если это будет необходимо.
apiVersion: v1kind: ConfigMapmetadata: name: backend-config namespace: defaultdata: HOST: "0.0.0.0" PORT: "8000" PHRASE_COMPARE_HOST: '<адрес linux машины в локальной сети>' # PHRASE_COMPARE_HOST: "phrases-compare-service" PHRASE_COMPARE_PORT: "8004" REDIS_HOST: '<адрес linux машины в локальной сети>' REDIS_PORT: "6379" TTS_SERVER: '<адрес linux машины в локальной сети>' # TTS_SERVER: "tts-app-service" TTS_SERVER_PORT: "8001" STT_HOST: '<адрес linux машины в локальной сети>' # STT_HOST: "stt-app-service" STT_PORT: "8002"
Запустить sh файл из папки bash_commands. Предварительно нужно локально скачать модели для STT, TTS, phrase_compare- запустив каждую из них.
sh bash_commands/all_run.sh
Для первого локального запуска для запускаемого микросервиса достаточно установить зависимости.
pip install -r requirements.txt
Я чтобы не загрязнять системный python обычно пользуюсь
-
pyenv pyenv virtualenv 3.11.15 phrase_compare
-
pyenv local phrase_compare
-
pip install -r requirements.txt
И произвести запуск
python main.py или “nohup python main.py &” чтобы не привязываться терминалу
Микросервисы STT и TTS и phrases_compare при первом локальном запуске автоматически скачаются модели, при запуске с использованием k8s, они будут перекопированы в поды.
У каждого микросервиса есть свой dockerfile для сборки контейнера, основная цель которого установить зависимости переложить код и библиотеки для llm.
FROM python:3.13-slimWORKDIR /appCOPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txtCOPY . .EXPOSE 8001CMD ["python", "main.py"]
Запуск Kubernetes и minikube
minikube start —driver=kvm2 —memory=40G —cpus=9 —container-runtime=containerd —disk-size=100g
Пример сборки одного образа в k8s (для сборки каждого образа есть sh файл в папке bash_commands).
-
docker build ‑t tts‑app:latest <Абсолютный путь>/TTS/.
-
minikube image load tts‑app:latest
-
kubectl apply ‑f <Абсолютный путь>/kuber/tts.yaml
Абсолютный путь можно получить командой ‘pwd’, находясь в нужной директории
Микросервисы STT TTS phrases_compare работают на CPU. Хотелось бы поэкспериментировать с NPU от AMD, но пока ollama его не поддерживает. Верисия python для микросревисов 3.12… или 3.11… т. к. с версиями выше не поднимается STT из за особенности библиотек. Все версии зафиксированы в каждом микросервисе в requirements.txt для поднятия в k8s/или локального запуска.
Если нагрузить микросервисы чем-то типа gatling, то скорее всего первым будет отказывать STT, поэтому при необходимости лёгким изменением конфигов k8s можно сделать несколько реплик этого пода.
Репозиторий с исходным кодом в Git
Идеи, которые возможно реализовать, используя базу данного эксперимента:
Как-то на habr читал интересную статью про то, как llm общалась с мошенниками и в максимуме потратила 30 минут на 1 разговор, притворяясь дедом, постоянно путая цифры и рассказывая несвязанные истории, но кода в той статье не было. Что-то подобное можно повторить, если переиспользовать микросервисы STT TTS и ollama.
Считывание текста из аудио-записи в телеграм боте выполнение каких-либо действий на бэкэнде с возвратом аудио, тут удобнее будет через kafka писать в топики и напрямую к ним подключать микросервисы.
Различные переводчики с добавлением микросервиса перевода, или аудио-гид если на front сдалать увязку с GPS и в определённых местах TTS записывать аудио.
Написанию этой статьи послужил душевный порыв, т.к. образованию Python разработчик. В свободное от работы время люблю писать на python, в настоящее время работаю в IT, но должностные обязанности не подразумевают разработки, но душа требует. Спасибо за внимание.
ссылка на оригинал статьи https://habr.com/ru/articles/1042114/