WhatsApp Web и Telegram коннектор для Bitrix24: наш опыт реализации и внедрения. Часть 1 — Интеграция Bitrix24

от автора

Привет, мир! Меня зовут Павел, я IT инженер и руководитель службы технической поддержки.

Работая в формате крупного IT-аутсорсинга, мы в компании столкнулись с проблемой: использование общего WhatsApp/Telegram Web, подключённого на компьютерах сотрудников поддержки, оказалось неэффективным. Такой подход не позволял контролировать качество диалогов, а также затруднял перевод обращений клиентов в структурированные тикеты, вследствие чего была начата разработка коннектора к нашему корпоративному порталу Bitrix24.

В данной статье я бы хотел поделиться основами технической реализации и ключевым функционалом коннектора. Детали реализации — например, верстку html страниц, тесты, кастомизацию панели администратора и так далее — оставляю на ваше усмотрение.

Технологический стек

Основу проекта составляют Python и Django, что обусловлено их простотой, гибкостью и широким набором готовых решений. Такой выбор дал возможность быстро реализовать минимально жизнеспособный продукт (MVP) и заложить фундамент для дальнейшего масштабирования.

Также вам понадобится установить redis, celery и настроить их.

Основной функционал Bitrix

Опустим установку Django и подготовку виртуального окружения и приступим к реализации интеграции с Битриксом. Для этого создадим Django приложение bitrix. Начнем с создания моделей в models.py:

import uuid from django.conf import settings from django.contrib.sites.models import Site from django.db import models

Модель для хранения данных о портале:

class Bitrix(models.Model):     PROTOCOL_CHOICES = [         ('http', 'HTTP'),         ('https', 'HTTPS'),     ]     protocol = models.CharField(max_length=5, choices=PROTOCOL_CHOICES, default='https')     domain = models.CharField(max_length=255)     owner = models.ForeignKey(         User, on_delete=models.SET_NULL, blank=True, null=True     )     user_id = models.CharField(max_length=255, blank=True, null=True)     member_id = models.CharField(max_length=255, unique=True, blank=True, null=True)     license_expired = models.BooleanField(default=False)      def __str__(self):         return self.domain

Модель коннектора:

class Connector(models.Model):     TYPE_CHOICES = [         ('telegram', 'Telegram Bot'),         ('waweb', 'WhatsApp Web'),     ]     code = models.CharField(max_length=255, default=uuid.uuid4(), unique=True)     service = models.CharField(max_length=255, choices=TYPE_CHOICES, blank=True, null=True)     name = models.CharField(max_length=255, default="itsource.kg", unique=False)     icon = models.FileField(upload_to='connector_icons/', blank=True, null=True,                             default='connector_icons/cloud-rain-alt.svg') # Заменить на вашу svg иконку      def __str__(self):         return self.name

Конфигурация приложения Битрикс:

class App(models.Model):     id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)     owner = models.ForeignKey(         settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True     )     site = models.ForeignKey(         Site, on_delete=models.SET_NULL, related_name="apps", blank=True, null=True     )     name = models.CharField(max_length=255, blank=True, unique=False)     page_url = models.CharField(max_length=255, blank=True, default="/")     connectors = models.ManyToManyField(Connector, blank=True, related_name='apps')     asterisk = models.BooleanField(default=False, help_text="Chek for Asterisk connector")     client_id = models.CharField(max_length=255, blank=True, unique=False)     client_secret = models.CharField(max_length=255, blank=True)      def __str__(self):         return self.name

Модель для хранения сущностей локальных приложений и открытых линий:

class AppInstance(models.Model):     id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)     owner = models.ForeignKey(         settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True     )     app = models.ForeignKey(App, on_delete=models.SET_NULL, related_name="installations", blank=True, null=True)     portal = models.ForeignKey(         Bitrix, on_delete=models.CASCADE, related_name="installations", blank=True, null=True     )     auth_status = models.CharField(max_length=1)     application_token = models.CharField(max_length=255, blank=True)     storage_id = models.CharField(max_length=255, blank=True)     status = models.IntegerField(default=0, blank=True)     attempts = models.IntegerField(default=0, blank=True)     access_token = models.CharField(max_length=255, blank=True, null=True, editable=False)     refresh_token = models.CharField(max_length=255, blank=True, null=True, editable=False)      def __str__(self):         return f"{self.app.name} on {self.portal.domain}"   class Line(models.Model):     line_id = models.CharField(max_length=50)     owner = models.ForeignKey(         settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True     )     app_instance = models.ForeignKey(AppInstance, on_delete=models.CASCADE, related_name="lines", null=True)     connector = models.ForeignKey(Connector, on_delete=models.SET_NULL, related_name="lines", null=True)     portal = models.ForeignKey(Bitrix, on_delete=models.CASCADE, related_name="lines", blank=True, null=True)      def __str__(self):         return f"Line {self.line_id}"

Я приверженец модульной архитектуры, поэтому создадим отдельный модуль api внутри приложения bitrix для реализации api вебхуков

api/serializers.py

from rest_framework import serializers  from bitrix.models import Bitrix   class PortalSerializer(serializers.ModelSerializer):     class Meta:         model = Bitrix         fields = [             "owner",             "user_id",             "domain",         ]      def create(self, validated_data):         return Bitrix.objects.create(**validated_data) 

api/views.py

from rest_framework.mixins import CreateModelMixin from rest_framework.renderers import JSONRenderer from rest_framework.viewsets import GenericViewSet from rest_framework.response import Response  from bitrix.models import Bitrix  from bitrix.utils import event_processor from .serializers import PortalSerializer   class PortalViewSet(CreateModelMixin, GenericViewSet):     queryset = Bitrix.objects.all()     serializer_class = PortalSerializer      def create(self, request, *args, **kwargs):         print("create func")         return event_processor(request)      def head(self, request, *args, **kwargs):         print("head func")         return Response(headers={'Allow': 'POST, HEAD'}) 

В settings.py в приложении с ядром проекта включим авторизацию по токенам

REST_FRAMEWORK = {     "DEFAULT_AUTHENTICATION_CLASSES": (         "rest_framework.authentication.SessionAuthentication",         "rest_framework.authentication.TokenAuthentication",         "core.qpta.QueryParamTokenAuthentication",     ),     "DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",),     'DEFAULT_RENDERER_CLASSES': ('rest_framework.renderers.JSONRenderer',) }

Там же создадим qpta.py:

from rest_framework.authentication import TokenAuthentication   class QueryParamTokenAuthentication(TokenAuthentication):     def authenticate(self, request):         # Try to get the token from the URL query parameter         token = request.query_params.get("api-key")          if not token:             # Fall back to default token authentication             return super().authenticate(request)          # Authenticate the token manually         user, token = self.authenticate_credentials(token)         return (user, token) 

И api_router.py:

from django.conf import settings from rest_framework.routers import DefaultRouter from rest_framework.routers import SimpleRouter  from bitrix.api.views import PortalViewSet from users.api.views import UserViewSet from waweb.api.views import EventsHandler from telegram.api.views import TelegramEventsHandler  router = DefaultRouter() if settings.DEBUG else SimpleRouter()  router.register("users", UserViewSet) router.register("bitrix", PortalViewSet) router.register("waweb", EventsHandler, basename="waevents") router.register("telegram", TelegramEventsHandler, basename="tgevents")   app_name = "api" urlpatterns = router.urls

В urls.py добавим:

urlpatterns += [     path("api/", include("core.api_router")), # core заменить на название приложения где лежит ядро проекта   ]

Создадим файл utils.py с основными функциями:

import base64 import json import logging import re import redis  import requests from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.contrib import messages from django.conf import settings from django.shortcuts import redirect, get_object_or_404  from rest_framework import status from rest_framework.authtoken.models import Token from rest_framework.response import Response from django.http import HttpResponse  from waweb.models import Session import waweb.utils as waweb import waweb.tasks as waweb_tasks import telegram.tasks as telegram_tasks  from .crest import call_method from .models import App, AppInstance, Bitrix, Line, Connector import bitrix.tasks as bitrix_tasks  from telegram.models import TelegramBot  redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)   logger = logging.getLogger("django")  GENERAL_EVENTS = [     "ONAPPUNINSTALL", ]  CONNECTOR_EVENTS = [     "ONIMCONNECTORMESSAGEADD",     "ONIMCONNECTORLINEDELETE",     "ONIMCONNECTORSTATUSDELETE", ] 

Функция для подключения открытой линии разделяется на 2 логических блока: создание и подключение уже существующей линии. Если id линии начинается с create, то создаем новую открытую линию и AppInstance. Далее отправляем в Битрикс конфиги открытой линии (imopenlines.config.add), проверяем AppInstance на наличие подключенной открытой линии и деактивируем ее, если таковая имеется. Финальным шагом активируем нашу открытую линию в Битриксе методом imconnector.activate и возвращаем редирект на страницу завершения подключения:

def connect_line(request, line_id, entity, connector, redirect_to):     line_id = str(line_id)     if line_id.startswith("create__"):         instance_id = line_id.split("__")[1]         app_instance = get_object_or_404(AppInstance, id=instance_id, owner=request.user)         if not app_instance.portal:             messages.error(request, "Невозможно создать линию: портал не найден")             return redirect(redirect_to)         if entity.line:             call_method(app_instance, "imconnector.activate", {                 "CONNECTOR": connector.code,                 "LINE": entity.line.line_id,                 "ACTIVE": 0,             })         if connector.service == "telegram":             line_name = entity.bot_username         else:             line_name = entity.phone         create_payload = {"PARAMS": {"LINE_NAME": line_name}}         result = call_method(app_instance, "imopenlines.config.add", create_payload)         if result and result.get("result"):             new_line_id = result["result"]             line = Line.objects.create(                 line_id=new_line_id,                 portal=app_instance.portal,                 connector=connector,                 app_instance=app_instance,                 owner=request.user             )             entity.line = line             entity.app_instance = app_instance             entity.save()              activate_payload = {                 "CONNECTOR": connector.code,                 "LINE": new_line_id,                 "ACTIVE": 1,             }             call_method(app_instance, "imconnector.activate", activate_payload)             messages.success(request, f"Создана и подключена линия {new_line_id}")         else:             messages.error(request, f"Ошибка при создании линии: {result}")         return redirect(redirect_to)     else:         line = get_object_or_404(Line, id=line_id)         if not line:             messages.error(request, f"Линия {line_id} не найдена")             return redirect(redirect_to)         entity_model = type(entity)         usage_count = entity_model.objects.filter(line=line).exclude(pk=entity.pk).count()         if usage_count > 0:             messages.error(request, "Эта линия уже используется.")             return redirect(redirect_to)          app_instance = line.app_instance          if entity.line == line:             messages.success(request, "Выбрана та же линия")             return redirect(redirect_to)         if entity.line:             call_method(app_instance, "imconnector.activate", {                 "CONNECTOR": connector.code,                 "LINE": entity.line.line_id,                 "ACTIVE": 0,             })         response = call_method(app_instance, "imconnector.activate", {             "CONNECTOR": connector.code,             "LINE": line.line_id,             "ACTIVE": 1,         })         if response.get("result"):             entity.line = line             entity.app_instance = app_instance             entity.save()             messages.success(request, "Линия подключена")      messages.success(request, "Настройки обновлены")     return redirect(redirect_to)

Подписка на события Битрикс представляет из себя всего 1 запрос на эндпоинт event.bind, в котором мы сообщаем адрес нашего вебхука, принимающего события:

def events_bind(events: dict, appinstance: AppInstance, api_key: str):     url = appinstance.app.site     for event in events:         payload = {             "event": event,             "HANDLER": f"https://{url}/api/bitrix/?api-key={api_key}",         }          try:             return call_method(appinstance, "event.bind", payload)         except (ObjectDoesNotExist, Exception) as exc:             print(exc, " in events_bind")             return None

Регистрация коннектора включает в себя 2 функции: регистрация коннектора (register_connector) и обработка его установки (process_placement).

Функция регистрации коннектора добавляет его в меню битрикса CRM — Контакт-Центр. Здесь мы декодируем нашу svg иконку в base64 и вызываем метод imconnector.register с такими параметрами, как uuid коннектора, название коннектора и его иконка в base64. После вызова этой функции, карточка коннектора появится в Контакт-Центре.

Что касается обработчика установки, его следует воспринимать как входящий вебхук от битрикса. Здесь мы получаем параметры коннектора и открытой линии, uuid приложения и адрес портала Битрикс. Если данные корректны, привязываем Открытую Линию к сущности локального приложения коннектора и пользователю и передаем HANDLER для событий, связанных с сообщениями:

def register_connector(appinstance: AppInstance, api_key: str, connector):     url = appinstance.app.site      if not connector.icon:         return None      try:         with open(connector.icon.path, "rb") as file:             image_data = file.read()             encoded_image = base64.b64encode(image_data).decode("utf-8")             connector_logo = f"data:image/svg+xml;base64,{encoded_image}"          payload = {             "ID": connector.code,             "NAME": connector.name,             "ICON": {                 "DATA_IMAGE": connector_logo,             },             "PLACEMENT_HANDLER": f"https://{url}/app-settings/?inst={appinstance.id}",         }          try:             return call_method(appinstance, "imconnector.register", payload)         except (ObjectDoesNotExist, Exception) as exc:             print(exc, " in register_connector")             return None          events_bind(CONNECTOR_EVENTS, appinstance, api_key)      except FileNotFoundError:         return None     except Exception as e:         return None   def process_placement(request):     try:         data = request.POST         placement_options = data.get("PLACEMENT_OPTIONS")         instance_id = request.GET.get("inst")         domain = request.GET.get("DOMAIN")         print("process placement GET ", request.GET)         print("FULL URL:", request.build_absolute_uri())          placement_options = json.loads(placement_options)         line_id = placement_options.get("LINE")         connector_code = placement_options.get("CONNECTOR")          app_instance = AppInstance.objects.filter(id=instance_id).first()         if not app_instance:             return HttpResponse("app not found")         portal = Bitrix.objects.filter(domain=domain).first()         if not portal:             return HttpResponse("bitrix not found")         connector = Connector.objects.filter(code=connector_code).first()         if not connector:             return HttpResponse("connector not found")         line, created = Line.objects.get_or_create(             line_id=line_id,             portal=portal,             connector=connector,             app_instance=app_instance,             owner=app_instance.owner         )         activate_payload = {             "CONNECTOR": connector.code,             "LINE": line_id,             "ACTIVE": 1,         }         print("activate_payload= ", activate_payload)         activate_result = call_method(app_instance, "imconnector.activate", activate_payload)          if not activate_result or not activate_result.get("result"):             return HttpResponse(f"Failed to activate connector: {activate_result}")          # api_key = request.GET.get("api-key")         api_key = Token.objects.get(user=app_instance.owner).key         print("request data= ", request.GET)         print("api_key=", api_key)         print("app_instance=", app_instance)         events = ["ONIMCONNECTORMESSAGEADD", "ONIMCONNECTORSTATUSDELETE", "ONIMCONNECTORLINEDELETE"]         for event in events:             event_payload = {                 "event": event,                 "HANDLER": f"https://{app_instance.app.site}/api/bitrix/?api-key={api_key}",             }             call_method(app_instance, "event.bind", event_payload)          return HttpResponse(             f"Линия успешно создана и активирована. Настройте доп. параметры в Bitrix: https://{app_instance.app.site}/portals/"         )     except Exception as e:         logger.error(f"Unexpected error: {e}")         return HttpResponse({"An unexpected error occurred"})

Обработка файлов и событий. extract_files и upload_file описывают логику работы коннектора с файлами. Ничего сверхъестественного, поэтому предлагаю не задерживаться на этом и перейдем к event_processor — функции, обрабатывающей все входящие события от Битрикса.

def extract_files(data):     files = []     i = 0     while True:         name_key = f"data[MESSAGES][0][message][files][{i}][name]"         link_key = f"data[MESSAGES][0][message][files][{i}][link]"         type_key = f"data[MESSAGES][0][message][files][{i}][type]"          if name_key in data and link_key in data:             files.append(                 {                     "name": data.get(name_key),                     "link": data.get(link_key),                     "type": data.get(type_key),                 },             )             i += 1         else:             break      return files   def upload_file(appinstance, storage_id, fileContent, filename):     payload = {         "id": storage_id,         "fileContent": fileContent,         "data": {"NAME": filename},         "generateUniqueName": True,     }     upload_to_bitrix = call_method(appinstance, "disk.folder.uploadfile", payload)     if "result" in upload_to_bitrix:         return upload_to_bitrix["result"]     else:         return None

event_processor — это единый обработчик всех событий, поступающих от Bitrix24 по REST-хукам. Он принимает входящие запросы, извлекает из них данные (события, токены, параметры портала) и в зависимости от типа события выполняет соответствующую бизнес-логику:

  • регистрация приложения и портала при установке,

  • обработка новых сообщений от пользователей,

  • отключение или удаление линии,

  • корректное завершение работы при удалении приложения.

Рассмотрим каждое событие более детально.

Обработка установки приложения (ONAPPINSTALL)

Когда пользователь впервые устанавливает приложение в свой портал Bitrix24, прилетает событие ONAPPINSTALL.

Здесь функция:

  1. Проверяет, существует ли экземпляр приложения (AppInstance).

  2. Если нет — создаёт новый портал (Bitrix) и привязывает к нему приложение.

  3. Сохраняет выданные Bitrix24 токены (access_token, refresh_token, application_token).

  4. Регистрирует события (event.bind) и подключает доступные коннекторы (например, WhatsApp или Telegram).

  5. При необходимости создаёт хранилище в Bitrix Диске (disk.storage.getforapp).

  6. Возвращает сообщение об успешной установке.

Таким образом, уже на этапе установки приложение готово к полноценной работе.

Обработка входящих сообщений (ONIMCONNECTORMESSAGEADD)

Одно из самых частых событий — это новое сообщение от клиента, пришедшее в открытую линию.

Алгоритм работы:

  1. Определяется, какой коннектор сработал (WhatsApp Web или Telegram).

  2. Извлекаются данные о линии (LINE), сообщении (MESSAGE_ID, CHAT_ID) и его содержимом.

  3. Bitrix получает подтверждение о доставке через метод imconnector.send.status.delivery.

  4. Система проверяет, не было ли это сообщение отправлено самой интеграцией (защита от «петли» через Redis).

  5. Если есть вложения — они извлекаются.

  6. В зависимости от коннектора сообщение пробрасывается в WhatsApp или Telegram:

    • для WhatsApp — через waweb-сессию,

    • для Telegram — через соответствующего бота.

  7. Если отправка успешна — сообщение сохраняется в базе.

Таким образом, коннектор становится полноценным «мостом» между Bitrix24 и внешним мессенджером.

Отключение линии (ONIMCONNECTORSTATUSDELETE)

Когда в Bitrix24 отключается линия, событие ONIMCONNECTORSTATUSDELETE уведомляет приложение о том, что коннектор больше неактивен.

Здесь происходит:

  • поиск линии по ID,

  • отвязка связанного ресурса (например, телефона в WhatsApp),

  • возврат ответа о том, что линия корректно отключена.

Удаление линии (ONIMCONNECTORLINEDELETE)

Если линия полностью удаляется в Bitrix24, приложение должно синхронизировать изменения.

Обработчик:

  • находит линию в базе,

  • удаляет её,

  • возвращает статус операции.

Удаление приложения (ONAPPUNINSTALL)

При удалении приложения из Bitrix24 (ONAPPUNINSTALL):

  1. Удаляется AppInstance (экземпляр приложения для конкретного портала).

  2. Если у портала больше нет ни одного экземпляра приложения — опционально удаляется и сам портал.

def event_processor(request):     try:         data = request.data         event = data.get("event")         domain = data.get("auth[domain]")         user_id = data.get("auth[user_id]")         auth_status = data.get("auth[status]")         access_token = data.get("auth[access_token]")         refresh_token = data.get("auth[refresh_token]")         application_token = data.get("auth[application_token]")         member_id = data.get("auth[member_id]")         api_key = request.query_params.get("api-key")         app_id = request.query_params.get("app-id")          print(data)          try:             if not api_key:                 raise ValueError("API key is required")              appinstance = AppInstance.objects.get(application_token=application_token)             appinstance.access_token = access_token             appinstance.refresh_token = refresh_token             appinstance.save()              if not appinstance.portal.member_id:                 appinstance.portal.member_id = member_id                 appinstance.portal.save()          except AppInstance.DoesNotExist:             if event == "ONAPPINSTALL":                 try:                     app = App.objects.get(id=app_id)                 except App.DoesNotExist:                     return Response({"message": "App not found."})                  portal, created = Bitrix.objects.get_or_create(                     member_id=member_id,                     defaults={                         "domain": domain,                         "user_id": user_id,                         "owner": request.user if auth_status == "L" else None,                     }                 )                  appinstance_owner = (                     portal.owner                     if portal.owner                     else (request.user if auth_status == "L" else None)                 )                  appinstance, created = AppInstance.objects.update_or_create(                     app=app,                     portal=portal,                     owner=appinstance_owner,                     defaults={                         "auth_status": auth_status,                         "access_token": access_token,                         "refresh_token": refresh_token,                         "application_token": application_token,                     }                 )                  storage_data = call_method(appinstance, "disk.storage.getforapp", {})                 if "result" in storage_data:                     storage_id = storage_data["result"]["ID"]                     appinstance.storage_id = storage_id                     appinstance.save()                  # Регистрация коннектора/ подписка на события                 def register_events_and_connectors():                     events_bind(GENERAL_EVENTS, appinstance, api_key)                     if app.connectors.exists():                         for connector in app.connectors.all():                             register_connector(appinstance, api_key, connector)                  transaction.on_commit(register_events_and_connectors)                  if VENDOR_BITRIX_INSTANCE:                     bitrix_tasks.create_deal(appinstance.id, VENDOR_BITRIX_INSTANCE, app.name)                  if portal.owner:                     return Response('App successfully created and linked')                  return Response(                     {"message": "App and portal successfully created and linked."},                     status=status.HTTP_201_CREATED,                 )             else:                 return Response({"message": "App not found and not an install event."})          # Обработка события ONIMCONNECTORMESSAGEADD         if event == "ONIMCONNECTORMESSAGEADD":             connector_code = data.get("data[CONNECTOR]")             connector = get_object_or_404(Connector, code=connector_code)             if not connector:                 return Response({'Connector not found'})             line_id = data.get("data[LINE]")             message_id = data.get("data[MESSAGES][0][im][message_id]")             chat_id = data.get("data[MESSAGES][0][im][chat_id]")             chat = data.get("data[MESSAGES][0][chat][id]")             status_data = {                 "CONNECTOR": connector_code,                 "LINE": line_id,                 "MESSAGES": [                     {                         "im": {                             "chat_id": chat_id,                             "message_id": message_id,                         },                     },                 ],             }              call_method(appinstance, "imconnector.send.status.delivery", status_data)              # Проверяем наличие сообщения в редис (отправлено из других сервисов )             if redis_client.exists(f'bitrix:{domain}:{message_id}'):                 return Response({'message': 'loop message'})              file_type = data.get("data[MESSAGES][0][message][files][0][type]", None)             text = data.get("data[MESSAGES][0][message][text]", None)             if text:                 text = re.sub(r"\[(?!(br|\n))[^\]]+\]", "", text)                 text = text.replace("[br]", "\n")              files = []             print(file_type)             if file_type:                 files = extract_files(data)              if connector.service == "waweb":                 try:                     line = Line.objects.get(line_id=line_id, app_instance=appinstance)                     wa = Session.objects.get(line=line)                     if files:                         for file in files:                             waweb_tasks.send_message_task(str(wa.session), [chat], file, 'media')                     resp = waweb.send_message(wa.session, chat, text)                     if resp.status_code == 201:                         waweb.store_msg(resp)                 except Exception as e:                     print(f'Failed to send waweb message: {str(e)}')                     return Response({'error': f'Failed to send message: {str(e)}'})              if connector.service == "telegram":                 try:                     # Получаем линию                     line = Line.objects.get(line_id=line_id, app_instance=appinstance)                     # Получаем бота                     bot = TelegramBot.objects.get(line=line)                      # Сначала отправляем медиафайлы, если есть                     if files:                         for file in files:                             telegram_tasks.send_telegram_message_task(                                 bot_token=bot.bot_token,                                 recipient=chat,                                 content=file,                                 cont_type="media"                             )                      # Отправляем текстовое сообщение                     if text:                         telegram_tasks.send_telegram_message_task(                             bot_token=bot.bot_token,                             recipient=chat,                             content=text,                             cont_type="string"                         )                  except Exception as e:                     print(f'Failed to send Telegram message: {str(e)}')                     return Response({'error': f'Failed to send message: {str(e)}'})              return Response(                 {"status": "ONIMCONNECTORMESSAGEADD event processed"},                 status=status.HTTP_200_OK,             )          elif event == "ONIMCONNECTORSTATUSDELETE":             line_id = data.get("data[line]")             connector_code = data.get("data[connector]")             connector = get_object_or_404(Connector, code=connector_code)             if not connector:                 return Response({'Connector not found'})             try:                 line = Line.objects.get(line_id=line_id, app_instance=appinstance)                  if connector.service == "waweb":                     phone = line.wawebs.first()                     if phone:                         phone.line = None                         phone.save()                  return Response("Line disconnected")              except Line.DoesNotExist:                 return Response(                     {"status": "Line not found"},                     status=status.HTTP_200_OK,                 )           elif event == "ONIMCONNECTORLINEDELETE":             line_id = data.get("data")             try:                 line = Line.objects.filter(line_id=line_id, app_instance=appinstance).first()                 if line:                     line.delete()                 return Response({"status": "Line deleted"}, status=status.HTTP_200_OK)             except Line.DoesNotExist:                 return Response(                     {"status": "Line not found"}, status=status.HTTP_200_OK                 )          elif event == "ONAPPUNINSTALL":             portal = appinstance.portal             appinstance.delete()             if not AppInstance.objects.filter(portal=portal).exists():                 # portal.delete()                 return Response(f"{appinstance} and associated portal deleted")             else:                 return Response(f"{appinstance} deleted")          else:             return Response('Unsupported event')      except Exception as e:         logger.error(f"Error occurred: {e!s}")         return Response(             {"error": "Internal server error"},             status=status.HTTP_500_INTERNAL_SERVER_ERROR,         ) 

Для асинхронных запросов создадим tasks.py:

import redis from celery import shared_task import logging from django.core.exceptions import ObjectDoesNotExist from django.conf import settings  from .crest import call_method from .models import AppInstance   logger = logging.getLogger("django")  redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)  FROM_MARKET_FIELD = settings.FROM_MARKET_FIELD  @shared_task(bind=True, max_retries=5, default_retry_delay=5) def call_api(self, id, method, payload):     try:         appinstance = AppInstance.objects.get(id=id)         return call_method(appinstance, method, payload)     except (ObjectDoesNotExist, Exception) as exc:         raise self.retry(exc=exc)   @shared_task def get_app_info():     app_instances = AppInstance.objects.all()     for app_instance in app_instances:         if app_instance.attempts < settings.BITRIX_CHECK_APP_ATTEMTS:             call_api(app_instance.id, "app.info", {})   @shared_task(bind=True, max_retries=5, default_retry_delay=5) def send_messages(self, app_instance_id, user_phone, text, connector,                   line, sms=False, pushName=None,                   message_id=None, attachments=None, profilepic_url=None,                   chat_id=None, chat_url=None, user_id=None):     init_message = "System: initiation message."     try:         if not app_instance_id:             raise ValueError("app_instance_id is required and cannot be None or empty")         app_instance = AppInstance.objects.get(id=app_instance_id)         print(pushName)         bitrix_msg = {             "CONNECTOR": connector,             "LINE": line,             "MESSAGES": [                 {                     "user": {                         "phone": user_phone,                         "name": pushName or user_phone,                         "id": user_id or user_phone,                         "skip_phone_validate": 'Y',                         # "picture": {                         #     "url": profilepic_url                         # }                     },                     "chat": {                         "id": chat_id or user_phone,                         "url": chat_url                     },                     "message": {                         "text": text,                         "id": message_id,                         "files": attachments or []                     }                 }             ],         }          resp = call_method(app_instance, "imconnector.send.messages", bitrix_msg)          result = resp.get("result", {})         results = result.get("DATA", {}).get("RESULT", [])         for result_item in results:             chat_session = result_item.get("session", {})             if chat_session:                 domain = app_instance.portal.domain                 chat_id = chat_session.get("CHAT_ID")                 identity = user_id or user_phone                 redis_client.set(f"bitrix_chat:{domain}:{line}:{identity}", chat_id)         return resp      except Exception as e:         raise e   @shared_task(bind=True, max_retries=5, default_retry_delay=5) def message_add(self, app_instance_id, line_id, user_phone, text, connector):     try:         app_instance = AppInstance.objects.get(id=app_instance_id)     except AppInstance.DoesNotExist:         logger.error(f"AppInstance {app_instance_id} does not exist")         raise      domain = app_instance.portal.domain     chat_key = f'bitrix_chat:{domain}:{line_id}:{user_phone}'      if redis_client.exists(chat_key):         chat_id = redis_client.get(chat_key).decode('utf-8')         payload = {             "DIALOG_ID": f"chat{chat_id}",             "MESSAGE": text,         }          max_send_attempts = 3          for attempt in range(max_send_attempts):             try:                 resp = call_method(app_instance, "im.message.add", payload)                 message_id = resp.get("result")                 print(message_id)                 redis_client.setex(f'bitrix:{domain}:{message_id}', 600, message_id)                 payload_status = {                     "CONNECTOR": connector,                     "LINE": line_id,                     "MESSAGES": [{                         "im": {                             "chat_id": str(chat_id),                             "message_id": str(message_id)                         }                     }]                 }                 delivery_status_resp = call_method(app_instance, "imconnector.send.status.delivery", payload_status)                 print(delivery_status_resp)                 return resp             except Exception as e:                 if attempt >= max_send_attempts - 1:                     logger.error(f"Exception occurred while sending message: {e}")                     raise                 else:                     self.retry(exc=e)      send_messages(app_instance_id, user_phone, text, connector, line_id, True)   @shared_task def create_deal(app_instance_id, vendor_inst_id, app_name):     app_instance = AppInstance.objects.get(id=app_instance_id)     try:         user_current = call_method(app_instance, "user.current", {})         user_data = user_current.get("result", {})         user_email = user_data.get("EMAIL")     except Exception as e:         return     if not user_email:         return     user_id = None     venrot_instance = AppInstance.objects.get(id=vendor_inst_id)     # Поиск контакта в битрикс      payload = {         "FILTER": {             "EMAIL": user_email         },         "select": [FROM_MARKET_FIELD]     }     client_data = call_method(venrot_instance, "crm.contact.list", payload)     if "result" in client_data:         client_data = client_data.get("result", [])         if client_data:             from_market = client_data[0].get(FROM_MARKET_FIELD)             if from_market == "1":                 return             user_id = client_data[0].get("ID")     if not user_id:                 contact_data = {             "fields": {                 "NAME": user_data.get("NAME"),                 "LAST_NAME": user_data.get("LAST_NAME"),                 FROM_MARKET_FIELD: "1",                 "EMAIL": [                     {                         "VALUE": user_email,                         "VALUE_TYPE": "WORK"                     }                 ],                 "PHONE": [                     {                         "VALUE": user_data.get("WORK_PHONE"),                         "VALUE_TYPE": "WORK"                     },                     {                         "VALUE": user_data.get("PERSONAL_MOBILE"),                         "VALUE_TYPE": "MOBILE"                     }                 ]             }         }          create_contact = call_method(venrot_instance, "crm.contact.add", contact_data)         if "result" in create_contact:             user_id = create_contact.get("result")     if user_id:         deal_data = {             "fields": {                 "TITLE": f"Установка приложения: {app_name}",                 "CONTACT_IDS": [user_id],                 "OPENED": "N",             }         }         call_method(venrot_instance, "crm.deal.add", deal_data)

Рассмотрим асинхронные функции более детально:

call_api — функция для безопасного вызова API Bitrix24. Находит наш AppInstance по id и передает запрос в call_method.

get_app_info — периодическая задача для проверки статуса приложения. Выполняется через Celery Beat для всех экземпляров приложений и позволяет следить за актуальностью токенов.

send_messages — отвечает за отправку сообщений в открытые линии. Формирует правильную структуру сообщений, пришедших из внешних сервисов.

message_add — публикация сообщений в чате Битрикс. Формирует корректный payload и вызывает метод im.message.add .

create_deal — финальная стадия, автоматическое создание лида в CRM для нового чата.

Вызов api Bitrix осуществляется в crest.py. Благодаря этой функции, нам не нужно каждый раз прописывать эндпоинт и параметры запроса к Битрикс, проверять активность токена и обновлять его, а нужно лишь передать соответствующие данные и метод. Передаю привет принципам DRY 🙂

from urllib.parse import urlparse import logging  import requests from django.db import transaction from django.conf import settings from waweb.tasks import send_message_task from users.models import Message  from .models import AppInstance  logger = logging.getLogger("django")   def call_method(appinstance: AppInstance, b24_method: str, data: dict, attempted_refresh=False, verify=True):     portal = appinstance.portal     endpoint = f"{portal.protocol}://{portal.domain}/rest/"     access_token = appinstance.access_token     print(b24_method)      payload = {"auth": access_token, **data}     print(payload)     try:         response = requests.post(f"{endpoint}{b24_method}", json=payload,                                 allow_redirects=False, timeout=60, verify=verify)         print(response.json())         appinstance.status = response.status_code     except requests.exceptions.SSLError:         if verify:             return call_method(appinstance, b24_method, data, attempted_refresh, verify=False)         else:             raise      if response.status_code == 302 and not attempted_refresh:         new_url = response.headers['Location']         parsed_url = urlparse(new_url)         portal = appinstance.portal         domain = parsed_url.netloc         if portal.domain != domain:             portal.domain = domain             portal.save()         appinstance.attempts = 0         appinstance.save()         return call_method(appinstance, b24_method, data, attempted_refresh=True)      elif response.status_code == 200:         appinstance.attempts = 0         appinstance.save()         return response.json()      else:         appinstance.attempts += 1         appinstance.save()         if response.status_code == 401:             resp = response.json()             error = resp.get("error", "")             error_description = resp.get("error_description", "")             if "REST is available only on commercial plans" in error_description and not appinstance.portal.license_expired:                 appinstance.portal.license_expired = True                 appinstance.portal.save()                 waweb_id = settings.WAWEB_SYTEM_ID                 if waweb_id and appinstance.owner.phone_number:                     try:                         notification = Message.objects.get(code="b24_expired")                         send_message_task(waweb_id, [str(appinstance.owner.phone_number)], notification.message)                     except Message.DoesNotExist:                         pass                 raise Exception("b24 license expired")                          if error == "expired_token" or error == "NO_AUTH_FOUND" and not attempted_refresh:                 refreshed = refresh_token(appinstance)                 if isinstance(refreshed, AppInstance):                     return call_method(appinstance, b24_method, data, attempted_refresh=True)                 else:                     raise Exception(f"Token refresh failed for portal {appinstance.portal.domain}")             else:                 raise Exception(f"Unauthorized error: {response.json()}")          raise Exception(f"Failed to call bitrix: {appinstance.portal.domain} "                         f"status {response.status_code}, response: {response.json()}")   def refresh_token(appinstance: AppInstance):     payload = {         "grant_type": "refresh_token",         "client_id": appinstance.app.client_id,         "client_secret": appinstance.app.client_secret,         "refresh_token": appinstance.refresh_token,     }     response = requests.post("https://oauth.bitrix.info/oauth/token/", data=payload, timeout=60)     print(appinstance.refresh_token)     print(appinstance.access_token)     try:         response_data = response.json()     except Exception:         raise Exception(f"Invalid response while refreshing token for portal {appinstance.portal.domain}")      if response.status_code != 200:         raise Exception(f"Failed to refresh token: {appinstance.portal.domain} {response_data}")      appinstance.access_token = response_data["access_token"]     appinstance.refresh_token = response_data["refresh_token"]     with transaction.atomic():         appinstance.save(update_fields=["access_token", "refresh_token"])     return appinstance 

Создадим вебхуки в views.py (создайте страницу успешной установки install_finish.html)

import uuid import requests from datetime import timedelta  from django.conf import settings from django.contrib import messages from django.shortcuts import render, redirect from django.utils import timezone from django.http import HttpResponse, HttpResponseForbidden from django.views.decorators.csrf import csrf_exempt from rest_framework.authtoken.models import Token  from .crest import call_method from .utils import process_placement from .models import AppInstance, Bitrix, Line, App  from django.contrib.auth import get_user_model, login, logout User = get_user_model()  @csrf_exempt def app_install(request):     if request.method == "HEAD":         return HttpResponse("ok")      app_id = request.GET.get("app-id")     protocol = request.GET.get("PROTOCOL")     domain = request.GET.get("DOMAIN")     data = request.POST      member_id = data.get("member_id")     auth_id = data.get("AUTH_ID")      if not app_id or not member_id or not domain or not auth_id:         return redirect("portals")      try:         app = App.objects.get(id=app_id)     except App.DoesNotExist:         return redirect("portals")      proto = "https" if protocol == "1" else "http"     owner = get_owner(request)     api_key, _ = Token.objects.get_or_create(user=app.owner)      payload = {         "event": "ONAPPINSTALL",         "HANDLER": f"https://{app.site}/api/bitrix/?api-key={api_key.key}&app-id={app_id}",         "auth": auth_id,     }      try:         response = requests.post(f"{proto}://{domain}/rest/event.bind", json=payload, timeout=60)         response.raise_for_status()     except requests.RequestException as e:         resp = response.json()         error_description = resp.get("error_description")         if "Handler already binded" in error_description:             return render(request, "install_finish.html")         else:             return HttpResponse(f"Bitrix event.bind failed {response.status_code, resp}")      return render(request, "install_finish.html")   @csrf_exempt def app_settings(request):     if request.method == "POST":         try:             app_id = request.GET.get("app-id")             data = request.POST             domain = request.GET.get("DOMAIN")             member_id = data.get("member_id")             portal = Bitrix.objects.get(domain=domain, member_id=member_id)         except Exception as e:             return redirect("portals")          placement = data.get("PLACEMENT")         if placement == "SETTING_CONNECTOR":             return process_placement(request)          elif placement == "DEFAULT":             try:                 app = App.objects.get(id=app_id)             except Exception:                 return redirect("portals")             app_url = app.page_url             owner = get_owner(request)              if owner is None:                 logout(request)                 return redirect(app_url)              should_login = not request.user.is_authenticated or request.user != owner             if should_login:                 if request.user.is_authenticated:                     logout(request)                 try:                     login(request, owner, backend='django.contrib.auth.backends.ModelBackend')                 except Exception:                     return redirect(app_url)              AppInstance.objects.filter(portal=portal, owner__isnull=True).update(owner=owner)             Line.objects.filter(portal=portal, owner__isnull=True).update(owner=owner)             return redirect(f"{app_url}?domain={domain}")         else:             return redirect("portals")     elif request.method == "HEAD":         return HttpResponse("ok")     elif request.method == "GET":         return redirect("portals")

А также эндпоинты в urls.py (не забудьте подключить в urls в корне проекта):

from django.urls import path  from .views import app_settings, app_install  urlpatterns = [     path("app-settings/", app_settings, name="app_settings"),     path("app-install/", app_install, name="app_install"), ]

На этом основной функционал модуля Битрикс завершен. Функционал WhatsApp коннектора мы рассмотрим во второй части.


ссылка на оригинал статьи https://habr.com/ru/articles/943596/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *