Привет, мир! Меня зовут Павел, я IT инженер и руководитель службы технической поддержки.
Эта статья — вторая часть инструкции по внедрению коннектора WhatsApp и Telegram для Открытых линий CRM Bitrix24. С логикой подключения к Битрикс вы можете ознакомиться в первой части, а в этой статье мы рассмотрим логику обмена сообщениями через WhatsApp.
Обработчик WhatsApp Web
Как бы очевидно это не звучало, но первично нам понадобится обработчик WhatsApp. В ходе долгих поисков адекватного решения, выбор пал на Evolution API в виду неплохой документации относительно аналогов и «живого» комьюнити (хоть и испаноязычного).
Инструкция по развертыванию на сервере есть на официальном сайте — https://doc.evolution-api.com/v1/en/install/docker. Однако я бы рекомендовал использовать образ evoapicloud/evolution-api:v2.3.1, в котором исправлена работа с групповыми чатами. При установке, помимо обязательных переменных, в .env надо указать:
WEBHOOK_GLOBAL_ENABLED=true WEBHOOK_GLOBAL_URL='https://<url_коннектора>/api/waweb/?api-key=XXXX' AUTHENTICATION_API_KEY=YYY CONFIG_SESSION_PHONE_VERSION="2.3000.1025062854"
Модуль коннектора WhatsApp в нашем веб-приложении
Создадим django приложение waweb. Далее реализуем модели в models.py. Модель для хранения данных о сервере Evolution API и его базовой конфигурации:
class Server(models.Model): url = models.URLField(max_length=255, unique=True, verbose_name="Server URL") api_key = models.CharField(max_length=255, verbose_name="API Key") max_connections = models.PositiveIntegerField(default=100) groups_ignore = models.BooleanField(default=True) always_online = models.BooleanField(default=False) read_messages = models.BooleanField(default=False) def __str__(self): return self.url
Модель для хранения данных о сессии WhatsApp:
class Session(models.Model): session = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) server = models.ForeignKey(Server, on_delete=models.SET_NULL, related_name="sessions", null=True, blank=True) apikey = models.CharField(max_length=255, blank=True, null=True) instanceId = models.CharField(max_length=255, blank=True, null=True) date_end = models.DateTimeField(null=True, blank=True) phone = models.CharField(max_length=15, blank=True, null=True) groups_ignore = models.BooleanField(default=True) sms_service = models.BooleanField(default=True) status = models.CharField(max_length=15, blank=True, null=True) owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True) app_instance = models.ForeignKey( AppInstance, on_delete=models.SET_NULL, related_name="wawebs", null=True, blank=True) line = models.ForeignKey( Line, on_delete=models.SET_NULL, blank=True, null=True, related_name="wawebs", ) def __str__(self): return f"Session: {self.session}, Phone: {self.phone or 'Not connected'}"
Приступим к реализации api эндпоинта для обработки событий WhatsApp в файле api/views.py. Реализуем функцию, принимающую события от WhatsApp и формирующую корректный payload для Битрикс, в зависимости от типа сообщения. Таким образом, обработаем сразу сообщения из групп, медиафайлы и голосовые сообщения. Также реализуем загрузку локация через GoogleMap. Для каждого сообщения проверяется связь между Whatsapp сессией и открытой линией Битрикс, а также реализована защита от отправки повторных сообщений путем фильтрации по message_id:
from rest_framework.viewsets import GenericViewSet from rest_framework.response import Response from rest_framework.decorators import action from waweb.models import Session from rest_framework import permissions import requests from django.conf import settings import redis import logging import uuid import re from django.utils import timezone import waweb.tasks as tasks import waweb.utils as utils import bitrix.utils as bitrix_utils import bitrix.tasks as bitrix_tasks redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0) logger = logging.getLogger("django") class EventsHandler(GenericViewSet): def create(self, request, *args, **kwargs): event_data = request.data sessionid = event_data.get('instance') if not sessionid: return Response({'error': 'sessionId is required'}) try: session = Session.objects.get(session=sessionid) except Session.DoesNotExist: return Response({'error': f'Session with sessionId {sessionid} does not exist'}) if not session.owner: return Response({'error': 'Session has no owner'}) event = event_data.get("event") data = event_data.get("data", {}) apikey = event_data.get('apikey') if apikey and session.apikey != apikey: session.apikey = apikey session.save(update_fields=["apikey"]) server = session.server headers = {"apikey": session.apikey} if event == "connection.update": state = data.get('state') if session.status != state: session.status = state session.save(update_fields=["status"]) if state == "open": wuid = data.get("wuid") number = wuid.split("@")[0] session.phone = number session.save(update_fields=["phone"]) if Session.objects.exclude(pk=session.pk).filter(phone=number).exists(): headers = {"apikey": server.api_key} response = requests.delete(f"{server.url}instance/logout/{sessionid}", headers=headers, timeout=60) response = requests.delete(f"{server.url}instance/delete/{sessionid}", headers=headers, timeout=60) session.delete() return Response({'error': 'Phone number already in use, session deleted'}) elif event in ["messages.upsert", "send.message"]: if session.date_end and timezone.now() > session.date_end: return Response({'error': 'tariff has expired'}) message = data.get('message', {}) key_data = data.get('key', {}) message_id = key_data.get('id') if redis_client.exists(f'waweb:{message_id}'): return Response({'message': 'loop message'}) fromme = key_data.get('fromMe') sender = event_data.get('sender').split('@')[0] remoteJid = key_data.get('remoteJid') pushName = data.get("pushName") group_message = False if remoteJid.endswith('@g.us'): group_message = True params = {"groupJid": remoteJid} group_name = requests.get(f"{server.url}group/findGroupInfos/{sessionid}", params=params, headers=headers, timeout=60) if group_name.status_code == 200: pushName = group_name.json().get("subject") file_data = {} remoteJid = remoteJid.split('@')[0] profilepic_url = None if not group_message: profilepic = requests.post(f"{server.url}chat/fetchProfilePictureUrl/{sessionid}", json={"number": remoteJid}, headers=headers, timeout=60) if profilepic.status_code == 200: profilepic = profilepic.json() profilepic_url = profilepic.get("profilePictureUrl") payload = { 'sender': sender, 'remoteJid': remoteJid, 'fromme': fromme, } msg_type = data.get('messageType') fileName = None print(f"Message Recieved: {data}") if msg_type == 'conversation': if group_message: group_sender_name = f"{data.get('pushName')}:" if data.get('pushName') else "" payload.update({'content': f"*{pushName}*\n{group_sender_name} {message.get('conversation')}"}) else: payload.update({'content': message.get('conversation')}) elif msg_type == 'locationMessage': location = message.get(msg_type, {}) latitude = location.get('degreesLatitude') longitude = location.get('degreesLongitude') description = f"{location.get('name')}: {location.get('address')}" body = f"Link: https://www.google.com/maps/place/{latitude},{longitude}" if "None" not in description: body = f"Address: {description} \n {body}" payload.update({'content': body}) elif msg_type == 'contactMessage': payload.update({'content': message.get(msg_type, {}).get("vcard")}) elif msg_type == 'templateMessage': hydratedTemplate = message.get(msg_type, {}).get("hydratedTemplate", {}) hydratedTitleText = hydratedTemplate.get("hydratedTitleText") hydratedContentText = hydratedTemplate.get("hydratedContentText") hydratedFooterText = hydratedTemplate.get("hydratedFooterText") payload.update({'content': f"{hydratedTitleText} \n {hydratedContentText} \n {hydratedFooterText}"}) elif msg_type in ["imageMessage", "documentMessage", "videoMessage", "audioMessage"]: payload.update({'content': message.get(msg_type, {}).get("caption")}) media_url = f"{server.url}chat/getBase64FromMediaMessage/{sessionid}" msg_payload = {"message": {"key": {"id": message_id}}} response = requests.post(media_url, json=msg_payload, headers=headers, timeout=60) if response.status_code == 201: file_data = response.json() file_body = file_data.get('base64') fileName = file_data.get('fileName') mimetype = file_data.get('mimetype') if file_body: from io import BytesIO import base64 file_bytes = base64.b64decode(file_body) file_like = BytesIO(file_bytes) file_like.name = fileName payload.update({'attachments': (file_like.name, file_like, mimetype)}) else: return Response({'message': 'ok'}) try: if session.line: file_url = None text = payload.get("content") or fileName if file_data: domain = session.line.portal.domain chat_key = f'bitrix_chat:{domain}:{session.line.line_id}:{remoteJid}' if redis_client.exists(chat_key): chat_id = redis_client.get(chat_key).decode('utf-8') chat_folder = None try: chat_folder = bitrix_utils.call_method(session.app_instance, "im.disk.folder.get", {"CHAT_ID": chat_id}) if isinstance(chat_folder, dict) and chat_folder.get("error"): logger.error(chat_folder["detail"]) except Exception as e: logger.error(f"Bitrix error: {e}") if chat_folder and "result" in chat_folder: bitrix_utils.call_method(session.app_instance, "imopenlines.session.join", {"CHAT_ID": chat_id}) folder_id = chat_folder.get("result").get("ID") upload_file = bitrix_utils.upload_file( session.app_instance, folder_id, file_body, fileName) if upload_file: file_url = upload_file.get("DOWNLOAD_URL") if fromme: bitrix_tasks.message_add(session.app_instance.id, session.line.line_id, remoteJid, text, session.line.connector.code) if file_url: file_upd = { "CHAT_ID": chat_id, "UPLOAD_ID": upload_file.get("FILE_ID"), "DISK_ID": upload_file.get("ID"), "SILENT_MODE": "Y", "MESSAGE": fileName } bitrix_tasks.call_api(session.app_instance.id, "im.disk.file.commit", file_upd) else: attach = None if file_url: attach = [ { "url": file_url, "name": fileName } ] bitrix_tasks.send_messages(session.app_instance.id, remoteJid, text, session.line.connector.code, session.line.line_id, False, pushName, message_id, attach, profilepic_url) return Response({"message": "message processed"}) except Exception as e: print(f'Failed to send API message: {str(e)}') return Response({'error': f'Failed to send API message: {str(e)}'}, status=500) return Response({'message': 'ok'}) @action(detail=False, methods=['post'], url_path=r'(?P<session>[^/.]+)/send', permission_classes=[permissions.AllowAny]) def send(self, request, session=None, *args, **kwargs): session_id = session if not session_id: return Response({'error': 'session is required'}) try: session = Session.objects.get(session=session_id) except Exception as e: return Response({'error': 'An error occurred', 'details': str(e)}) if session.date_end and timezone.now() > session.date_end: return Response({'error': 'tariff has expired'}, status=402) data = request.data event = data.get('event') message_type = data.get('message_type') attachments = data.get('attachments', {}) if event == "message_created" and message_type == "outgoing": message_id = data.get('id') if redis_client.exists(f'chatwoot:{message_id}'): return Response({'message': 'loop message'}) content = data.get('content') conversation = data.get('conversation', {}) meta = conversation.get('meta', {}) sender = meta.get('sender', {}) phone_number = sender.get('phone_number') if content: wa_resp = utils.send_message(session_id, phone_number, content) if wa_resp.status_code == 201: utils.store_msg(wa_resp) if session.line: cleaned_phone = re.sub(r'\D', '', phone_number) bitrix_tasks.message_add(session.app_instance.id, session.line.line_id, cleaned_phone, content, session.line.connector.code) if attachments: for attachment in attachments: tasks.send_message_task(str(session.session), [phone_number], attachment, 'media') return Response({'message': 'All files sent successfully'}) return Response({'message': f'Session {session_id} authorized'})
Приступим к реализации функциональной части модуля. Создадим файл utils.py.
Предварительно рассмотрим импорты:
import requests import base64 import mimetypes import re import redis import time import magic from django.conf import settings from .models import Session redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)
Далее реализуем функционал работы с файлами. Функция download_file принимает url отправленного файла и id сообщения. Далее нам нужно получить mimetype файла с помощью библиотеки python-magic, определить его расширение и передать данные в формате base64:
def download_file(attachment): data_url = attachment.get("data_url") or attachment.get("link") message_id = attachment.get("message_id", time.time()) try: response = requests.get(data_url, stream=True, timeout=60) if response.status_code != 200: raise Exception(f"Failed to download file: {response.status_code} {response.text}") file_content = response.content mime = magic.Magic(mime=True) mimetype = mime.from_buffer(file_content) extension = mimetypes.guess_extension(mimetype) or '' filename = f"{message_id}{extension}" base64_encoded_data = base64.b64encode(file_content).decode("utf-8") return { "mimetype": mimetype, "data": base64_encoded_data, "filename": filename } except Exception as e: print(f"Error processing file: {str(e)}") return None
Также для исключения повторных сообщений, настроим сохранение id сообщений в redis:
def store_msg(resp): data = resp.json() msg_data = data.get('key', {}) message_id = msg_data.get('id') if message_id: redis_client.setex(f'waweb:{message_id}', 600, message_id)
Реализуем непосредственно функцию работы с сообщениями. Данная функция очищает телефон от лишних символов (для корректной работы с лидами) и формирует payload в зависимости от типа сообщения:
def send_message(session_id, recipient, content, cont_type="string"): session = Session.objects.get(session=session_id) server = session.server headers = {"apikey": session.apikey} cleaned = re.sub(r'\D', '', recipient) if cont_type == "string": payload = { "number": cleaned, "text": content, "linkPreview": True, } url = f"{server.url}message/sendText/{session_id}" print("Sending WA message by url: ", url) print("WA message payload: ", payload) return requests.post(url, json=payload, headers=headers, timeout=60) elif cont_type == "media": url = f"{server.url}message/sendMedia/{session_id}" mimetype = content.get("mimetype", "") base_type = mimetype.split('/')[0] mediatype = base_type if base_type in ["image"] else "document" payload = { "number": cleaned, "mediatype": mediatype, "mimetype": content.get("mimetype"), "media": content.get("data"), "fileName": content.get("filename") } return requests.post(url, json=payload, headers=headers, timeout=60)
Реализуем celery функции для асинхронной работы приложения:
import requests from django.utils import timezone from django.db.models import Q from datetime import timedelta from celery import shared_task import waweb.utils as utils from django.conf import settings from waweb.models import Session @shared_task def send_message_task(session_id, recipients, content, cont_type="string", from_web=False): if cont_type == "media": content = utils.download_file(content) for recipient in recipients: resp = utils.send_message(session_id, recipient, content, cont_type) if resp.status_code == 201 and not from_web: utils.store_msg(resp) @shared_task def delete_sessions(days=None): now = timezone.now() filters = Q((Q(phone__isnull=True) | Q(phone='')) & Q(date_end__lt=now)) if days is not None: try: days_int = int(days) date_limit = now - timedelta(days=days_int) filters = filters | Q(date_end__lt=date_limit) except (TypeError, ValueError): pass sessions = Session.objects.filter(filters) for session in sessions: server = session.server headers = {"apikey": server.api_key} url = f"{server.url}instance/delete/{session.session}" requests.delete(url, headers=headers, timeout=60) session.delete()
Далее создадим views.py, где опишем функционал подключения WhatsApp к коннектору:
import requests import redis import uuid from requests.exceptions import RequestException from django.shortcuts import render, redirect, get_object_or_404 from django.contrib.auth.decorators import login_required from django.contrib import messages from django.db.models import Count, F from django.utils import timezone from django.conf import settings from bitrix.models import AppInstance, Line, Connector import bitrix.utils as bitrix_utils from users.models import Message from main.decorators import login_message_required from .models import Session, Server from .forms import SendMessageForm from .tasks import send_message_task apps = settings.INSTALLED_APPS redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0, decode_responses=True) LINK_TTL = 60 * 60 * 24
Для начала создадим вьюшку для главной страницы модуля, в которой получим доступные сервера Evolution API и подключенные сессии (если таковые имеются).
@login_message_required(code="waweb") def wa_sessions(request): connector_service = "waweb" connector = Connector.objects.filter(service=connector_service).first() if request.method == "POST": session_id = request.POST.get("session_id") line_id = request.POST.get("line_id") if not line_id: messages.warning(request, "Необходимо выбрать линию из списка или создать новую.") return redirect('waweb') phone = get_object_or_404(Session, id=session_id, owner=request.user) if not phone.phone: messages.error(request, "Сначала необходимо подключить WhatsApp.") return redirect('waweb') if phone.line and str(phone.line.id) == str(line_id): messages.warning(request, "Эта линия уже подключена к выбранной сессии.") return redirect('waweb') try: bitrix_utils.connect_line(request, line_id, phone, connector, connector_service) except Exception as e: messages.error(request, str(e)) return redirect('waweb') return redirect('waweb') sessions = Session.objects.filter(owner=request.user) instances = AppInstance.objects.filter(owner=request.user, app__connectors=connector) wa_lines = Line.objects.filter(connector=connector, owner=request.user) for session in sessions: session.show_link = session.status == "open" return render( request, 'waweb/wa_sessions.html', { "sessions": sessions, "instances": instances, "wa_lines": wa_lines, } )
Далее добавим функцию для подключения новых сессий, в которой получим доступный сервер Evolution API (если их несколько) и вызовем функцию создания сессии:
@login_required def connect_number(request, session_id=None): if not session_id: sessions = Session.objects.filter( phone__isnull=True, owner=request.user ) if sessions and not request.user.integrator: messages.warning(request, "У вас уже есть незавершенное подключение. Нажмите 'Подключить'") return redirect('waweb') new_session = Session.objects.create(owner=request.user) session_id = new_session.session else: new_session = get_object_or_404(Session, session=session_id, owner=request.user) server = ( Server.objects.annotate(connected_sessions=Count('sessions')) .filter(connected_sessions__lt=F('max_connections')) .order_by('id') .first() ) if not server: messages.error(request, "Нет доступных серверов.") new_session.delete() return redirect('waweb') new_session.server = server new_session.save() try: img_data = create_instance(new_session) if img_data: request.session['qr_image'] = img_data return redirect('', session_id=session_id) except Exception as e: print("request error:", e) new_session.delete() messages.error(request, "Failed to initiate session.") return redirect('waweb')
В функции create_instance отправим запрос в Evolution API на создание новой сессии и получим QR код:
def create_instance(session): server = session.server headers = {"apikey": server.api_key} payload = { "instanceName": str(session.session), "qrcode": True, "integration": "WHATSAPP-BAILEYS", "alwaysOnline": server.always_online, "groupsIgnore": server.groups_ignore, "readMessages": server.read_messages, } try: response = requests.post(f"{server.url}instance/create", json=payload, headers=headers, timeout=150) inst_data = response.json() instanceId = inst_data.get("instance", {}).get("instanceId") session.instanceId = instanceId session.save() print(inst_data) img_data = inst_data.get("qrcode", {}).get("base64", "") print(img_data) if "," in img_data: img_data = img_data.split(",", 1)[1] else: raise Exception("QR code base64 data is invalid or missing.") return img_data except RequestException as e: print("request error:", e) return None
Для отрисовки используем qr_code_page:
@login_required def qr_code_page(request, session_id): qr_image = request.session.pop('qr_image', '') try: session = Session.objects.get(session=session_id) except Session.DoesNotExist: messages.error(request, "Session not found.") return redirect('waweb') if not qr_image: qr_image = get_gr(request, session) if not qr_image: return redirect('waweb') public_id = redis_client.get(f"public_qr:{session_id}") if not public_id: public_id = str(uuid.uuid4()) redis_client.set(f"public_qr:{session_id}", public_id, ex=LINK_TTL) redis_client.set(f"public_qr:{public_id}", str(session_id), ex=LINK_TTL) message = Message.objects.filter(code="waweb_instruction").first() if message: messages.info(request, message.message) return render(request, 'waweb/qr_code.html', { 'qr_image': qr_image, 'request': request, 'public_id': public_id, })
А также добавим функцию для декодирования QR кода:
def get_gr(request, session): server = session.server if not server: messages.error(request, "Session is not attached to a server.") return if session.status == "open": messages.warning(request, "Session is connected.") return gr_url = f"{server.url}instance/connect/{session.session}" headers = {"apikey": server.api_key} try: response = requests.get(gr_url, headers=headers, timeout=60) if response.status_code == 404: return create_instance(session) inst_data = response.json() img_data = inst_data.get("base64", "") if img_data: qr_image = img_data.split(",", 1)[1] return qr_image else: messages.error(request, f"Failed to restart session. {inst_data}") return except RequestException: messages.error(request, "Failed connect to server") return
Пропишем маршруты к нашим вьюшкам:
from django.urls import path from . import views urlpatterns = [ path('waweb/', views.wa_sessions, name='waweb'), path('connect/', views.connect_number, name='connect_number'), path('qr/<uuid:session_id>/', views.qr_code_page, name='qr_code_page'), ]
Модуль Telegram коннектора
Коннекторы WhatsApp и Telegram довольно схожи в своей реализации и базовой логике обработки сообщений. Однако ввиду политики открытого API, интеграция с телегой потребовала гораздо меньшего количество ресурсов. Как минимум, вам не потребуется знание испанского для баг-фиксов.
Создадим django приложение telegram. В models.py нам нужно лишь хранить данные о подключенном Telegram боте, а также выставить вебхук для пересылки сообщений:
import requests from django.conf import settings from django.db import models from bitrix.models import AppInstance, Line class TelegramBot(models.Model): bot_token = models.CharField(max_length=255, unique=True, verbose_name="Bot Token") bot_username = models.CharField(max_length=255, verbose_name="Bot Username", blank=True, null=True) receive_messages = models.BooleanField(default=True) send_typing_actions = models.BooleanField(default=True) groups_ignore = models.BooleanField(default=True) date_end = models.DateTimeField(null=True, blank=True) owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True) app_instance = models.ForeignKey( AppInstance, on_delete=models.SET_NULL, related_name="telegrams", null=True, blank=True) line = models.ForeignKey( Line, on_delete=models.SET_NULL, blank=True, null=True, related_name="telegrams", ) def __str__(self): return self.bot_username def save(self, *args, **kwargs): super().save(*args, **kwargs) if self.bot_token and self.bot_username: site_domain = self.app_instance.app.site.domain webhook_url = f"https://{site_domain}/api/telegram/{self.bot_username}/" api_url = f"https://api.telegram.org/bot{self.bot_token}/setWebhook" response = requests.post(api_url, data={"url": webhook_url}) print(response.json()) if response.status_code != 200: raise Exception( f"Не удалось установить webhook: {response.text}" )
Обработчик событий в api/views.py функционально идентичен обработчику WhatsApp, с небольшими поправками на структуру сообщений Telegram. Поэтому задерживаться на этом моменте мы не будем, просто приложу готовый код:
import logging import bitrix.tasks as bitrix_tasks import redis import requests import telegram.tasks as telegram_tasks from django.conf import settings from rest_framework import permissions from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet from telegram.models import TelegramBot logger = logging.getLogger("django") redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0) class TelegramEventsHandler(GenericViewSet): permission_classes = [permissions.AllowAny] def create(self, request, *args, **kwargs): data = request.data print(data) bot_name = kwargs.get("bot_name") message = data.get("message") if not message: return Response({"message": "No message"}, status=200) try: bot = TelegramBot.objects.get(bot_username=bot_name) except TelegramBot.DoesNotExist: return Response({'error': f'Bot {bot_name} does not exist'}) chat_id = message["chat"]["id"] is_group = message["chat"]["type"] in ["group", "supergroup"] message_id = message["message_id"] from_user = message["from"] username = from_user.get("username") or from_user.get("first_name") name = from_user.get("first_name") or "" + from_user.get("last_name") or "" user_full_name = name if name != "" else from_user.get("username") pushName = username if redis_client.exists(f"telegram:{message_id}"): return Response({"message": "loop message"}, status=200) redis_client.setex(f"telegram:{message_id}", 600, message_id) try: if bot.line: group_title = message["chat"].get("title") if is_group else None attachments = None media_keys = [ "photo", "document", "video", "audio", "voice", "video_note" ] text = message.get("text", " ") for key in media_keys: if key in message: attachments = [] if key == "photo": items = [message[key][-1]] else: items = message[key] if isinstance(message[key], list) else [message[key]] text = message.get("capture", " ") for item in items: file_id = item["file_id"] tg_response = requests.get( f"https://api.telegram.org/bot{bot.bot_token}/getFile?file_id={file_id}" ) tg_response.raise_for_status() file_info = tg_response.json().get("result", {}) file_path = file_info.get("file_path") if not file_path: continue file_url = f"https://api.telegram.org/file/bot{bot.bot_token}/{file_path}" filename = file_path.split("/")[-1] attachments.append({ "url": file_url, "name": filename, "type": key }) if is_group: text_to_send = f"*{group_title}*\n{user_full_name}: {text}" chat_key = f'bitrix_chat:telegram_group:{chat_id}' redis_client.setex(chat_key, 60 * 60 * 24, chat_id) else: text_to_send = text bitrix_tasks.send_messages( bot.app_instance.id, chat_id, text_to_send, bot.line.connector.code, bot.line.line_id, False, pushName, str(message_id), attachments=attachments, ) except Exception as e: logger.error(f"Bitrix send error: {e}") return Response({"message": "processed"}, status=200) @action(detail=False, methods=['post'], url_path=r'(?P<session>[^/.]+)/send') def send(self, request, session=None, *args, **kwargs): """ Send outgoing message to Telegram using session id """ session_id = session if not session_id: return Response({'error': 'session is required'}) try: bot = TelegramBot.objects.get(id=session_id) except Exception as e: return Response({'error': 'An error occurred', 'details': str(e)}) if bot.date_end and timezone.now() > bot.date_end: return Response({'error': 'tariff has expired'}, status=402) data = request.data event = data.get('event') message_type = data.get('message_type') attachments = data.get('attachments', {}) if event == "message_created" and message_type == "outgoing": message_id = data.get('id') if redis_client.exists(f'chatwoot:{message_id}'): return Response({'message': 'loop message'}) redis_client.setex(f'chatwoot:{message_id}', 600, message_id) content = data.get('content') conversation = data.get('conversation', {}) meta = conversation.get('meta', {}) sender = meta.get('sender', {}) chat_id = sender.get('chat_id') if not chat_id: return Response({'error': 'chat_id not provided in sender meta'}) if content: telegram_tasks.send_telegram_message_task.delay( bot_token=bot.bot_token, recipient=chat_id, content=content, cont_type="string" ) if bot.line: bitrix_tasks.message_add.delay( bot.app_instance.id, bot.line.line_id, str(chat_id), content, bot.line.connector.code ) if attachments: for attachment in attachments: telegram_tasks.send_telegram_message_task.delay( bot_token=bot.bot_token, recipient=chat_id, content=attachment, cont_type="media" ) return Response({'message': 'All files sent successfully'}) return Response({'message': f'Session {session_id} authorized'})
Создадим асинхронную функцию для отправки сообщений в tasks.py, выполним проверку на тип сообщения (текст или медиа файл):
import base64 import os import tempfile import requests from celery import shared_task @shared_task def send_telegram_message_task(bot_token, recipient, content, cont_type="string"): """ Отправляет сообщение или медиа в Telegram. recipient: chat_id (str или int) или @username content: строка (текст) или dict с base64-данными cont_type: 'string' или 'media' """ print("send_telegram_message_task is called") TELEGRAM_API_BASE = f"https://api.telegram.org/bot{bot_token}" if cont_type == "string": url = f"{TELEGRAM_API_BASE}/sendMessage" payload = { "chat_id": recipient, "text": content, "parse_mode": "Markdown", # или HTML "disable_web_page_preview": False, } response = requests.post(url, json=payload, timeout=30) response.raise_for_status() return response.json() elif cont_type == "media": mimetype = content.get("mimetype", "") base_type = mimetype.split("/")[0] file_url = content.get("url") or content.get("link") file_name = content.get("filename") or content.get("name", "file") if not file_url: raise ValueError("Media content must include 'link' or 'url'") try: file_response = requests.get(file_url, stream=True, timeout=30) file_response.raise_for_status() with tempfile.NamedTemporaryFile(delete=False) as tmp_file: for chunk in file_response.iter_content(chunk_size=8192): tmp_file.write(chunk) tmp_file_path = tmp_file.name files = {} data = {"chat_id": recipient} if base_type == "image": url = f"{TELEGRAM_API_BASE}/sendPhoto" files["photo"] = (file_name, open(tmp_file_path, "rb"), mimetype) data["caption"] = "" else: url = f"{TELEGRAM_API_BASE}/sendDocument" files["document"] = (file_name, open(tmp_file_path, "rb"), mimetype) data["caption"] = "" response = requests.post(url, data=data, files=files, timeout=60) response.raise_for_status() return response.json() finally: if os.path.exists(tmp_file_path): os.remove(tmp_file_path)
Не вижу смысла в реализации отдельных страниц для подключения Telegram бота, ввиду наличия «из коробки» прекрасной админ панели в django, поэтому создадим лишь страницу с отображением подключенных ботов:
import requests import redis import uuid from requests.exceptions import RequestException from django.shortcuts import render, redirect, get_object_or_404 from django.contrib.auth.decorators import login_required from django.contrib import messages from django.db.models import Count, F from django.utils import timezone from django.conf import settings from bitrix.models import AppInstance, Line, Connector import bitrix.utils as bitrix_utils from main.decorators import login_message_required from .models import TelegramBot apps = settings.INSTALLED_APPS redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0, decode_responses=True) LINK_TTL = 60 * 60 * 24 @login_message_required(code="telegram") def tg_bots(request): connector_service = "telegram" connector = Connector.objects.filter(service=connector_service).first() if request.method == "POST": session_id = request.POST.get("session_id") line_id = request.POST.get("line_id") if not line_id: messages.warning(request, "Необходимо выбрать линию из списка или создать новую.") return redirect('telegram') bot = get_object_or_404(TelegramBot, id=session_id, owner=request.user) if not bot.bot_username: messages.error(request, "Сначала необходимо подключить WhatsApp.") return redirect('telegram') if bot.line and str(bot.line.id) == str(line_id): messages.warning(request, "Эта линия уже подключена к выбранной сессии.") return redirect('telegram') try: bitrix_utils.connect_line(request, line_id, bot, connector, connector_service) except Exception as e: messages.error(request, str(e)) return redirect('telegram') return redirect('telegram') bots = TelegramBot.objects.filter(owner=request.user) instances = AppInstance.objects.filter(owner=request.user, app__connectors=connector) tg_lines = Line.objects.filter(connector=connector, owner=request.user) return render( request, 'telegram/tg_sessions.html', { "bots": bots, "instances": instances, "tg_lines": tg_lines, } )
На этом реализация нашего коннектора завершена, а в третьей части мы рассмотрим подключение Локальных приложений к Открытым линиям CRM Bitrix24.
ссылка на оригинал статьи https://habr.com/ru/articles/944560/
Добавить комментарий