В этой статье рассмотрим создание асинхронного приложения на Python с использованием библиотеки httpx для рассылки уведомлений пользователям о предстоящих событиях, на которые они зарегистрировались.
Приложение будет запускаться раз в сутки с помощью планировщика Cron на Linux сервере. Для отправки SMS-уведомлений воспользуемся платформой МТС Exolve.
Зачем это нужно
Рассылка уведомлений пользователям — важная задача во многих сервисах. Она позволяет информировать пользователей о важных обновлениях, новостях или других интересных им событиях. При этом SMS — это инструмент с гарантированной доставкой и высокой открываемостью, сообщения дойдут даже до пользователей с отключенным интернетом.
Асинхронное приложение на Python с использованием библиотеки httpx позволяет эффективно реализовать такую рассылку, обеспечивая высокую производительность и отзывчивость системы.
Почему асинхронно
В нашем случае, использование асинхронного подхода позволяет приложению эффективно обрабатывать большое количество запросов на рассылку уведомлений, минимизируя задержки и обеспечивая отзывчивость системы.
Пример асинхронной реализации клиента на httpx
Рассмотрим пример реализации отправки SMS-уведомлений через приложение на языке python. Для начала, убедитесь, что у вас установлены все необходимые зависимости. Мы используем httpx для создания приложения и сопутствующую библиотеку asyncio.
Проект будет со следующей структурой:
aclient_exmp/ /venv /example_db __init__.py handle_data.py info_db.py /apimodul __init__.py mtt_client.py /helper __init__.py decorators.py logging.yaml config.py main.py dev.env
В файле dev.env хранятся переменные окружения: API-ключ, URL-адрес запроса к МТС Exolve и номер телефона для отправки SMS.
В файле config.py получим данные из переменных окружения.
from dotenv import dotenv_values info_env = dotenv_values('dev.env') API_KEY = info_env.get('API_KEY') PHONE_SEND = info_env.get('PHONE_SEND') BASE_URL = info_env.get('BASE_URL')
Логирование
Добавим логирование к нашему проекту. Файл logging.yaml представляет собой файл конфигурации, в котором определяются форматирование, обработчики и уровни логирования для различных логгеров в проекте. В данном примере файла logging.yaml мы определяем форматирование для логов, создаём обработчик для вывода логов в консоль и настраиваем корневой логгер для уровня DEBUG.
version: 1 formatters: info: format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' handlers: console_handler: class: logging.StreamHandler level: INFO formatter: info stream: ext://sys.stdout root: level: DEBUG handlers: [console_handler]
Дополним код в файле config.py, который выполнит загрузку конфигурации логирования из файла logging.yaml и настройку логгеров в проекте.
import logging.config import os import sys import yaml from dotenv import dotenv_values info_env = dotenv_values('dev.env') API_KEY = info_env.get('API_KEY') PHONE_SEND = info_env.get('PHONE_SEND') BASE_URL=f"{info_env.get('BASE_URL')}" CURRENT_FILE_PATH = os.path.abspath(__file__) BASE_DIR = os.path.dirname(CURRENT_FILE_PATH) LOGGING_CONF = BASE_DIR + '/logging.yaml' if os.path.isfile(LOGGING_CONF) and os.access(LOGGING_CONF, os.R_OK): _lc_stream = open(LOGGING_CONF, 'r') _lc_conf = yaml.load(_lc_stream, Loader=yaml.FullLoader) _lc_stream.close() logging.config.dictConfig(_lc_conf) else: print( "ERROR: logger config file '%s' not exsits or not readable\n" % LOGGING_CONF) sys.exit(1)
В этом примере кода мы проверяем, существует ли файл logging.yaml и доступен ли он для чтения. Затем мы открываем файл, загружаем его содержимое в переменную _lc_conf с помощью модуля yaml, и закрываем файл. Далее мы используем метод dictConfig из модуля logging.config, чтобы применить загруженную конфигурацию к логгерам в проекте.
Если файл logging.yaml не существует или не доступен для чтения, выводится сообщение об ошибке.
Получение и обработка данных
Для простого примера в качестве базы данных будем использовать список событий, который содержит элементы:
event_list = [ { 'name': 'Music Show', 'date': '2023:12:12', 'time': '17:00', 'mentor': 'Jazz Band', 'guests': [ {'name': 'Ivan Gubov', 'phone': '79007771101'},..... {'name': 'Mansur Berdiev', 'phone': '79800002001'}] },..........., { 'name': 'Music Show', 'date': '2023:11:14', 'time': '20:00', 'mentor': 'Jazz Band', 'guests': [{'name': 'Olga Lomova', 'phone': '79055551101'}]}]
По ключу guests получим список гостей с номерами телефонов, которые зарегистрировались на событие. Данные разместим в файле info_db.py.
Далее перейдем в файл handle_data.py инапишем простую функцию, возвращающую список событий, которые приходятся на текущую дату.
import datetime from example_db.info_db import event_list def get_event_today(event_list): current_date = datetime.date.today().strftime("%Y:%m:%d") result_list = [] for event in event_list: if event['date'] == current_date: result_list.append(event) return result_list
В реальном проекте, когда нужны события за текущую дату, возможно придётся составить более сложные запросы к базе данных или API. Например, может потребоваться выполнить запрос к базе данных для получения списка событий с определёнными условиями, включая текущую дату. Однако, для нашего простого примера, функция get_event_today подходит, чтобы показать основной механизм выборки событий.
Ограничение числа запросов в единицу времени и контроль ошибок
Для того, чтобы контролировать число запросов в единицу времени и обработки ошибок, связанных с перегрузкой сервера (например, ошибку 500), создадим функцию декоратор rate_limit.
Декоратор rate_limit имеет два параметра: limit и interval. Параметр limit определяет максимальное количество запросов, которое можно выполнить в указанную единицу времени interval. Внутри декоратора определена функция wrapper. Это обёртка для оригинальной асинхронной функции. Внутри wrapper происходит контроль скорости выполнения запросов и обработка ошибок.
import asyncio import logging import functools import httpx from datetime import datetime, timedelta logger = logging.getLogger(__name__) def rate_limit(limit: int, interval: int): def decorator(func): last_call = datetime.min retries = 0 @functools.wraps(func) async def wrapper(self, *args, **kwargs): nonlocal last_call, retries elapsed = datetime.now() - last_call if elapsed < timedelta(seconds=interval): await asyncio.sleep((timedelta(seconds=interval) - elapsed).total_seconds()) last_call = datetime.now() try: return await func(self, *args, **kwargs) except httpx.HTTPError as http_err: if http_err.response.status_code == 500 and retries <= 5: retries += 1 await asyncio.sleep(3) logger.info(f"HTTP ошибка:{func.__name__} \n Дополнительная попытка запроса: {retries}") return await wrapper(self, *args, **kwargs) else: logger.error(f":{http_err}. \n Код ошибки: {http_err.response.status_code}") raise except Exception as e: logger.error(f"Программная ошибка:{e} \n В функции: {func.__name__}") raise return wrapper return decorator
Если при выполнении запроса возникает ошибка типа httpx.HTTPError, wrapper проверяет код ошибки. Если код ошибки равен 500 и количество попыток повторного выполнения запроса меньше или равно 5, то wrapper делает паузу в 3 секунды с помощью await asyncio.sleep(3) и повторно вызывает себя для выполнения запроса ещё раз:
return await wrapper(self, *args, **kwargs)
Если код ошибки не равен 500 или количество попыток повторного выполнения запроса превышает 5, то wrapper генерирует исключение, которое будет обработано в вызывающем коде.
Ограничение числа запросов в единицу времени помогает снизить нагрузку на сервер и повысить производительность приложения. Обработка 500 ошибки и повторное выполнение запроса позволяет улучшить стабильность работы приложения, особенно в случаях, когда сервер временно недоступен или перегружен.
Использование асинхронного декоратора позволяет эффективно управлять асинхронными запросами и обеспечивает отзывчивость приложения даже при выполнении большого количества запросов.
Классы клиентов для работы с МТС Exolve
В представленном ниже коде модуля mtt_client.py есть два класса: MTSClient и SendingSMS. Оба класса предназначены для работы с API МТС Exolve и обеспечивают функциональность отправки сообщений.
import logging import typing import httpx import asyncio from config import API_KEY, PHONE_SEND, BASE_URL from helper.decorators import rate_limit logger = logging.getLogger(__name__) class MTSClient: """Клиент умеющий делать запрос к API MTC.""" def __init__( self, base_url: str, token: str, aclient: httpx.AsyncClient or None = None ): """ :param base_url: API URL :param token: Токен :param aclient: Асинхронный клиент """ self.base_url = base_url self.token = token self.aclient = aclient or httpx.AsyncClient() self.headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.token}"} @rate_limit(limit=1, interval=1) async def send_message(self, body) -> dict: """ Отправить sms c напоминанием о событии. :param body: Тело запроса :return: Тело ответа в формате json с message_id """ url = f"{self.base_url}" response = await self.aclient.post(url=url, json=body, headers=self.headers, timeout=5) response.raise_for_status() decoded_response = response.json() if response.status_code == 200: logger.info(f"id сообщения: {decoded_response}") logger.info(f"Напоминание о событии успешно отправлено пользователю с номером: {body['destination']}") return decoded_response class SendingSMS(): def __init__(self, client: MTSClient): self.client = client async def send_all_guests(self, event_today: list) -> bool: """ Рассылает напоминание всем гостям по списку. :param event_today: Cписок событий на сегодня :return: Логическое значение в случае успеха """ for event in event_today: guests_data = event["guests"] for element in guests_data: sms_data = { "number": PHONE_SEND, "destination": element["phone"], "text": f"Ждем Вас сегодня в {event['time']} на событие {event['name']}"} message_info = await self.client.send_message(body=sms_data) return True
Класс MTSClient
Класс MTSClient — клиент для работы с API МТС Exolve. Он содержит методы и атрибуты для отправки запросов и обработки ответов от сервера.
В конструкторе класса MTSClient инициализируются следующие атрибуты:
-
base_url: URL API MTS Exolve.
-
token: токен для аутентификации при отправке запросов.
-
aclient: асинхронный клиент httpx.AsyncClient или None (по умолчанию).
Класс MTSClient также содержит метод send_message, который асинхронно отправляет SMS-сообщение с напоминанием о событии. Метод принимает тело запроса body и возвращает словарь с информацией о сообщении. На данный метод для контроля ошибок и частоты запросов мы помеcтили упомянутый ранее декоратор rate_limit.
Назначение этого класса инкапсулировать логику отправки запросов и обработки ответов, что позволяет легко использовать его в различных частях приложения. В случае изменения API МТС Exolve, достаточно внести изменения только внутри класса MTSClient, не затрагивая другие части кода.
Класс SendingSMS
Класс SendingSMS представляет собой обертку над клиентом MTSClient.
В конструкторе класса SendingSMS инициализируется атрибут client, это экземпляр класса MTSClient. Он позволяет использовать функциональность MTSClient для отправки сообщений.
Класс SendingSMS содержит метод send_all_guests, который асинхронно отправляет напоминание о событии всем гостям из списка. Метод принимает список событий event_today и возвращает True, если все сообщения успешно отправлены.
Этот класс предоставляет удобный интерфейс для отправки SMS-сообщений гостям событий. Он скрывает детали взаимодействия с API МТС Exolve и позволяет сосредоточиться на бизнес-логике приложения.
Функция amain
Создадим функцию amain, которая будет асинхронной точкой входа в приложение.
async def amain(event_today): async with httpx.AsyncClient() as aclient: mtt_client = MTSClient( base_url=BASE_URL, token=API_KEY, aclient=aclient) sms_sender = SendingSMS(client=mtt_client) tasks = [sms_sender.send_all_guests(event_today)] result_work_send = await asyncio.gather(*tasks)
Внутри функции создается экземпляр класса MTSClient и класса SendingSMS с передачей асинхронного клиента httpx.AsyncClient. Затем создаются асинхронные задания для отправки сообщений гостям событий.
Функцию amain можно вызывать в главном файле приложения с помощью asyncio.run, что позволяет асинхронно выполнить задачи отправки сообщений.
В модуле mtt_client.py представленные классы и функция обеспечивают удобство, гибкость и расширяемость при работе с API МТС Exolve для отправки SMS-сообщений.
Основной файл приложения
Сформируем главный файл приложения main.py
import sys import logging import asyncio import apimodul.mtt_client as mtt from example_db import handle_data, info_db logger = logging.getLogger(__name__) if __name__ == "__main__": while True: try: event_today = handle_data.get_event_today(info_db.event_list) if event_today: asyncio.run(mtt.amain(event_today)) break except AssertionError as e: logger.error(e, exc_info=True) logger.error("Произошла ошибка при работе с данными.") except KeyboardInterrupt: logger.error(f" Произошло прерывание программы с клавиатуры.") sys.exit(1) except Exception as e: logger.error(f" Ошибка выполнения программы: {e}") sys.exit(1) sys.exit(0)
Внутри цикла while True происходит вызов функции get_event_today из модуля handle_data, которая получает список событий, запланированных на текущий день. Если в списке есть события, вызывается асинхронная функция amain из модуля mtt_client.py, которая обрабатывает эти события.
Такая структура главного файла проекта обеспечивает контроль выполнения программы и обработку возможных ошибок. Цикл while True позволяет повторять выполнение программы до тех пор, пока не будет выполнено условие выхода из цикла.
В блоке try-except происходит обработка исключений, которые могут возникнуть во время выполнения кода. Различные типы исключений обрабатываются по-разному:
-
AssertionError: Если возникает утверждение (assertion) внутри функции get_event_today, то генерируется исключение AssertionError. В этом случае, в лог-файл записывается ошибка и информация о произошедшем исключении.
-
KeyboardInterrupt: Если пользователь прерывает выполнение программы с клавиатуры (нажимает Ctrl+C), то генерируется исключение KeyboardInterrupt. В этом случае, в лог-файл записывается информация о прерывании программы.
-
Exception: Если возникает любое другое исключение, не попадающее в предыдущие категории, то генерируется исключение Exception. В этом случае, в лог-файл записывается информация об ошибке выполнения программы.
После обработки исключений в случае успешного выполнения, программа завершается с помощью вызова sys.exit(0) или sys.exit(1), если возникли ошибки, где:
-
0- сигнал в операционную систему об успешном завершении программы, его можно учитывать для последующего запуска других приложений.
-
1- сигнал в операционную систему об ошибке.
Запуск по расписанию
Наша программа рассылки готова. Теперь необходимо подумать об автоматическом запуске в определённое время. Для этого используем планировщик заданий Cron. Откроем терминал Linux и введём команду:
crontab -e
У нас откроется файл с интервалами запуска. Время и дата внутри файла задаётся особым синтаксисом, рассмотрим настройку одной задачи Cron:
** * * * cd /home/projects/program [минута][час][день][месяц][день недели][команда(ы)]
В нашем случае запустим скрипт один раз в день в 11:15 утра. Тогда итоговая запись в cron-файле будет выглядеть так:
15 11 * * * cd PycharmProjects/aclient_exemp && venv/bin/python3.8 main.py
То есть, сначала задаём время, переходим в папку с проектом и через виртуальное окружение запускаем скрипт. Так как проект находится в папке PycharmProjects, и запускаю я планировщик не из под супер-пользователя, а из под обычного, здесь нет необходимости записывать полный путь до папки с проектом, потому что пользователь находится по умолчанию в папке /home.
Теперь скрипт каждый день ровно в 11:30 будет отправлять SMS гостям с напоминанием о событии.
Более подробно о планировщике можно прочитать в этой статье. Стоит отметить, что среди плюсов планировщика Cron простота, надёжность и поддержка работы на большинстве систем Linux и Unix. Это означает, что вы без проблем можете использовать его на различных платформах.
Заключение
Этот проект — пример того, как можно использовать httpx и планировщик Cron для создания системы SMS-оповещений. Клиент может послужить отправной точкой для разработки более сложных приложений для напоминаний о мероприятии. С помощью этого простого примера удалось показать, что Cron предоставляет простой и понятный способ планирования задач. Мы можем легко настроить их выполнение по определённому расписанию, указав время и дату их выполнения.
ссылка на оригинал статьи https://habr.com/ru/articles/834412/
Добавить комментарий