Привет, мир! Меня зовут Павел, я IT инженер и руководитель службы технической поддержки.
Работая в формате крупного IT-аутсорсинга, мы в компании столкнулись с проблемой: использование общего WhatsApp/Telegram Web, подключённого на компьютерах сотрудников поддержки, оказалось неэффективным. Такой подход не позволял контролировать качество диалогов, а также затруднял перевод обращений клиентов в структурированные тикеты, вследствие чего была начата разработка коннектора к нашему корпоративному порталу Bitrix24.
В данной статье я бы хотел поделиться основами технической реализации и ключевым функционалом коннектора. Детали реализации — например, верстку html страниц, тесты, кастомизацию панели администратора и так далее — оставляю на ваше усмотрение.
Технологический стек
Основу проекта составляют Python и Django, что обусловлено их простотой, гибкостью и широким набором готовых решений. Такой выбор дал возможность быстро реализовать минимально жизнеспособный продукт (MVP) и заложить фундамент для дальнейшего масштабирования.
Также вам понадобится установить redis, celery и настроить их.
Основной функционал Bitrix
Опустим установку Django и подготовку виртуального окружения и приступим к реализации интеграции с Битриксом. Для этого создадим Django приложение bitrix. Начнем с создания моделей в models.py:
import uuid from django.conf import settings from django.contrib.sites.models import Site from django.db import models
Модель для хранения данных о портале:
class Bitrix(models.Model): PROTOCOL_CHOICES = [ ('http', 'HTTP'), ('https', 'HTTPS'), ] protocol = models.CharField(max_length=5, choices=PROTOCOL_CHOICES, default='https') domain = models.CharField(max_length=255) owner = models.ForeignKey( User, on_delete=models.SET_NULL, blank=True, null=True ) user_id = models.CharField(max_length=255, blank=True, null=True) member_id = models.CharField(max_length=255, unique=True, blank=True, null=True) license_expired = models.BooleanField(default=False) def __str__(self): return self.domain
Модель коннектора:
class Connector(models.Model): TYPE_CHOICES = [ ('telegram', 'Telegram Bot'), ('waweb', 'WhatsApp Web'), ] code = models.CharField(max_length=255, default=uuid.uuid4(), unique=True) service = models.CharField(max_length=255, choices=TYPE_CHOICES, blank=True, null=True) name = models.CharField(max_length=255, default="itsource.kg", unique=False) icon = models.FileField(upload_to='connector_icons/', blank=True, null=True, default='connector_icons/cloud-rain-alt.svg') # Заменить на вашу svg иконку def __str__(self): return self.name
Конфигурация приложения Битрикс:
class App(models.Model): id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) owner = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True ) site = models.ForeignKey( Site, on_delete=models.SET_NULL, related_name="apps", blank=True, null=True ) name = models.CharField(max_length=255, blank=True, unique=False) page_url = models.CharField(max_length=255, blank=True, default="/") connectors = models.ManyToManyField(Connector, blank=True, related_name='apps') asterisk = models.BooleanField(default=False, help_text="Chek for Asterisk connector") client_id = models.CharField(max_length=255, blank=True, unique=False) client_secret = models.CharField(max_length=255, blank=True) def __str__(self): return self.name
Модель для хранения сущностей локальных приложений и открытых линий:
class AppInstance(models.Model): id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) owner = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True ) app = models.ForeignKey(App, on_delete=models.SET_NULL, related_name="installations", blank=True, null=True) portal = models.ForeignKey( Bitrix, on_delete=models.CASCADE, related_name="installations", blank=True, null=True ) auth_status = models.CharField(max_length=1) application_token = models.CharField(max_length=255, blank=True) storage_id = models.CharField(max_length=255, blank=True) status = models.IntegerField(default=0, blank=True) attempts = models.IntegerField(default=0, blank=True) access_token = models.CharField(max_length=255, blank=True, null=True, editable=False) refresh_token = models.CharField(max_length=255, blank=True, null=True, editable=False) def __str__(self): return f"{self.app.name} on {self.portal.domain}" class Line(models.Model): line_id = models.CharField(max_length=50) owner = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True ) app_instance = models.ForeignKey(AppInstance, on_delete=models.CASCADE, related_name="lines", null=True) connector = models.ForeignKey(Connector, on_delete=models.SET_NULL, related_name="lines", null=True) portal = models.ForeignKey(Bitrix, on_delete=models.CASCADE, related_name="lines", blank=True, null=True) def __str__(self): return f"Line {self.line_id}"
Я приверженец модульной архитектуры, поэтому создадим отдельный модуль api внутри приложения bitrix для реализации api вебхуков
api/serializers.py
from rest_framework import serializers from bitrix.models import Bitrix class PortalSerializer(serializers.ModelSerializer): class Meta: model = Bitrix fields = [ "owner", "user_id", "domain", ] def create(self, validated_data): return Bitrix.objects.create(**validated_data)
api/views.py
from rest_framework.mixins import CreateModelMixin from rest_framework.renderers import JSONRenderer from rest_framework.viewsets import GenericViewSet from rest_framework.response import Response from bitrix.models import Bitrix from bitrix.utils import event_processor from .serializers import PortalSerializer class PortalViewSet(CreateModelMixin, GenericViewSet): queryset = Bitrix.objects.all() serializer_class = PortalSerializer def create(self, request, *args, **kwargs): print("create func") return event_processor(request) def head(self, request, *args, **kwargs): print("head func") return Response(headers={'Allow': 'POST, HEAD'})
В settings.py в приложении с ядром проекта включим авторизацию по токенам
REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES": ( "rest_framework.authentication.SessionAuthentication", "rest_framework.authentication.TokenAuthentication", "core.qpta.QueryParamTokenAuthentication", ), "DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",), 'DEFAULT_RENDERER_CLASSES': ('rest_framework.renderers.JSONRenderer',) }
Там же создадим qpta.py:
from rest_framework.authentication import TokenAuthentication class QueryParamTokenAuthentication(TokenAuthentication): def authenticate(self, request): # Try to get the token from the URL query parameter token = request.query_params.get("api-key") if not token: # Fall back to default token authentication return super().authenticate(request) # Authenticate the token manually user, token = self.authenticate_credentials(token) return (user, token)
И api_router.py:
from django.conf import settings from rest_framework.routers import DefaultRouter from rest_framework.routers import SimpleRouter from bitrix.api.views import PortalViewSet from users.api.views import UserViewSet from waweb.api.views import EventsHandler from telegram.api.views import TelegramEventsHandler router = DefaultRouter() if settings.DEBUG else SimpleRouter() router.register("users", UserViewSet) router.register("bitrix", PortalViewSet) router.register("waweb", EventsHandler, basename="waevents") router.register("telegram", TelegramEventsHandler, basename="tgevents") app_name = "api" urlpatterns = router.urls
В urls.py добавим:
urlpatterns += [ path("api/", include("core.api_router")), # core заменить на название приложения где лежит ядро проекта ]
Создадим файл utils.py с основными функциями:
import base64 import json import logging import re import redis import requests from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.contrib import messages from django.conf import settings from django.shortcuts import redirect, get_object_or_404 from rest_framework import status from rest_framework.authtoken.models import Token from rest_framework.response import Response from django.http import HttpResponse from waweb.models import Session import waweb.utils as waweb import waweb.tasks as waweb_tasks import telegram.tasks as telegram_tasks from .crest import call_method from .models import App, AppInstance, Bitrix, Line, Connector import bitrix.tasks as bitrix_tasks from telegram.models import TelegramBot redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0) logger = logging.getLogger("django") GENERAL_EVENTS = [ "ONAPPUNINSTALL", ] CONNECTOR_EVENTS = [ "ONIMCONNECTORMESSAGEADD", "ONIMCONNECTORLINEDELETE", "ONIMCONNECTORSTATUSDELETE", ]
Функция для подключения открытой линии разделяется на 2 логических блока: создание и подключение уже существующей линии. Если id линии начинается с create, то создаем новую открытую линию и AppInstance. Далее отправляем в Битрикс конфиги открытой линии (imopenlines.config.add), проверяем AppInstance на наличие подключенной открытой линии и деактивируем ее, если таковая имеется. Финальным шагом активируем нашу открытую линию в Битриксе методом imconnector.activate и возвращаем редирект на страницу завершения подключения:
def connect_line(request, line_id, entity, connector, redirect_to): line_id = str(line_id) if line_id.startswith("create__"): instance_id = line_id.split("__")[1] app_instance = get_object_or_404(AppInstance, id=instance_id, owner=request.user) if not app_instance.portal: messages.error(request, "Невозможно создать линию: портал не найден") return redirect(redirect_to) if entity.line: call_method(app_instance, "imconnector.activate", { "CONNECTOR": connector.code, "LINE": entity.line.line_id, "ACTIVE": 0, }) if connector.service == "telegram": line_name = entity.bot_username else: line_name = entity.phone create_payload = {"PARAMS": {"LINE_NAME": line_name}} result = call_method(app_instance, "imopenlines.config.add", create_payload) if result and result.get("result"): new_line_id = result["result"] line = Line.objects.create( line_id=new_line_id, portal=app_instance.portal, connector=connector, app_instance=app_instance, owner=request.user ) entity.line = line entity.app_instance = app_instance entity.save() activate_payload = { "CONNECTOR": connector.code, "LINE": new_line_id, "ACTIVE": 1, } call_method(app_instance, "imconnector.activate", activate_payload) messages.success(request, f"Создана и подключена линия {new_line_id}") else: messages.error(request, f"Ошибка при создании линии: {result}") return redirect(redirect_to) else: line = get_object_or_404(Line, id=line_id) if not line: messages.error(request, f"Линия {line_id} не найдена") return redirect(redirect_to) entity_model = type(entity) usage_count = entity_model.objects.filter(line=line).exclude(pk=entity.pk).count() if usage_count > 0: messages.error(request, "Эта линия уже используется.") return redirect(redirect_to) app_instance = line.app_instance if entity.line == line: messages.success(request, "Выбрана та же линия") return redirect(redirect_to) if entity.line: call_method(app_instance, "imconnector.activate", { "CONNECTOR": connector.code, "LINE": entity.line.line_id, "ACTIVE": 0, }) response = call_method(app_instance, "imconnector.activate", { "CONNECTOR": connector.code, "LINE": line.line_id, "ACTIVE": 1, }) if response.get("result"): entity.line = line entity.app_instance = app_instance entity.save() messages.success(request, "Линия подключена") messages.success(request, "Настройки обновлены") return redirect(redirect_to)
Подписка на события Битрикс представляет из себя всего 1 запрос на эндпоинт event.bind, в котором мы сообщаем адрес нашего вебхука, принимающего события:
def events_bind(events: dict, appinstance: AppInstance, api_key: str): url = appinstance.app.site for event in events: payload = { "event": event, "HANDLER": f"https://{url}/api/bitrix/?api-key={api_key}", } try: return call_method(appinstance, "event.bind", payload) except (ObjectDoesNotExist, Exception) as exc: print(exc, " in events_bind") return None
Регистрация коннектора включает в себя 2 функции: регистрация коннектора (register_connector) и обработка его установки (process_placement).
Функция регистрации коннектора добавляет его в меню битрикса CRM — Контакт-Центр. Здесь мы декодируем нашу svg иконку в base64 и вызываем метод imconnector.register с такими параметрами, как uuid коннектора, название коннектора и его иконка в base64. После вызова этой функции, карточка коннектора появится в Контакт-Центре.
Что касается обработчика установки, его следует воспринимать как входящий вебхук от битрикса. Здесь мы получаем параметры коннектора и открытой линии, uuid приложения и адрес портала Битрикс. Если данные корректны, привязываем Открытую Линию к сущности локального приложения коннектора и пользователю и передаем HANDLER для событий, связанных с сообщениями:
def register_connector(appinstance: AppInstance, api_key: str, connector): url = appinstance.app.site if not connector.icon: return None try: with open(connector.icon.path, "rb") as file: image_data = file.read() encoded_image = base64.b64encode(image_data).decode("utf-8") connector_logo = f"data:image/svg+xml;base64,{encoded_image}" payload = { "ID": connector.code, "NAME": connector.name, "ICON": { "DATA_IMAGE": connector_logo, }, "PLACEMENT_HANDLER": f"https://{url}/app-settings/?inst={appinstance.id}", } try: return call_method(appinstance, "imconnector.register", payload) except (ObjectDoesNotExist, Exception) as exc: print(exc, " in register_connector") return None events_bind(CONNECTOR_EVENTS, appinstance, api_key) except FileNotFoundError: return None except Exception as e: return None def process_placement(request): try: data = request.POST placement_options = data.get("PLACEMENT_OPTIONS") instance_id = request.GET.get("inst") domain = request.GET.get("DOMAIN") print("process placement GET ", request.GET) print("FULL URL:", request.build_absolute_uri()) placement_options = json.loads(placement_options) line_id = placement_options.get("LINE") connector_code = placement_options.get("CONNECTOR") app_instance = AppInstance.objects.filter(id=instance_id).first() if not app_instance: return HttpResponse("app not found") portal = Bitrix.objects.filter(domain=domain).first() if not portal: return HttpResponse("bitrix not found") connector = Connector.objects.filter(code=connector_code).first() if not connector: return HttpResponse("connector not found") line, created = Line.objects.get_or_create( line_id=line_id, portal=portal, connector=connector, app_instance=app_instance, owner=app_instance.owner ) activate_payload = { "CONNECTOR": connector.code, "LINE": line_id, "ACTIVE": 1, } print("activate_payload= ", activate_payload) activate_result = call_method(app_instance, "imconnector.activate", activate_payload) if not activate_result or not activate_result.get("result"): return HttpResponse(f"Failed to activate connector: {activate_result}") # api_key = request.GET.get("api-key") api_key = Token.objects.get(user=app_instance.owner).key print("request data= ", request.GET) print("api_key=", api_key) print("app_instance=", app_instance) events = ["ONIMCONNECTORMESSAGEADD", "ONIMCONNECTORSTATUSDELETE", "ONIMCONNECTORLINEDELETE"] for event in events: event_payload = { "event": event, "HANDLER": f"https://{app_instance.app.site}/api/bitrix/?api-key={api_key}", } call_method(app_instance, "event.bind", event_payload) return HttpResponse( f"Линия успешно создана и активирована. Настройте доп. параметры в Bitrix: https://{app_instance.app.site}/portals/" ) except Exception as e: logger.error(f"Unexpected error: {e}") return HttpResponse({"An unexpected error occurred"})
Обработка файлов и событий. extract_files и upload_file описывают логику работы коннектора с файлами. Ничего сверхъестественного, поэтому предлагаю не задерживаться на этом и перейдем к event_processor — функции, обрабатывающей все входящие события от Битрикса.
def extract_files(data): files = [] i = 0 while True: name_key = f"data[MESSAGES][0][message][files][{i}][name]" link_key = f"data[MESSAGES][0][message][files][{i}][link]" type_key = f"data[MESSAGES][0][message][files][{i}][type]" if name_key in data and link_key in data: files.append( { "name": data.get(name_key), "link": data.get(link_key), "type": data.get(type_key), }, ) i += 1 else: break return files def upload_file(appinstance, storage_id, fileContent, filename): payload = { "id": storage_id, "fileContent": fileContent, "data": {"NAME": filename}, "generateUniqueName": True, } upload_to_bitrix = call_method(appinstance, "disk.folder.uploadfile", payload) if "result" in upload_to_bitrix: return upload_to_bitrix["result"] else: return None
event_processor — это единый обработчик всех событий, поступающих от Bitrix24 по REST-хукам. Он принимает входящие запросы, извлекает из них данные (события, токены, параметры портала) и в зависимости от типа события выполняет соответствующую бизнес-логику:
-
регистрация приложения и портала при установке,
-
обработка новых сообщений от пользователей,
-
отключение или удаление линии,
-
корректное завершение работы при удалении приложения.
Рассмотрим каждое событие более детально.
Обработка установки приложения (ONAPPINSTALL)
Когда пользователь впервые устанавливает приложение в свой портал Bitrix24, прилетает событие ONAPPINSTALL.
Здесь функция:
-
Проверяет, существует ли экземпляр приложения (AppInstance).
-
Если нет — создаёт новый портал (Bitrix) и привязывает к нему приложение.
-
Сохраняет выданные Bitrix24 токены (access_token, refresh_token, application_token).
-
Регистрирует события (event.bind) и подключает доступные коннекторы (например, WhatsApp или Telegram).
-
При необходимости создаёт хранилище в Bitrix Диске (disk.storage.getforapp).
-
Возвращает сообщение об успешной установке.
Таким образом, уже на этапе установки приложение готово к полноценной работе.
Обработка входящих сообщений (ONIMCONNECTORMESSAGEADD)
Одно из самых частых событий — это новое сообщение от клиента, пришедшее в открытую линию.
Алгоритм работы:
-
Определяется, какой коннектор сработал (WhatsApp Web или Telegram).
-
Извлекаются данные о линии (LINE), сообщении (MESSAGE_ID, CHAT_ID) и его содержимом.
-
Bitrix получает подтверждение о доставке через метод imconnector.send.status.delivery.
-
Система проверяет, не было ли это сообщение отправлено самой интеграцией (защита от «петли» через Redis).
-
Если есть вложения — они извлекаются.
-
В зависимости от коннектора сообщение пробрасывается в WhatsApp или Telegram:
-
для WhatsApp — через waweb-сессию,
-
для Telegram — через соответствующего бота.
-
-
Если отправка успешна — сообщение сохраняется в базе.
Таким образом, коннектор становится полноценным «мостом» между Bitrix24 и внешним мессенджером.
Отключение линии (ONIMCONNECTORSTATUSDELETE)
Когда в Bitrix24 отключается линия, событие ONIMCONNECTORSTATUSDELETE уведомляет приложение о том, что коннектор больше неактивен.
Здесь происходит:
-
поиск линии по ID,
-
отвязка связанного ресурса (например, телефона в WhatsApp),
-
возврат ответа о том, что линия корректно отключена.
Удаление линии (ONIMCONNECTORLINEDELETE)
Если линия полностью удаляется в Bitrix24, приложение должно синхронизировать изменения.
Обработчик:
-
находит линию в базе,
-
удаляет её,
-
возвращает статус операции.
Удаление приложения (ONAPPUNINSTALL)
При удалении приложения из Bitrix24 (ONAPPUNINSTALL):
-
Удаляется AppInstance (экземпляр приложения для конкретного портала).
-
Если у портала больше нет ни одного экземпляра приложения — опционально удаляется и сам портал.
def event_processor(request): try: data = request.data event = data.get("event") domain = data.get("auth[domain]") user_id = data.get("auth[user_id]") auth_status = data.get("auth[status]") access_token = data.get("auth[access_token]") refresh_token = data.get("auth[refresh_token]") application_token = data.get("auth[application_token]") member_id = data.get("auth[member_id]") api_key = request.query_params.get("api-key") app_id = request.query_params.get("app-id") print(data) try: if not api_key: raise ValueError("API key is required") appinstance = AppInstance.objects.get(application_token=application_token) appinstance.access_token = access_token appinstance.refresh_token = refresh_token appinstance.save() if not appinstance.portal.member_id: appinstance.portal.member_id = member_id appinstance.portal.save() except AppInstance.DoesNotExist: if event == "ONAPPINSTALL": try: app = App.objects.get(id=app_id) except App.DoesNotExist: return Response({"message": "App not found."}) portal, created = Bitrix.objects.get_or_create( member_id=member_id, defaults={ "domain": domain, "user_id": user_id, "owner": request.user if auth_status == "L" else None, } ) appinstance_owner = ( portal.owner if portal.owner else (request.user if auth_status == "L" else None) ) appinstance, created = AppInstance.objects.update_or_create( app=app, portal=portal, owner=appinstance_owner, defaults={ "auth_status": auth_status, "access_token": access_token, "refresh_token": refresh_token, "application_token": application_token, } ) storage_data = call_method(appinstance, "disk.storage.getforapp", {}) if "result" in storage_data: storage_id = storage_data["result"]["ID"] appinstance.storage_id = storage_id appinstance.save() # Регистрация коннектора/ подписка на события def register_events_and_connectors(): events_bind(GENERAL_EVENTS, appinstance, api_key) if app.connectors.exists(): for connector in app.connectors.all(): register_connector(appinstance, api_key, connector) transaction.on_commit(register_events_and_connectors) if VENDOR_BITRIX_INSTANCE: bitrix_tasks.create_deal(appinstance.id, VENDOR_BITRIX_INSTANCE, app.name) if portal.owner: return Response('App successfully created and linked') return Response( {"message": "App and portal successfully created and linked."}, status=status.HTTP_201_CREATED, ) else: return Response({"message": "App not found and not an install event."}) # Обработка события ONIMCONNECTORMESSAGEADD if event == "ONIMCONNECTORMESSAGEADD": connector_code = data.get("data[CONNECTOR]") connector = get_object_or_404(Connector, code=connector_code) if not connector: return Response({'Connector not found'}) line_id = data.get("data[LINE]") message_id = data.get("data[MESSAGES][0][im][message_id]") chat_id = data.get("data[MESSAGES][0][im][chat_id]") chat = data.get("data[MESSAGES][0][chat][id]") status_data = { "CONNECTOR": connector_code, "LINE": line_id, "MESSAGES": [ { "im": { "chat_id": chat_id, "message_id": message_id, }, }, ], } call_method(appinstance, "imconnector.send.status.delivery", status_data) # Проверяем наличие сообщения в редис (отправлено из других сервисов ) if redis_client.exists(f'bitrix:{domain}:{message_id}'): return Response({'message': 'loop message'}) file_type = data.get("data[MESSAGES][0][message][files][0][type]", None) text = data.get("data[MESSAGES][0][message][text]", None) if text: text = re.sub(r"\[(?!(br|\n))[^\]]+\]", "", text) text = text.replace("[br]", "\n") files = [] print(file_type) if file_type: files = extract_files(data) if connector.service == "waweb": try: line = Line.objects.get(line_id=line_id, app_instance=appinstance) wa = Session.objects.get(line=line) if files: for file in files: waweb_tasks.send_message_task(str(wa.session), [chat], file, 'media') resp = waweb.send_message(wa.session, chat, text) if resp.status_code == 201: waweb.store_msg(resp) except Exception as e: print(f'Failed to send waweb message: {str(e)}') return Response({'error': f'Failed to send message: {str(e)}'}) if connector.service == "telegram": try: # Получаем линию line = Line.objects.get(line_id=line_id, app_instance=appinstance) # Получаем бота bot = TelegramBot.objects.get(line=line) # Сначала отправляем медиафайлы, если есть if files: for file in files: telegram_tasks.send_telegram_message_task( bot_token=bot.bot_token, recipient=chat, content=file, cont_type="media" ) # Отправляем текстовое сообщение if text: telegram_tasks.send_telegram_message_task( bot_token=bot.bot_token, recipient=chat, content=text, cont_type="string" ) except Exception as e: print(f'Failed to send Telegram message: {str(e)}') return Response({'error': f'Failed to send message: {str(e)}'}) return Response( {"status": "ONIMCONNECTORMESSAGEADD event processed"}, status=status.HTTP_200_OK, ) elif event == "ONIMCONNECTORSTATUSDELETE": line_id = data.get("data[line]") connector_code = data.get("data[connector]") connector = get_object_or_404(Connector, code=connector_code) if not connector: return Response({'Connector not found'}) try: line = Line.objects.get(line_id=line_id, app_instance=appinstance) if connector.service == "waweb": phone = line.wawebs.first() if phone: phone.line = None phone.save() return Response("Line disconnected") except Line.DoesNotExist: return Response( {"status": "Line not found"}, status=status.HTTP_200_OK, ) elif event == "ONIMCONNECTORLINEDELETE": line_id = data.get("data") try: line = Line.objects.filter(line_id=line_id, app_instance=appinstance).first() if line: line.delete() return Response({"status": "Line deleted"}, status=status.HTTP_200_OK) except Line.DoesNotExist: return Response( {"status": "Line not found"}, status=status.HTTP_200_OK ) elif event == "ONAPPUNINSTALL": portal = appinstance.portal appinstance.delete() if not AppInstance.objects.filter(portal=portal).exists(): # portal.delete() return Response(f"{appinstance} and associated portal deleted") else: return Response(f"{appinstance} deleted") else: return Response('Unsupported event') except Exception as e: logger.error(f"Error occurred: {e!s}") return Response( {"error": "Internal server error"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR, )
Для асинхронных запросов создадим tasks.py:
import redis from celery import shared_task import logging from django.core.exceptions import ObjectDoesNotExist from django.conf import settings from .crest import call_method from .models import AppInstance logger = logging.getLogger("django") redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0) FROM_MARKET_FIELD = settings.FROM_MARKET_FIELD @shared_task(bind=True, max_retries=5, default_retry_delay=5) def call_api(self, id, method, payload): try: appinstance = AppInstance.objects.get(id=id) return call_method(appinstance, method, payload) except (ObjectDoesNotExist, Exception) as exc: raise self.retry(exc=exc) @shared_task def get_app_info(): app_instances = AppInstance.objects.all() for app_instance in app_instances: if app_instance.attempts < settings.BITRIX_CHECK_APP_ATTEMTS: call_api(app_instance.id, "app.info", {}) @shared_task(bind=True, max_retries=5, default_retry_delay=5) def send_messages(self, app_instance_id, user_phone, text, connector, line, sms=False, pushName=None, message_id=None, attachments=None, profilepic_url=None, chat_id=None, chat_url=None, user_id=None): init_message = "System: initiation message." try: if not app_instance_id: raise ValueError("app_instance_id is required and cannot be None or empty") app_instance = AppInstance.objects.get(id=app_instance_id) print(pushName) bitrix_msg = { "CONNECTOR": connector, "LINE": line, "MESSAGES": [ { "user": { "phone": user_phone, "name": pushName or user_phone, "id": user_id or user_phone, "skip_phone_validate": 'Y', # "picture": { # "url": profilepic_url # } }, "chat": { "id": chat_id or user_phone, "url": chat_url }, "message": { "text": text, "id": message_id, "files": attachments or [] } } ], } resp = call_method(app_instance, "imconnector.send.messages", bitrix_msg) result = resp.get("result", {}) results = result.get("DATA", {}).get("RESULT", []) for result_item in results: chat_session = result_item.get("session", {}) if chat_session: domain = app_instance.portal.domain chat_id = chat_session.get("CHAT_ID") identity = user_id or user_phone redis_client.set(f"bitrix_chat:{domain}:{line}:{identity}", chat_id) return resp except Exception as e: raise e @shared_task(bind=True, max_retries=5, default_retry_delay=5) def message_add(self, app_instance_id, line_id, user_phone, text, connector): try: app_instance = AppInstance.objects.get(id=app_instance_id) except AppInstance.DoesNotExist: logger.error(f"AppInstance {app_instance_id} does not exist") raise domain = app_instance.portal.domain chat_key = f'bitrix_chat:{domain}:{line_id}:{user_phone}' if redis_client.exists(chat_key): chat_id = redis_client.get(chat_key).decode('utf-8') payload = { "DIALOG_ID": f"chat{chat_id}", "MESSAGE": text, } max_send_attempts = 3 for attempt in range(max_send_attempts): try: resp = call_method(app_instance, "im.message.add", payload) message_id = resp.get("result") print(message_id) redis_client.setex(f'bitrix:{domain}:{message_id}', 600, message_id) payload_status = { "CONNECTOR": connector, "LINE": line_id, "MESSAGES": [{ "im": { "chat_id": str(chat_id), "message_id": str(message_id) } }] } delivery_status_resp = call_method(app_instance, "imconnector.send.status.delivery", payload_status) print(delivery_status_resp) return resp except Exception as e: if attempt >= max_send_attempts - 1: logger.error(f"Exception occurred while sending message: {e}") raise else: self.retry(exc=e) send_messages(app_instance_id, user_phone, text, connector, line_id, True) @shared_task def create_deal(app_instance_id, vendor_inst_id, app_name): app_instance = AppInstance.objects.get(id=app_instance_id) try: user_current = call_method(app_instance, "user.current", {}) user_data = user_current.get("result", {}) user_email = user_data.get("EMAIL") except Exception as e: return if not user_email: return user_id = None venrot_instance = AppInstance.objects.get(id=vendor_inst_id) # Поиск контакта в битрикс payload = { "FILTER": { "EMAIL": user_email }, "select": [FROM_MARKET_FIELD] } client_data = call_method(venrot_instance, "crm.contact.list", payload) if "result" in client_data: client_data = client_data.get("result", []) if client_data: from_market = client_data[0].get(FROM_MARKET_FIELD) if from_market == "1": return user_id = client_data[0].get("ID") if not user_id: contact_data = { "fields": { "NAME": user_data.get("NAME"), "LAST_NAME": user_data.get("LAST_NAME"), FROM_MARKET_FIELD: "1", "EMAIL": [ { "VALUE": user_email, "VALUE_TYPE": "WORK" } ], "PHONE": [ { "VALUE": user_data.get("WORK_PHONE"), "VALUE_TYPE": "WORK" }, { "VALUE": user_data.get("PERSONAL_MOBILE"), "VALUE_TYPE": "MOBILE" } ] } } create_contact = call_method(venrot_instance, "crm.contact.add", contact_data) if "result" in create_contact: user_id = create_contact.get("result") if user_id: deal_data = { "fields": { "TITLE": f"Установка приложения: {app_name}", "CONTACT_IDS": [user_id], "OPENED": "N", } } call_method(venrot_instance, "crm.deal.add", deal_data)
Рассмотрим асинхронные функции более детально:
call_api — функция для безопасного вызова API Bitrix24. Находит наш AppInstance по id и передает запрос в call_method.
get_app_info — периодическая задача для проверки статуса приложения. Выполняется через Celery Beat для всех экземпляров приложений и позволяет следить за актуальностью токенов.
send_messages — отвечает за отправку сообщений в открытые линии. Формирует правильную структуру сообщений, пришедших из внешних сервисов.
message_add — публикация сообщений в чате Битрикс. Формирует корректный payload и вызывает метод im.message.add .
create_deal — финальная стадия, автоматическое создание лида в CRM для нового чата.
Вызов api Bitrix осуществляется в crest.py. Благодаря этой функции, нам не нужно каждый раз прописывать эндпоинт и параметры запроса к Битрикс, проверять активность токена и обновлять его, а нужно лишь передать соответствующие данные и метод. Передаю привет принципам DRY 🙂
from urllib.parse import urlparse import logging import requests from django.db import transaction from django.conf import settings from waweb.tasks import send_message_task from users.models import Message from .models import AppInstance logger = logging.getLogger("django") def call_method(appinstance: AppInstance, b24_method: str, data: dict, attempted_refresh=False, verify=True): portal = appinstance.portal endpoint = f"{portal.protocol}://{portal.domain}/rest/" access_token = appinstance.access_token print(b24_method) payload = {"auth": access_token, **data} print(payload) try: response = requests.post(f"{endpoint}{b24_method}", json=payload, allow_redirects=False, timeout=60, verify=verify) print(response.json()) appinstance.status = response.status_code except requests.exceptions.SSLError: if verify: return call_method(appinstance, b24_method, data, attempted_refresh, verify=False) else: raise if response.status_code == 302 and not attempted_refresh: new_url = response.headers['Location'] parsed_url = urlparse(new_url) portal = appinstance.portal domain = parsed_url.netloc if portal.domain != domain: portal.domain = domain portal.save() appinstance.attempts = 0 appinstance.save() return call_method(appinstance, b24_method, data, attempted_refresh=True) elif response.status_code == 200: appinstance.attempts = 0 appinstance.save() return response.json() else: appinstance.attempts += 1 appinstance.save() if response.status_code == 401: resp = response.json() error = resp.get("error", "") error_description = resp.get("error_description", "") if "REST is available only on commercial plans" in error_description and not appinstance.portal.license_expired: appinstance.portal.license_expired = True appinstance.portal.save() waweb_id = settings.WAWEB_SYTEM_ID if waweb_id and appinstance.owner.phone_number: try: notification = Message.objects.get(code="b24_expired") send_message_task(waweb_id, [str(appinstance.owner.phone_number)], notification.message) except Message.DoesNotExist: pass raise Exception("b24 license expired") if error == "expired_token" or error == "NO_AUTH_FOUND" and not attempted_refresh: refreshed = refresh_token(appinstance) if isinstance(refreshed, AppInstance): return call_method(appinstance, b24_method, data, attempted_refresh=True) else: raise Exception(f"Token refresh failed for portal {appinstance.portal.domain}") else: raise Exception(f"Unauthorized error: {response.json()}") raise Exception(f"Failed to call bitrix: {appinstance.portal.domain} " f"status {response.status_code}, response: {response.json()}") def refresh_token(appinstance: AppInstance): payload = { "grant_type": "refresh_token", "client_id": appinstance.app.client_id, "client_secret": appinstance.app.client_secret, "refresh_token": appinstance.refresh_token, } response = requests.post("https://oauth.bitrix.info/oauth/token/", data=payload, timeout=60) print(appinstance.refresh_token) print(appinstance.access_token) try: response_data = response.json() except Exception: raise Exception(f"Invalid response while refreshing token for portal {appinstance.portal.domain}") if response.status_code != 200: raise Exception(f"Failed to refresh token: {appinstance.portal.domain} {response_data}") appinstance.access_token = response_data["access_token"] appinstance.refresh_token = response_data["refresh_token"] with transaction.atomic(): appinstance.save(update_fields=["access_token", "refresh_token"]) return appinstance
Создадим вебхуки в views.py (создайте страницу успешной установки install_finish.html)
import uuid import requests from datetime import timedelta from django.conf import settings from django.contrib import messages from django.shortcuts import render, redirect from django.utils import timezone from django.http import HttpResponse, HttpResponseForbidden from django.views.decorators.csrf import csrf_exempt from rest_framework.authtoken.models import Token from .crest import call_method from .utils import process_placement from .models import AppInstance, Bitrix, Line, App from django.contrib.auth import get_user_model, login, logout User = get_user_model() @csrf_exempt def app_install(request): if request.method == "HEAD": return HttpResponse("ok") app_id = request.GET.get("app-id") protocol = request.GET.get("PROTOCOL") domain = request.GET.get("DOMAIN") data = request.POST member_id = data.get("member_id") auth_id = data.get("AUTH_ID") if not app_id or not member_id or not domain or not auth_id: return redirect("portals") try: app = App.objects.get(id=app_id) except App.DoesNotExist: return redirect("portals") proto = "https" if protocol == "1" else "http" owner = get_owner(request) api_key, _ = Token.objects.get_or_create(user=app.owner) payload = { "event": "ONAPPINSTALL", "HANDLER": f"https://{app.site}/api/bitrix/?api-key={api_key.key}&app-id={app_id}", "auth": auth_id, } try: response = requests.post(f"{proto}://{domain}/rest/event.bind", json=payload, timeout=60) response.raise_for_status() except requests.RequestException as e: resp = response.json() error_description = resp.get("error_description") if "Handler already binded" in error_description: return render(request, "install_finish.html") else: return HttpResponse(f"Bitrix event.bind failed {response.status_code, resp}") return render(request, "install_finish.html") @csrf_exempt def app_settings(request): if request.method == "POST": try: app_id = request.GET.get("app-id") data = request.POST domain = request.GET.get("DOMAIN") member_id = data.get("member_id") portal = Bitrix.objects.get(domain=domain, member_id=member_id) except Exception as e: return redirect("portals") placement = data.get("PLACEMENT") if placement == "SETTING_CONNECTOR": return process_placement(request) elif placement == "DEFAULT": try: app = App.objects.get(id=app_id) except Exception: return redirect("portals") app_url = app.page_url owner = get_owner(request) if owner is None: logout(request) return redirect(app_url) should_login = not request.user.is_authenticated or request.user != owner if should_login: if request.user.is_authenticated: logout(request) try: login(request, owner, backend='django.contrib.auth.backends.ModelBackend') except Exception: return redirect(app_url) AppInstance.objects.filter(portal=portal, owner__isnull=True).update(owner=owner) Line.objects.filter(portal=portal, owner__isnull=True).update(owner=owner) return redirect(f"{app_url}?domain={domain}") else: return redirect("portals") elif request.method == "HEAD": return HttpResponse("ok") elif request.method == "GET": return redirect("portals")
А также эндпоинты в urls.py (не забудьте подключить в urls в корне проекта):
from django.urls import path from .views import app_settings, app_install urlpatterns = [ path("app-settings/", app_settings, name="app_settings"), path("app-install/", app_install, name="app_install"), ]
На этом основной функционал модуля Битрикс завершен. Функционал WhatsApp коннектора мы рассмотрим во второй части.
ссылка на оригинал статьи https://habr.com/ru/articles/943596/
Добавить комментарий