Как построить асинхронное Python-приложение для рассылки уведомлений о событии

от автора

В этой статье рассмотрим создание асинхронного приложения на Python с использованием библиотеки httpx для рассылки уведомлений пользователям о предстоящих событиях, на которые они зарегистрировались.

Приложение будет запускаться раз в сутки с помощью планировщика Cron на Linux сервере. Для отправки SMS-уведомлений воспользуемся платформой МТС Exolve.

Зачем это нужно

Рассылка уведомлений пользователям — важная задача во многих сервисах. Она позволяет информировать пользователей о важных обновлениях, новостях или других интересных им событиях. При этом SMS — это инструмент с гарантированной доставкой и высокой открываемостью, сообщения дойдут даже до пользователей с отключенным интернетом.

Асинхронное приложение на Python с использованием библиотеки httpx позволяет эффективно реализовать такую рассылку, обеспечивая высокую производительность и отзывчивость системы.

Почему асинхронно

В нашем случае, использование асинхронного подхода позволяет приложению эффективно обрабатывать большое количество запросов на рассылку уведомлений, минимизируя задержки и обеспечивая отзывчивость системы.

Пример асинхронной реализации клиента на httpx

Рассмотрим пример реализации отправки SMS-уведомлений через приложение на языке python. Для начала, убедитесь, что у вас установлены все необходимые зависимости. Мы используем httpx для создания приложения и сопутствующую библиотеку asyncio.

Проект будет со следующей структурой:

aclient_exmp/     /venv     /example_db         __init__.py         handle_data.py         info_db.py     /apimodul         __init__.py         mtt_client.py    /helper         __init__.py         decorators.py     logging.yaml     config.py     main.py     dev.env

В файле dev.env хранятся переменные окружения: API-ключ, URL-адрес запроса к МТС Exolve и номер телефона для отправки SMS. 

В файле config.py получим данные из переменных окружения.

from dotenv import dotenv_values  info_env = dotenv_values('dev.env')  API_KEY = info_env.get('API_KEY') PHONE_SEND = info_env.get('PHONE_SEND') BASE_URL = info_env.get('BASE_URL')

Логирование

Добавим логирование к нашему проекту. Файл logging.yaml представляет собой файл конфигурации, в котором определяются форматирование, обработчики и уровни логирования для различных логгеров в проекте. В данном примере файла logging.yaml мы определяем форматирование для логов, создаём обработчик для вывода логов в консоль и настраиваем корневой логгер для уровня DEBUG.

version: 1  formatters:   info:     format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'  handlers:   console_handler:     class: logging.StreamHandler     level: INFO     formatter: info     stream: ext://sys.stdout  root:   level: DEBUG   handlers: [console_handler]

Дополним код в файле config.py, который выполнит загрузку конфигурации логирования из файла logging.yaml и настройку логгеров в проекте.

import logging.config import os import sys  import yaml from dotenv import dotenv_values   info_env = dotenv_values('dev.env')  API_KEY = info_env.get('API_KEY') PHONE_SEND = info_env.get('PHONE_SEND') BASE_URL=f"{info_env.get('BASE_URL')}"    CURRENT_FILE_PATH = os.path.abspath(__file__) BASE_DIR = os.path.dirname(CURRENT_FILE_PATH) LOGGING_CONF = BASE_DIR + '/logging.yaml'  if os.path.isfile(LOGGING_CONF) and os.access(LOGGING_CONF, os.R_OK): _lc_stream = open(LOGGING_CONF, 'r') _lc_conf = yaml.load(_lc_stream, Loader=yaml.FullLoader) _lc_stream.close() logging.config.dictConfig(_lc_conf) else: print(     "ERROR: logger config file '%s' not exsits or not readable\n" %     LOGGING_CONF) sys.exit(1)

В этом примере кода мы проверяем, существует ли файл logging.yaml и доступен ли он для чтения. Затем мы открываем файл, загружаем его содержимое в переменную _lc_conf с помощью модуля yaml, и закрываем файл. Далее мы используем метод dictConfig из модуля logging.config, чтобы применить загруженную конфигурацию к логгерам в проекте.

Если файл logging.yaml не существует или не доступен для чтения, выводится сообщение об ошибке.

Получение и обработка данных

Для простого примера в качестве базы данных будем использовать список событий, который содержит элементы:

event_list = [ {     'name': 'Music Show',     'date': '2023:12:12',     'time': '17:00',     'mentor': 'Jazz Band',     'guests': [         {'name': 'Ivan Gubov', 'phone': '79007771101'},.....         {'name': 'Mansur Berdiev', 'phone': '79800002001'}] },..........., { 'name': 'Music Show', 'date': '2023:11:14',     'time': '20:00',     'mentor': 'Jazz Band',     'guests': [{'name': 'Olga Lomova', 'phone': '79055551101'}]}]

По ключу guests получим список гостей с номерами телефонов, которые зарегистрировались на событие. Данные разместим в файле info_db.py.

Далее перейдем в файл handle_data.py инапишем простую функцию, возвращающую список событий, которые приходятся на текущую дату.

import datetime from example_db.info_db import event_list  def get_event_today(event_list): current_date = datetime.date.today().strftime("%Y:%m:%d") result_list = [] for event in event_list: if event['date'] == current_date: result_list.append(event) return result_list

В реальном проекте, когда нужны события за текущую дату, возможно придётся составить более сложные запросы к базе данных или API. Например, может потребоваться выполнить запрос к базе данных для получения списка событий с определёнными условиями, включая текущую дату. Однако, для нашего простого примера, функция get_event_today подходит, чтобы показать основной механизм выборки событий.

Ограничение числа запросов в единицу времени и контроль ошибок

Для того, чтобы контролировать число запросов в единицу времени и обработки ошибок, связанных с перегрузкой сервера (например, ошибку 500), создадим функцию декоратор rate_limit.

Декоратор rate_limit имеет два параметра: limit и interval. Параметр limit определяет максимальное количество запросов, которое можно выполнить в указанную единицу времени interval. Внутри декоратора определена функция wrapper. Это обёртка для оригинальной асинхронной функции. Внутри wrapper происходит контроль скорости выполнения запросов и обработка ошибок.

import asyncio import logging import functools import httpx from datetime import datetime, timedelta  logger = logging.getLogger(__name__)  def rate_limit(limit: int, interval: int): def decorator(func):     last_call = datetime.min     retries = 0      @functools.wraps(func)     async def wrapper(self, *args, **kwargs):         nonlocal last_call, retries         elapsed = datetime.now() - last_call         if elapsed < timedelta(seconds=interval):             await asyncio.sleep((timedelta(seconds=interval) - elapsed).total_seconds())         last_call = datetime.now()          try:              return await func(self, *args, **kwargs)         except httpx.HTTPError as http_err:             if http_err.response.status_code == 500 and retries <= 5:                 retries += 1                 await asyncio.sleep(3)                 logger.info(f"HTTP ошибка:{func.__name__} \n Дополнительная попытка запроса: {retries}")                 return await wrapper(self, *args, **kwargs)             else: logger.error(f":{http_err}. \n Код ошибки: {http_err.response.status_code}")                 raise         except Exception as e:             logger.error(f"Программная ошибка:{e} \n В функции: {func.__name__}")             raise     return wrapper return decorator

Если при выполнении запроса возникает ошибка типа httpx.HTTPError, wrapper проверяет код ошибки. Если код ошибки равен 500 и количество попыток повторного выполнения запроса меньше или равно 5, то wrapper делает паузу в 3 секунды с помощью await asyncio.sleep(3) и повторно вызывает себя для выполнения запроса ещё раз:

return await wrapper(self, *args, **kwargs)

Если код ошибки не равен 500 или количество попыток повторного выполнения запроса превышает 5, то wrapper генерирует исключение, которое будет обработано в вызывающем коде.

Ограничение числа запросов в единицу времени помогает снизить нагрузку на сервер и повысить производительность приложения. Обработка 500 ошибки и повторное выполнение запроса позволяет улучшить стабильность работы приложения, особенно в случаях, когда сервер временно недоступен или перегружен.

Использование асинхронного декоратора позволяет эффективно управлять асинхронными запросами и обеспечивает отзывчивость приложения даже при выполнении большого количества запросов.

Классы клиентов для работы с МТС Exolve

В представленном ниже коде модуля mtt_client.py есть два класса: MTSClient и SendingSMS. Оба класса предназначены для работы с API МТС Exolve и обеспечивают функциональность отправки сообщений. 

import logging import typing  import httpx import asyncio  from config import API_KEY, PHONE_SEND, BASE_URL from helper.decorators import rate_limit  logger = logging.getLogger(__name__)  class MTSClient: """Клиент умеющий делать запрос к API MTC."""  def __init__(         self,     base_url: str,     token: str,     aclient: httpx.AsyncClient or None = None ):     """     :param base_url: API URL     :param token: Токен     :param aclient: Асинхронный клиент     """     self.base_url = base_url     self.token = token       self.aclient = aclient or httpx.AsyncClient()     self.headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.token}"}  @rate_limit(limit=1, interval=1) async def send_message(self, body) -> dict:     """     Отправить sms c напоминанием о событии.      :param body: Тело запроса     :return: Тело ответа в формате json с message_id     """     url = f"{self.base_url}"     response = await self.aclient.post(url=url, json=body, headers=self.headers, timeout=5)     response.raise_for_status()     decoded_response = response.json()     if response.status_code == 200:         logger.info(f"id сообщения: {decoded_response}")         logger.info(f"Напоминание о событии успешно отправлено пользователю с номером: {body['destination']}")     return decoded_response   class SendingSMS(): def __init__(self, client: MTSClient):     self.client = client  async def send_all_guests(self, event_today: list) -> bool:     """     Рассылает напоминание всем гостям по списку. :param event_today: Cписок событий на сегодня     :return: Логическое значение в случае успеха     """      for event in event_today:         guests_data = event["guests"]         for element in guests_data:             sms_data = {                 "number": PHONE_SEND,                 "destination": element["phone"],                 "text": f"Ждем Вас сегодня в {event['time']} на событие {event['name']}"}              message_info = await self.client.send_message(body=sms_data)      return True

Класс MTSClient

Класс MTSClient — клиент для работы с API МТС Exolve. Он содержит методы и атрибуты для отправки запросов и обработки ответов от сервера.

В конструкторе класса MTSClient инициализируются следующие атрибуты:

  • base_url: URL API MTS Exolve.

  • token: токен для аутентификации при отправке запросов.

  • aclient: асинхронный клиент httpx.AsyncClient или None (по умолчанию).

Класс MTSClient также содержит метод send_message, который асинхронно отправляет SMS-сообщение с напоминанием о событии. Метод принимает тело запроса body и возвращает словарь с информацией о сообщении. На данный метод для контроля ошибок и частоты запросов мы помеcтили упомянутый ранее декоратор rate_limit.

Назначение этого класса инкапсулировать логику отправки запросов и обработки ответов, что позволяет легко использовать его в различных частях приложения. В случае изменения API МТС Exolve, достаточно внести изменения только внутри класса MTSClient, не затрагивая другие части кода.

Класс SendingSMS

Класс SendingSMS представляет собой обертку над клиентом MTSClient.

В конструкторе класса SendingSMS инициализируется атрибут client, это экземпляр класса MTSClient. Он позволяет использовать функциональность MTSClient для отправки сообщений.

Класс SendingSMS содержит метод send_all_guests, который асинхронно отправляет напоминание о событии всем гостям из списка. Метод принимает список событий event_today и возвращает True, если все сообщения успешно отправлены.

Этот класс предоставляет удобный интерфейс для отправки SMS-сообщений гостям событий. Он скрывает детали взаимодействия с API МТС Exolve и позволяет сосредоточиться на бизнес-логике приложения.

Функция amain

Создадим функцию amain, которая будет асинхронной точкой входа в приложение.

async def amain(event_today): async with httpx.AsyncClient() as aclient:     mtt_client = MTSClient( base_url=BASE_URL,         token=API_KEY,         aclient=aclient)     sms_sender = SendingSMS(client=mtt_client)     tasks = [sms_sender.send_all_guests(event_today)]     result_work_send = await asyncio.gather(*tasks)

Внутри функции создается экземпляр класса MTSClient и класса SendingSMS с передачей асинхронного клиента httpx.AsyncClient. Затем создаются асинхронные задания для отправки сообщений гостям событий.

Функцию amain можно вызывать в главном файле приложения с помощью asyncio.run, что позволяет асинхронно выполнить задачи отправки сообщений.

В модуле mtt_client.py представленные классы и функция обеспечивают удобство, гибкость и расширяемость при работе с API МТС Exolve для отправки SMS-сообщений.

Основной файл приложения

Сформируем главный файл приложения main.py

import sys import logging import asyncio import apimodul.mtt_client as mtt from example_db import handle_data, info_db  logger = logging.getLogger(__name__)  if __name__ == "__main__": while True:     try:         event_today = handle_data.get_event_today(info_db.event_list)         if event_today:             asyncio.run(mtt.amain(event_today))         break     except AssertionError as e:         logger.error(e, exc_info=True)         logger.error("Произошла ошибка при работе с данными.")     except KeyboardInterrupt:         logger.error(f" Произошло прерывание программы с клавиатуры.")         sys.exit(1)     except Exception as e:         logger.error(f" Ошибка выполнения программы: {e}")         sys.exit(1) sys.exit(0)

Внутри цикла while True происходит вызов функции get_event_today из модуля handle_data, которая получает список событий, запланированных на текущий день. Если в списке есть события, вызывается асинхронная функция amain из модуля mtt_client.py, которая обрабатывает эти события.

Такая структура главного файла проекта обеспечивает контроль выполнения программы и обработку возможных ошибок. Цикл while True позволяет повторять выполнение программы до тех пор, пока не будет выполнено условие выхода из цикла.

В блоке try-except происходит обработка исключений, которые могут возникнуть во время выполнения кода. Различные типы исключений обрабатываются по-разному:

  • AssertionError: Если возникает утверждение (assertion) внутри функции get_event_today, то генерируется исключение AssertionError. В этом случае, в лог-файл записывается ошибка и информация о произошедшем исключении.

  • KeyboardInterrupt: Если пользователь прерывает выполнение программы с клавиатуры (нажимает Ctrl+C), то генерируется исключение KeyboardInterrupt. В этом случае, в лог-файл записывается информация о прерывании программы.

  • Exception: Если возникает любое другое исключение, не попадающее в предыдущие категории, то генерируется исключение Exception. В этом случае, в лог-файл записывается информация об ошибке выполнения программы.

После обработки исключений в случае успешного выполнения, программа завершается с помощью вызова sys.exit(0) или sys.exit(1), если возникли ошибки, где: 

  • 0- сигнал в операционную систему об успешном завершении программы, его можно учитывать для последующего запуска других приложений. 

  • 1- сигнал в операционную систему об ошибке.

Запуск по расписанию

Наша программа рассылки готова. Теперь необходимо подумать об автоматическом запуске в определённое время. Для этого используем планировщик заданий Cron. Откроем терминал Linux и введём команду:

crontab -e

У нас откроется файл с интервалами запуска. Время и дата внутри файла задаётся особым синтаксисом, рассмотрим настройку одной задачи Cron:

 **   * *   *  cd /home/projects/program [минута][час][день][месяц][день недели][команда(ы)]

В нашем случае запустим скрипт один раз в день в 11:15 утра. Тогда итоговая запись в cron-файле будет выглядеть так:

15 11 * * * cd PycharmProjects/aclient_exemp && venv/bin/python3.8 main.py

То есть, сначала задаём время, переходим в папку с проектом и через виртуальное окружение запускаем скрипт. Так как проект находится в папке PycharmProjects, и запускаю я планировщик не из под супер-пользователя, а из под обычного, здесь нет необходимости записывать полный путь до папки с проектом, потому что пользователь находится по умолчанию в папке /home.

Теперь скрипт каждый день ровно в 11:30 будет отправлять SMS гостям с напоминанием о событии.

Более подробно о планировщике можно прочитать в этой статье. Стоит отметить, что среди плюсов планировщика Cron простота, надёжность и поддержка работы на большинстве систем Linux и Unix. Это означает, что вы без проблем можете использовать его на различных платформах.

Заключение

Этот проект — пример того, как можно использовать httpx и планировщик Cron для создания системы SMS-оповещений. Клиент может послужить отправной точкой для разработки более сложных приложений для напоминаний о мероприятии. С помощью этого простого примера удалось показать, что Cron предоставляет простой и понятный способ планирования задач. Мы можем легко настроить их выполнение по определённому расписанию, указав время и дату их выполнения.


ссылка на оригинал статьи https://habr.com/ru/articles/834412/