Несмотря на то, что я системный аналитик, проклятие, которое начинается с фразы «тыж программист…», иногда преследует и меня. Недавно у меня спросили совета, как сделать так, чтобы для нужных людей шлагбаум удаленно открывался без транспондера и сложных систем.
К сожалению, сходу я не придумал ответа на вопрос, но задачка показалась мне любопытной, и я немного изучил вопрос.
Решение нашёл быстро — в качестве аппаратной части можно использовать GSM-реле, а контроль доступа реализовать с помощью микро сервиса для переадресации вызова.
И поскольку у меня наличествовал тестовый доступ к МТС Exolve, грех было им не воспользоваться. Тем более, что соответствующее API для управления входящим вызовом есть.
Оглавление
Если честно, я не знаток автоматических шлагбаумов, поэтому в статье я не буду привязываться к аппаратной части, мне кажется подойдёт практически любое устройство, которое предложит поисковик по запросу «GSM модуль для шлагбаума». Как правило у многих моделей шлагбаумов есть контакт, который при замыкании (размыкании реле) будет поднимать и опускать шлагбаум.
Как видите решение очень простое, в принципе можно и не городить никаких сервисов, а просто раздать знакомым номер телефона SIM-карты в GSM-модуле. Но что делать, если этот номер станет известен третьим лицам? Нам явно нужен белый список разрешенных номеров. По хорошему такие проблемы решают комплексные системы безопасности. Но мне очень хотелось покрутить в руках FastAPI. Поэтому мы сделаем свой сервис по контролю звонящих с «картошными играми и прочими увеселениями».
Однако, прежде чем приступать к делу – традиционный дисклеймер. Я не программист. Задачу я решал скорее концептуально и корнер-кейсы особо не продумывал. Поэтому, настоятельно не рекомендую относиться к материалам статьи, как к истине в последней инстанции и использовать код без изменений в продуктовой среде.
Что планируем сделать
Формальности улажены и можно переходить к делу.
Рассмотрим следующий сценарий:
Предусловия.
-
Есть несколько (минимум две) зоны, защищенные шлагбаумами, в каждом стоит свой GSM-модуль.
-
Пользователям может быть разрешен доступ к одной или сразу нескольким зонам.
Мы будем прорабатывать следующий:
-
Пользователь подъезжает к шлагбауму.
-
Набирает номер.
-
MTС Exolve делает запрос к нам в сервис
-
Сервис проверяет в какие зоны разрешен проезд пользователю и делает обзвон соответствующих шлагбаумов.
-
Шлагбаумы открываются.

База данных
Начнем с базы данных. в которой будем хранить доступы пользователей из белого списка.
Для прототипа я взял SQlite ибо её функций нам хватит с головой, а ее поддержка уже давно реализована «из коробки» в Python 3.
Фактически нам нужно две таблицы:
-
Таблица для данных о пользователях, которые будут звонить на контрольный номер (users).
-
Таблица для данных о номерах телефонах GSM-модулей в шлагбаумах и защищаемых ими зонах (barriers).
Однако, для реализации связи многие-ко-многим, мы сделаем третью табличку в которой непосредственно соединим пользователей и доступные им зоны для проезда (users_access).
В результате получим такую структуру.


Поскольку все исходники я разместил на GitHub, воздержусь от подробного описания структуры таблиц..
Отмечу только, что в user_access составной первичный ключ из обоих полей, при этом оба поля это внешние ключи в соответствующих таблицах.
В качестве тестового примера мы создадим два шлагбаума.
Один для обычных гостей (#1), а второй (№2) для сотрудников.

Все имена и номера телефонов выдуманы, любые совпадения случайны.
Создадим двух пользователей.
Одного гостя (Habr) и одного сотрудника (Habra).

Осталось задать доступы. Пользователь Habr сможет проехать только через гостевой шлагбаум, а Habra может проехать через оба.

Настройка УВВ API МТС Exolve
Прежде чем приступить к разработке сервиса, нам необходимо настроить управление входящим вызовов в МТС Exolve. Я надеюсь, что у вас есть к нему доступ.
Согласно документации, необходимо с помощью метода setSipCallControlU
установить URL, к которому Exolve будет обращаться за дальнейшими инструкциями по обработке вызова.
Для этого необходимо отправить POST запрос на адрес:
https://api.mtt.ru/ipcr/ (указав в базовой авторизации логин и пароль).
Тело запроса:
{ "id":"1", "jsonrpc":"2.0", "method": "setSipCallControlURL", "params": { "sip_id": "Ваш SIP ID (ном", "url": "Ваш URL" } }
В нашем случае sip_id
– это номер телефона, на который будут звонить пользователи. А url
– это адрес метода, который скажет, что делать дальше с вызовом вида <ваш домен или IP>/access.
Дополнительно еще можно установить резервный номер, на случай если сервис будет недоступен, например, номер поста охраны.
Сервис для управления входящим вызовом и логирование запроса
Итак, пришло время браться за разработку сервиса. Я использовал Python v. 3.10 и fastapi v. 0.115, но думаю, что подойдут и другие версии.
Освежим в памяти, схему взаимодействия, рассмотрим её в формате диаграммы последовательности.

Создадим файловую структуру:
├── api
│ ├── access.py (открытие шлагбаума)
│ ├── config.py (общие константы и настройки)
│ ├── crud.py (методы для управления записями в БД)
│ ├── __init__.py (служебный файл для подключения модулей)
│ └── log.py (работа с записями в лог)
├── data.sqlite (база данных)
├── main.py (главный файл скрипта)
└── readme.md
└──app.log (появится после первого логируемого запроса)
Начнем с файла api/config.py
DB_NAME = 'data.sqlite' ACCESS_TABLE = 'users_access' BARIERS_TABLE = 'barriers' USER_TABLE = 'users' LOGGING = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'simple': { 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s' }, }, 'handlers': { 'file': { 'class': 'logging.FileHandler', 'filename': 'app.log', 'formatter': 'simple' }, }, 'loggers': { 'myapp': { 'handlers': ['file'], 'level': 'INFO', 'propagate': False }, } } all = ["DB_NAME", "ACCESS_TABLE", "BARIERS_TABLE", "USER_TABLE", "LOGGING"]
В нем мы храним константы, которые будем использовать в других скриптах, а также создаем типовую настройку логов для библиотеки logging.
Не отходя от кассы, перейдем к api/logging.py
from fastapi import APIRouter, Body from typing import Any from . import config import logging.config logging.config.dictConfig(config.LOGGING) logger = logging.getLogger('myapp') router = APIRouter(tags=["Logs"]) @router.post('/logs') async def get_body(body: Any = Body(None)): """ Create log for MTC Exolve events. """ logger.info(f'log endpoint was called. body: {body}') return body
В данном скрипте импортируем необходимые библиотеки, создаем экземпляр логгера, который будем использовать в других методах.
А также создаем группу дочерних API (router), в которой будет всего 1 POST метод
Данный метод при вызове просто пишет информационное сообщение с телом запроса.
Он пригодится нам либо для тестирования логов, либо для сохранения сведений о событии при запросе от МТС Exolve.
Пришло время основной бизнес-логики, ради которой все и затеял – скрипту api/access.py
Полный листинг скрипта спрячу под спойлером:
Скрытый текст
from fastapi import APIRouter, HTTPException, Body from pydantic import BaseModel from typing import List, Union, Any import sqlite3 from .config import * from .log import logger EVENT_URL = "<url or IP>/logs" CLIENT_ID = "MTC Exolve client ID" DISPLAY_NUMBER = "MTC Exolve phone number" class FollowMeRule(BaseModel): I_FOLLOW_ORDER: str ACTIVE: str NAME: str REDIRECT_NUMBER: str PERIOD: str PERIOD_DESCRIPTION: str TIMEOUT: str class FollowMeStruct(BaseModel): List[FollowMeRule] class Result(BaseModel): redirect_type: int event_URL: str client_id: str event_extended: str masking: str display_number: str followme_struct: List[Union[int, List[FollowMeRule]]] class ExolveResponse(BaseModel): id: int jsonrpc: str result: Result def open_barriers(phone): connection = sqlite3.connect(DB_NAME) cursor = connection.cursor() cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone where u.user_phone = {phone} ''') column_names = [col[0] for col in cursor.description] rows = cursor.fetchall() response = [dict(zip(column_names, row)) for row in rows] connection.close() return response router = APIRouter(tags=["Access control"]) @router.post("/access/", response_model=ExolveResponse) async def response_to_exolve(body: Any = Body(None)): """ Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539)) """ if 'params' in body and 'numberA' in body['params']: user_phone = body["params"]["numberA"] else: raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required") barrier_phones = open_barriers(user_phone) if (barrier_phones == None): raise HTTPException(status_code=403, detail="Access to barriers not allowed") followme_struct = [] for i in range(0, len(barrier_phones) ) : row = barrier_phones[i] followme_struct.append( { "I_FOLLOW_ORDER": str(i+1), "ACTIVE": "Y", "NAME": "BARRIER_PHONE", "REDIRECT_NUMBER": str(row["phone"]), "PERIOD": "always", "PERIOD_DESCRIPTION": "always", "TIMEOUT": "30" }) # Создаем объект Result result_object = Result( redirect_type="3", event_URL=EVENT_URL , client_id= CLIENT_ID, event_extended="N", masking= "Y", display_number= DISPLAY_NUMBER, followme_struct=[len(followme_struct),followme_struct] ) exolve_response_object = ExolveResponse( id=1, jsonrpc="2.0", result=result_object ) logger.info(f'barriers {barrier_phones} try to open for {user_phone}') return exolve_response_object
В самом начале мы импортируем необходимые модули и настраиваем константы:
from fastapi import APIRouter, HTTPException, Body from pydantic import BaseModel from typing import List, Union, Any import sqlite3 from .config import * from .log import logger EVENT_URL = "<url or IP>/logs" CLIENT_ID = "MTC Exolve client ID" DISPLAY_NUMBER = "MTC Exolve phone number"
Дальше идет определение классов данных для метода. По сути мы как конструктор собираем из нескольких блоков структуру ожидаемого ответа, указанную в документации.
class FollowMeRule(BaseModel): I_FOLLOW_ORDER: str ACTIVE: str NAME: str REDIRECT_NUMBER: str PERIOD: str PERIOD_DESCRIPTION: str TIMEOUT: str class FollowMeStruct(BaseModel): List[FollowMeRule] class Result(BaseModel): redirect_type: int event_URL: str client_id: str event_extended: str masking: str display_number: str followme_struct: List[Union[int, List[FollowMeRule]]] class ExolveResponse(BaseModel): id: int jsonrpc: str result: Result
Дальше определим функцию для поиска доступных к открытию шлагбаумов для пользователя, позвонившего на номер телефона.
Функция open_barriers проверяет по таблице users_access какие зоны безопасности доступны пользователю, склеивает их с номерами телефонов из таблицы barriers, и возвращает их в виде списка.
В принципе функция в скрипте вызывается один раз, и можно не выносить её отдельно, но я решил для удобства чтения сделать отдельно.
def open_barriers(phone): connection = sqlite3.connect(DB_NAME) cursor = connection.cursor() cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone where u.user_phone = {phone} ''') column_names = [col[0] for col in cursor.description] rows = cursor.fetchall() response = [dict(zip(column_names, row)) for row in rows] connection.close() return response
Осталось рассмотреть описание самого метода POST /access, который вызовет MTC Exolve при звонке.
router = APIRouter(tags=["Access control"]) @router.post("/access/", response_model=ExolveResponse) async def response_to_exolve(body: Any = Body(None)): """ Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539)) """ if 'params' in body and 'numberA' in body['params']: user_phone = body["params"]["numberA"] else: raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required") barrier_phones = open_barriers(user_phone) if (barrier_phones == None): raise HTTPException(status_code=403, detail="Access to barriers not allowed") followme_struct = [] for i in range(0, len(barrier_phones) ) : row = barrier_phones[i] followme_struct.append( { "I_FOLLOW_ORDER": str(i+1), "ACTIVE": "Y", "NAME": "BARRIER_PHONE", "REDIRECT_NUMBER": str(row["phone"]), "PERIOD": "always", "PERIOD_DESCRIPTION": "always", "TIMEOUT": "30" }) # Создаем объект Result result_object = Result( redirect_type="3", event_URL=EVENT_URL , client_id= CLIENT_ID, event_extended="N", masking= "Y", display_number= DISPLAY_NUMBER, followme_struct=[len(followme_struct),followme_struct] ) exolve_response_object = ExolveResponse( id=1, jsonrpc="2.0", result=result_object ) logger.info(f'barriers {barrier_phones} try to open for {user_phone}') return exolve_response_object
В методе проверяется, указан ли в теле запроса номер телефона с которого совершался звонок. По идее он всегда будет указан при нормальной работы, но для удобства тестирования запросов отправляемых вручную, я добавил эту проверку.
Затем мы получаем список телефонов, на которые нужно сделать вызов с помощью функции open_barrier.
И уже на базе полученного списка создаем объект для ответа на запрос.
Я решил не усложнять логику, поскольку тело ответа в нашем случае по большей части одинаковое, мы просто изменяющиеся данные подставим в шаблон.
Пройдемся в цикле по каждой записи, которая вернулась из базы данных, и создадим структуру для каждой записи, изменяя лишь атрибуты I_FOLLOW_ORDER
и REDIRECT_NUMBER
followme_struct.append( { "I_FOLLOW_ORDER": “ПОРЯДКОВЫЙ НОМЕР ТЕЛЕФОНА ДЛЯ ВЫЗОВА”, "ACTIVE": "Y", "NAME": "BARRIER_PHONE", "REDIRECT_NUMBER": “НОМЕР ТЕЛЕФОНА В ШЛАГБАУМЕ”, "PERIOD": "always", "PERIOD_DESCRIPTION": "always", "TIMEOUT": "30" })
Затем соберем как матрешку следующую структуру result_object
.
В ней мы в основном подставляем константы, но отдельно стоит обратить внимание на этот фрагмент кода: followme_struct=[len(followme_struct),followme_struct]
len(followme_struct)
– количество номеров, на которые будем делать вызов.
Далее пакуем третий слой матрешки:
exolve_response_object = ExolveResponse( id=1, jsonrpc="2.0", result=result_object )
Создаем запись в лог о попытке открыть шлагбаум(ы):
logger.info(f'barriers {barrier_phones} try to open for {user_phone}')
И возвращаем ответ:
return exolve_response_obje
ct
Осталось собрать все вместе в файле main.py и посмотреть код в деле
from fastapi import FastAPI from api import crud, access, log app = FastAPI( title="API for bariier control via MTC Exolve", description="This API requires an API key in the X-API-Key header demo key is 12345", version="1.0.0") app.include_router(access.router) # Use the router app.include_router(log.router) # Use the router app.include_router(crud.router) # Use the router
По сути в данном файле мы создаем главную точку в API сервиса (app = FastAPI).
И далее подключаем к ней ранее созданные роутеры.
Пусть вас не смущает то, что мы еще не разобрали роутер «crud». Это опциональные функции для управления записями в базе данных, если они вам не нужны, вы можете просто удалить или закомментировать все, что связано с «crud».
Итоги
Для запуска сервиса локально воспользуйтесь командой
uvicorn app.main:app
Если вы как и я первый раз встречаетесь с FastAPI, то рекомендую более подробно ознакомится со статьей на Хабре, в которой очень хорошо расписаны азы работы с ним.
После того как сервис запустится, перейдите по адресу
Должен открыться Swagger.

Для теста создадим запись в логах:

В файле app.log должна появиться запись
2025-03-23 16:53:35,382 - myapp - INFO - log endpoint was called. body: Привет мир
Однако протестировать функцию звонка мы сможем только запустив сервис во внешний мир.
Лично я арендовал недорогой облачный сервер и настроил его согласно инструкции.
После этого если мы отправим на адрес
http:/<Ваш IP или домен>/access/
запрос со следующим body:
{ "id": "1", "jsonrpc": "2.0", "method": "getcontrolcallfollowme", "params": { "h323_conf_id": " BC5F236C 5AD211E9 81BA5CB9 01FED6FC", "numberA": "79123456784", "sip_id": "79123456785" } }
Вернется следующий ответ:
{ "id": 1, "jsonrpc": "2.0", "result": { "redirect_type": 3, "event_URL": "http://62.113.44.250/logs", "client_id": "1215882", "event_extended": "N", "masking": "Y", "display_number": "79841860477", "followme_struct": [ 2, [ { "I_FOLLOW_ORDER": "1", "ACTIVE": "Y", "NAME": "BARRIER_PHONE", "REDIRECT_NUMBER": "79123456782", "PERIOD": "always", "PERIOD_DESCRIPTION": "always", "TIMEOUT": "30" }, { "I_FOLLOW_ORDER": "2", "ACTIVE": "Y", "NAME": "BARRIER_PHONE", "REDIRECT_NUMBER": "79123456781", "PERIOD": "always", "PERIOD_DESCRIPTION": "always", "TIMEOUT": "30" } ] ] } }
Это означает, что MTC Exolve позвонит на оба шлагбаума одновременно, и они оба откроются.
Если у вас еще остались силы, быстро пробежимся по функциям CRUD.
Бонус – CRUD API
Еще раз напомню, что все материалы доступны на GitHub.
В файле /api/crud.py представлена дополнительная логика, с помощью которой можно выполнять стандартные операции, чтения, удаления, добавления и обновления записей в таблицах базы данных.

Полный листинг модуля под спойлером.
Скрытый текст
from fastapi import APIRouter, HTTPException, Depends, Body from .config import * from pydantic import BaseModel, Field from typing import Optional, List, Annotated import sqlite3 from .log import logger from fastapi.security import APIKeyHeader API_KEY = "12345" # Replace with your key api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True, description="API Key required for access") async def get_api_key(api_key_header: Annotated[str, Depends(api_key_header)]): """ Dependency to validate the API key from the header. """ if api_key_header == API_KEY: return api_key_header else: raise HTTPException(status_code=403, detail="Invalid API Key") class Message(BaseModel): message:str class User(BaseModel): id: int phone: int name: str last_name: str position: Optional[str] = None class CreatedUser(BaseModel): phone: int = Field(..., description="User's phone.") name: str = Field(..., description="User's name.") last_name: str = Field(..., description="User's last name.") last_name: str position: Optional[str] = Field(None, description="user position (optional).") class CreatedBarrier(BaseModel): zone: int = Field(..., description="Security zone (location of the barrier on the site plan).") phone: int = Field(..., description="Phone of GSM module for opening barrier.") class Barrier(BaseModel): id: int zone: int phone: int class Access(BaseModel): user_phone: int barrier_zone: int def read_tables(table, id=None): """ Function for read users and barriers from db. """ connection = sqlite3.connect(DB_NAME) cursor = connection.cursor() where = "" if id: where = f" WHERE `id` = {id}" cursor.execute(f'SELECT * FROM {table} {where}') column_names = [col[0] for col in cursor.description] rows = cursor.fetchall() response = [dict(zip(column_names, row)) for row in rows] connection.close() if not response: return None return response[0] if id else response def update_tables(table, operation, data=None, id=None): """ Function for read users, barriers, and accesses from DB. """ try: connection = sqlite3.connect(DB_NAME) cursor = connection.cursor() sql_data = () fields = { "users":("phone", "name", "last_name", "position"), "barriers":("zone", "phone"), "users_access":("user_phone", "barrier_zone") } if data: match table: case "users": sql_data = (data.phone, data.name, data.last_name, data.position) case "barriers": sql_data = (data.zone, data.phone) case "users_access": sql_data = (data.user_phone, data.barrier_zone) match operation: case "create": values=[] for field in fields[table]: values.append(f'?') result_string = ', '.join(values) sql = f'UPDATE {table} SET {result_string} Where id = {id}' sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})' cursor.execute(sql, sql_data) connection.commit() case "delete": sql ="" if id: sql = f'DELETE FROM {table} WHERE id = {id}' elif data: sql = f'DELETE FROM {table} WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} ' cursor.execute(sql) connection.commit() case "update": update_fields = [] for field in fields[table]: update_fields.append(f'{field} = ?') result_string = ', '.join(update_fields) sql = f'UPDATE {table} SET {result_string} Where id = {id}' sql_data = tuple(data.model_dump().values()) cursor.execute(sql, sql_data) connection.commit() case None: return None except sqlite3.Error as e: print(f"Error: {e}") return None finally: if connection: connection.close() return cursor.lastrowid router = APIRouter(tags=["CRUD"]) @router.get("/bariers/", response_model=List[Barrier]) async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]): """ List all barriers. """ response = read_tables(BARIERS_TABLE) return response @router.get("/barrier/{barrier_id}", response_model=Barrier) async def list_barrier(barrier_id: int, api_key: Annotated[str, Depends(get_api_key)]): """ Get a barrier with specific id. """ response = read_tables(BARIERS_TABLE, barrier_id) if not response: raise HTTPException(status_code=404, detail="Barrier not found") return response @router.post("/barriers/", response_model=Message) async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]): """ Creates a new barrier. """ result = update_tables(BARIERS_TABLE, "create", data=barrier) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'barrier created {barrier}') return {"message": f"created barrier with id = {result}"} @router.delete("/barriers/{barrier_id}", response_model=Message) async def delete_barrier(barrier_id:int, api_key: Annotated[str, Depends(get_api_key)]): """ Delete a barrier with specific id. """ result = update_tables(BARIERS_TABLE, "delete", None, barrier_id) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'barrier {barrier_id} deleted') return {"message":"ok"} @router.patch("/barriers/{barrier_id}", response_model=Message) async def edit_barrier(barrier_id:int, barrier:CreatedBarrier,api_key: Annotated[str, Depends(get_api_key)]): """ Update a barrier with specific id (fill in all the request attributes). """ result = update_tables(BARIERS_TABLE, "update", barrier, barrier_id) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'barrier {barrier_id} updated {barrier}') return {"message":"ok"} @router.get("/accesses/", response_model=List[Access]) async def list_accesses(api_key: Annotated[str, Depends(get_api_key)]): """ List accesses (user to barrier). """ response = read_tables(ACCESS_TABLE) return response @router.post("/accesses/", response_model=Message) async def create_access(access: Access, api_key: Annotated[str, Depends(get_api_key)]): """ Creates a new access (user to barrier). """ result = update_tables(ACCESS_TABLE, "create", data=access) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'user access created {access}') return {"message": f"created access with id = {result}"} @router.delete("/accesses/", response_model=Message) async def delete_user(access:Access, api_key: Annotated[str, Depends(get_api_key)]): """ Delete access (user to barrier). """ result = update_tables(ACCESS_TABLE, "delete",access, None) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'user access deleted {access}') return {"message":"ok"} @router.get("/users/", response_model=List[User]) async def list_users(api_key: Annotated[str, Depends(get_api_key)]): """ List all users. """ response = read_tables(USER_TABLE) return response @router.get("/users/{user_id}", response_model=User) async def list_user(user_id: int, api_key: Annotated[str, Depends(get_api_key)]): """ Get a user with specific id. """ response = read_tables(USER_TABLE, user_id) if not response: raise HTTPException(status_code=404, detail="User not found") return response @router.post("/users/", response_model=Message) async def create_user(user: CreatedUser, api_key: Annotated[str, Depends(get_api_key)]): """ Creates a new user. """ result = update_tables(USER_TABLE, "create", data=user) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'user created {user}') return {"message": f"created user with id {result}"} @router.delete("/users/{user_id}", response_model=Message) async def delete_user(user_id:int, api_key: Annotated[str, Depends(get_api_key)]): """ Delete a user with specific id. """ result = update_tables(USER_TABLE, "delete", None, user_id) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'user_id = {user_id} deleted') return {"message":"ok"} @router.patch("/users/{user_id}", response_model=Message) async def edit_user(user_id:int, user:CreatedUser, api_key: Annotated[str, Depends(get_api_key)]): """ Update a user with specific id (fill in all the request attributes). """ result = update_tables(USER_TABLE, "update", user, user_id) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'user_id = {user_id} updated {user}') return {"message":"ok"}
Мы не будем разбирать всю логику подробно, разберем лишь функции чтения и изменения данных и пару типовых методов, которые к ним обращаются.
Функция read_tables формирует в БД запрос на получение данных из таблицы. Я постарался сделать ее поведение универсальным, чтобы выбирать из какой таблицы читать, а также указать id, для которого нужно вернуть данные (это пригодится в методах чтения одной записи из выборки).
def read_tables(table, id=None): """ Function for read users and barriers from db. """ connection = sqlite3.connect(DB_NAME) cursor = connection.cursor() where = "" if id: where = f" WHERE `id` = {id}" cursor.execute(f'SELECT * FROM {table} {where}') column_names = [col[0] for col in cursor.description] rows = cursor.fetchall() response = [dict(zip(column_names, row)) for row in rows] connection.close() if not response: return None return response[0] if id else response
А вот функция update_tables более комплексная. Я тоже попытался сделать её универсальной, чтобы с помощью нее обрабатывать запросы на удаление, обновление и создание новых записей в таблицах.
def update_tables(table, operation, data=None, id=None): """ Function for read users, barriers, and accesses from DB. """ try: connection = sqlite3.connect(DB_NAME) cursor = connection.cursor() sql_data = () fields = { "users":("phone", "name", "last_name", "position"), "barriers":("zone", "phone"), "users_access":("user_phone", "barrier_zone") } if data: match table: case "users": sql_data = (data.phone, data.name, data.last_name, data.position) case "barriers": sql_data = (data.zone, data.phone) case "users_access": sql_data = (data.user_phone, data.barrier_zone) match operation: case "create": values=[] for field in fields[table]: values.append(f'?') result_string = ', '.join(values) sql = f'UPDATE {table} SET {result_string} Where id = {id}' sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})' cursor.execute(sql, sql_data) connection.commit() case "delete": sql ="" if id: sql = f'DELETE FROM {table} WHERE id = {id}' elif data: sql = f'DELETE FROM {table} WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} ' cursor.execute(sql) connection.commit() case "update": update_fields = [] for field in fields[table]: update_fields.append(f'{field} = ?') result_string = ', '.join(update_fields) sql = f'UPDATE {table} SET {result_string} Where id = {id}' sql_data = tuple(data.model_dump().values()) cursor.execute(sql, sql_data) connection.commit() case None: return None except sqlite3.Error as e: print(f"Error: {e}") return None finally: if connection: connection.close() return cursor.lastrowid
И рассмотрим код пары методов, в которых используются вышеуказанные функции:
Начнем с чтения всех данных о шлагбаумах. метода GET /barriers/. По сути он лишь обращается к базе данных и возвращает список с данными по всем шлагбаумам.
router = APIRouter(tags=["CRUD"]) @router.get("/bariers/", response_model=List[Barrier]) async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]): """ List all barriers. """ response = read_tables(BARIERS_TABLE) return response
Обратите внимание на api_key: Annotated[str, Depends(get_api_key)
, я решил что методы CRUD должны быть с проверкой авторизации, поэтому для доступа к ним нужен будет код доступа 12345. Авторизацию сгенерировала нейросеть, поэтому я не буду её подробно разбирать, если вы изучите полный листинг, то быстро разберетесь в типовом коде.
Результат работы:

Пришло время метода для создания записи о барьере POST /barriers/. Метод принимает на вход номер зоны и телефон внутри GSM-модуля шлагбаума. В случае если запись успешно создана, вернет сообщение с id созданной записи в таблице barrires.
@router.post("/barriers/", response_model=Message) async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]): """ Creates a new barrier. """ result = update_tables(BARIERS_TABLE, "create", data=barrier) if result == None: raise HTTPException(status_code=400, detail="Bad request") logger.info(f'barrier created {barrier}') return {"message": f"created barrier with id = {result}"}
Результат работы:

Теперь у нас есть не только функционал для открытия шлагбаума, но и заготовка для админ-панели по управлению доступом.
Остальные методы вы сможете изучить самостоятельно, развернув сервис локально.
Спасибо, что дочитали до конца! Надеюсь, статья была интересной. Буду рад ознакомится с конструктивными комментариями.
ссылка на оригинал статьи https://habr.com/ru/articles/893432/
Добавить комментарий