Дистанционное управление шлагбаумом с помощью MTC Exolve и GSM модуля

от автора

Несмотря на то, что я системный аналитик, проклятие, которое начинается с фразы  «тыж программист…», иногда преследует и меня. Недавно у меня спросили совета, как сделать так, чтобы для нужных людей шлагбаум удаленно открывался без транспондера и сложных систем. 

К сожалению, сходу я не придумал ответа на вопрос, но задачка показалась мне любопытной, и я немного изучил вопрос. 

Решение нашёл быстро — в качестве аппаратной части можно использовать GSM-реле, а контроль доступа реализовать с помощью микро сервиса для переадресации вызова.

И поскольку у меня наличествовал тестовый доступ к МТС Exolve, грех было им не воспользоваться. Тем более, что соответствующее API для управления входящим вызовом есть.

Оглавление

Если честно, я не знаток автоматических шлагбаумов, поэтому в статье я не буду привязываться к аппаратной части, мне кажется подойдёт практически любое устройство, которое предложит поисковик по запросу «GSM модуль для шлагбаума».  Как правило у многих моделей шлагбаумов есть контакт, который при замыкании (размыкании реле) будет поднимать и опускать шлагбаум.

Как видите решение очень простое, в принципе можно и не городить никаких сервисов, а просто раздать знакомым номер телефона SIM-карты в GSM-модуле. Но что делать, если этот номер станет известен третьим лицам? Нам явно нужен белый список разрешенных номеров. По хорошему такие проблемы решают комплексные системы безопасности. Но мне очень хотелось покрутить в руках FastAPI. Поэтому мы сделаем свой сервис по контролю звонящих с «картошными играми и прочими увеселениями».

Однако, прежде чем приступать к делу – традиционный дисклеймер. Я не программист.  Задачу я решал скорее концептуально и корнер-кейсы особо не продумывал. Поэтому, настоятельно не рекомендую относиться к материалам статьи, как к истине в последней инстанции и использовать код без изменений в продуктовой среде.

Что планируем сделать

Формальности улажены и можно переходить к делу.

Рассмотрим следующий сценарий:

Предусловия.

  1. Есть несколько (минимум две) зоны, защищенные шлагбаумами, в каждом стоит свой GSM-модуль. 

  2. Пользователям может быть разрешен доступ к одной или сразу нескольким зонам.

Мы будем прорабатывать следующий:

  1. Пользователь подъезжает к шлагбауму.

  2. Набирает номер.

  3. MTС Exolve делает запрос к нам в сервис

  4. Сервис проверяет в какие зоны разрешен проезд пользователю и делает обзвон соответствующих шлагбаумов.

  5. Шлагбаумы открываются.

Схема управления шлагбаумом

Схема управления шлагбаумом

База данных

Начнем с базы данных. в которой будем хранить доступы пользователей из белого списка.

Для прототипа я взял SQlite ибо её функций нам хватит с головой, а ее поддержка уже давно реализована «из коробки» в Python 3.

Фактически нам нужно две таблицы:

  • Таблица для данных о пользователях, которые будут звонить на контрольный номер (users). 

  • Таблица для данных о номерах телефонах GSM-модулей в шлагбаумах и защищаемых ими зонах (barriers).

Однако, для реализации связи многие-ко-многим, мы сделаем третью табличку в которой непосредственно соединим пользователей и доступные им зоны для проезда (users_access).

В результате получим такую структуру.

ER-диаграмма

ER-диаграмма
Структура БД

Структура БД

Поскольку все исходники я разместил на GitHub, воздержусь от подробного описания структуры таблиц..

Отмечу только, что в user_access составной первичный ключ из обоих полей, при этом оба поля это внешние ключи в соответствующих таблицах. 

В качестве тестового примера мы создадим два шлагбаума.

Один для обычных гостей (#1), а второй (№2) для сотрудников.

Данные таблицы barriers

Данные таблицы barriers

Все имена и номера телефонов выдуманы, любые совпадения случайны. 

Создадим двух пользователей.

Одного гостя (Habr) и одного сотрудника (Habra).

Данные таблицы users

Данные таблицы users

Осталось задать доступы. Пользователь Habr сможет проехать только через гостевой шлагбаум, а Habra может проехать через оба.

Данные таблицы users_access

Данные таблицы users_access

Настройка УВВ API МТС Exolve

Прежде чем приступить к разработке сервиса, нам необходимо настроить управление входящим вызовов в МТС Exolve. Я надеюсь, что у вас есть к нему доступ.

Согласно документации, необходимо с помощью метода setSipCallControlU установить URL, к которому Exolve будет обращаться за дальнейшими инструкциями по обработке вызова.

Для этого необходимо отправить POST запрос на адрес:

https://api.mtt.ru/ipcr/ (указав в базовой авторизации логин и пароль).

Тело запроса:

{     "id":"1",     "jsonrpc":"2.0",     "method": "setSipCallControlURL",     "params": {         "sip_id": "Ваш SIP ID (ном",          "url": "Ваш URL"     } }

В нашем случае  sip_id – это номер телефона, на который будут звонить пользователи. А url – это адрес метода, который скажет, что делать дальше с вызовом вида <ваш домен или IP>/access.

Дополнительно еще можно установить резервный номер, на случай если сервис будет недоступен, например, номер поста охраны.

Сервис для управления входящим вызовом и логирование запроса

Итак, пришло время браться за разработку сервиса. Я использовал Python v. 3.10 и fastapi v. 0.115, но думаю, что подойдут и другие версии.

Освежим в памяти, схему взаимодействия, рассмотрим её в формате диаграммы последовательности.

Схема работы с Exolve

Схема работы с Exolve

Создадим файловую структуру:

├── api

│   ├── access.py (открытие шлагбаума)

│   ├── config.py (общие константы и настройки)

│   ├── crud.py (методы для управления записями в БД)

│   ├── __init__.py (служебный файл для подключения модулей)

│   └── log.py (работа с записями в лог)

├── data.sqlite (база данных)

├── main.py (главный файл скрипта)

└── readme.md

└──app.log (появится после первого логируемого запроса)

Начнем с файла api/config.py

DB_NAME = 'data.sqlite' ACCESS_TABLE = 'users_access' BARIERS_TABLE = 'barriers' USER_TABLE = 'users' LOGGING = {    'version': 1,    'disable_existing_loggers': False,    'formatters': {        'simple': {            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'        },    },    'handlers': {        'file': {            'class': 'logging.FileHandler',            'filename': 'app.log',            'formatter': 'simple'        },    },     'loggers': {        'myapp': {            'handlers': ['file'],            'level': 'INFO',            'propagate': False        },    } } all = ["DB_NAME", "ACCESS_TABLE", "BARIERS_TABLE", "USER_TABLE", "LOGGING"]

В нем мы храним константы, которые будем использовать в других скриптах, а также создаем типовую настройку логов для библиотеки logging.

Не отходя от кассы, перейдем к api/logging.py

from fastapi import APIRouter, Body from typing import Any from . import config import logging.config  logging.config.dictConfig(config.LOGGING) logger = logging.getLogger('myapp') router = APIRouter(tags=["Logs"])  @router.post('/logs') async def get_body(body: Any = Body(None)):    """    Create log for MTC Exolve events.    """    logger.info(f'log endpoint was called. body: {body}')    return body

В данном скрипте импортируем необходимые библиотеки, создаем экземпляр логгера, который будем использовать в других методах.
А также создаем группу дочерних API (router), в которой будет всего 1 POST метод 

Данный метод при вызове просто пишет информационное сообщение с телом запроса.
Он пригодится нам либо для тестирования логов, либо для сохранения сведений о событии при запросе от МТС Exolve.

Пришло время основной бизнес-логики, ради которой все и затеял – скрипту api/access.py

Полный листинг скрипта спрячу под спойлером:

Скрытый текст
from fastapi import APIRouter, HTTPException,  Body from pydantic import BaseModel from typing import  List,  Union, Any import sqlite3  from .config import * from .log import logger  EVENT_URL = "<url or IP>/logs" CLIENT_ID = "MTC Exolve client ID" DISPLAY_NUMBER  = "MTC Exolve phone number"  class FollowMeRule(BaseModel):    I_FOLLOW_ORDER: str    ACTIVE: str    NAME: str    REDIRECT_NUMBER: str    PERIOD: str    PERIOD_DESCRIPTION: str    TIMEOUT: str  class FollowMeStruct(BaseModel):    List[FollowMeRule]   class Result(BaseModel):    redirect_type: int    event_URL: str    client_id: str    event_extended: str    masking: str    display_number: str    followme_struct: List[Union[int, List[FollowMeRule]]]   class ExolveResponse(BaseModel):    id: int    jsonrpc: str    result: Result  def open_barriers(phone):    connection = sqlite3.connect(DB_NAME)    cursor = connection.cursor()    cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b    JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone    where u.user_phone = {phone} ''')    column_names = [col[0] for col in cursor.description]    rows = cursor.fetchall()    response = [dict(zip(column_names, row)) for row in rows]    connection.close()    return response  router = APIRouter(tags=["Access control"])  @router.post("/access/", response_model=ExolveResponse) async def response_to_exolve(body: Any = Body(None)):    """    Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539))    """    if 'params' in body and 'numberA' in body['params']:        user_phone = body["params"]["numberA"]    else:        raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required")    barrier_phones =  open_barriers(user_phone)    if (barrier_phones == None):        raise HTTPException(status_code=403, detail="Access to barriers not allowed")    followme_struct = []    for i in range(0, len(barrier_phones) ) :        row = barrier_phones[i]        followme_struct.append(            {                "I_FOLLOW_ORDER": str(i+1),                "ACTIVE": "Y",                "NAME": "BARRIER_PHONE",                "REDIRECT_NUMBER": str(row["phone"]),                "PERIOD": "always",                "PERIOD_DESCRIPTION": "always",                "TIMEOUT": "30"            })        # Создаем объект Result    result_object = Result(        redirect_type="3",        event_URL=EVENT_URL ,        client_id= CLIENT_ID,        event_extended="N",        masking= "Y",        display_number= DISPLAY_NUMBER,        followme_struct=[len(followme_struct),followme_struct]    )    exolve_response_object = ExolveResponse(            id=1,            jsonrpc="2.0",            result=result_object    )    logger.info(f'barriers {barrier_phones} try to open for {user_phone}')    return exolve_response_object

В самом начале мы импортируем необходимые модули и настраиваем константы:

from fastapi import APIRouter, HTTPException,  Body from pydantic import BaseModel from typing import  List,  Union, Any import sqlite3  from .config import * from .log import logger EVENT_URL = "<url or IP>/logs" CLIENT_ID = "MTC Exolve client ID" DISPLAY_NUMBER  = "MTC Exolve phone number"

Дальше идет определение классов данных для метода. По сути мы как конструктор собираем из нескольких блоков структуру ожидаемого ответа, указанную в документации.

class FollowMeRule(BaseModel):    I_FOLLOW_ORDER: str    ACTIVE: str    NAME: str    REDIRECT_NUMBER: str    PERIOD: str    PERIOD_DESCRIPTION: str    TIMEOUT: str   class FollowMeStruct(BaseModel):    List[FollowMeRule]    class Result(BaseModel):    redirect_type: int    event_URL: str    client_id: str    event_extended: str    masking: str    display_number: str    followme_struct: List[Union[int, List[FollowMeRule]]]    class ExolveResponse(BaseModel):    id: int    jsonrpc: str    result: Result 

Дальше определим функцию для поиска доступных к открытию шлагбаумов для пользователя, позвонившего на номер телефона.

Функция open_barriers проверяет по таблице users_access какие зоны безопасности доступны пользователю, склеивает их с номерами телефонов из таблицы barriers, и возвращает их в виде списка.

В принципе функция в  скрипте вызывается один раз, и можно не выносить её отдельно, но я решил для удобства чтения сделать отдельно.

def open_barriers(phone):    connection = sqlite3.connect(DB_NAME)    cursor = connection.cursor()    cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b    JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone    where u.user_phone = {phone} ''')    column_names = [col[0] for col in cursor.description]    rows = cursor.fetchall()    response = [dict(zip(column_names, row)) for row in rows]    connection.close()    return response

Осталось рассмотреть описание самого метода POST /access, который вызовет MTC Exolve при звонке.

router = APIRouter(tags=["Access control"]) @router.post("/access/", response_model=ExolveResponse) async def response_to_exolve(body: Any = Body(None)):    """    Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539))    """    if 'params' in body and 'numberA' in body['params']:        user_phone = body["params"]["numberA"]    else:        raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required")    barrier_phones =  open_barriers(user_phone)    if (barrier_phones == None):        raise HTTPException(status_code=403, detail="Access to barriers not allowed")    followme_struct = []    for i in range(0, len(barrier_phones) ) :        row = barrier_phones[i]        followme_struct.append(            {                "I_FOLLOW_ORDER": str(i+1),                "ACTIVE": "Y",                "NAME": "BARRIER_PHONE",                "REDIRECT_NUMBER": str(row["phone"]),                "PERIOD": "always",                "PERIOD_DESCRIPTION": "always",                "TIMEOUT": "30"            })        # Создаем объект Result    result_object = Result(        redirect_type="3",        event_URL=EVENT_URL ,        client_id= CLIENT_ID,        event_extended="N",        masking= "Y",        display_number= DISPLAY_NUMBER,        followme_struct=[len(followme_struct),followme_struct]    )    exolve_response_object = ExolveResponse(            id=1,            jsonrpc="2.0",            result=result_object    )    logger.info(f'barriers {barrier_phones} try to open for {user_phone}')    return exolve_response_object

В методе проверяется, указан ли в теле запроса номер телефона с которого совершался звонок. По идее он всегда будет указан при нормальной работы, но для удобства тестирования запросов отправляемых вручную, я добавил эту проверку.

Затем мы получаем список телефонов, на которые нужно сделать вызов с помощью функции open_barrier.

И уже на базе полученного списка создаем объект для ответа на запрос.

Я решил не усложнять логику, поскольку тело ответа в нашем случае по большей части одинаковое, мы просто изменяющиеся данные подставим в шаблон.

Пройдемся в цикле по каждой записи, которая вернулась из базы данных, и создадим структуру для каждой записи, изменяя лишь атрибуты I_FOLLOW_ORDER и REDIRECT_NUMBER

       followme_struct.append(            {                "I_FOLLOW_ORDER": “ПОРЯДКОВЫЙ НОМЕР ТЕЛЕФОНА ДЛЯ ВЫЗОВА”,                "ACTIVE": "Y",                "NAME": "BARRIER_PHONE",                "REDIRECT_NUMBER": “НОМЕР ТЕЛЕФОНА В ШЛАГБАУМЕ”,                "PERIOD": "always",                "PERIOD_DESCRIPTION": "always",                "TIMEOUT": "30"            }) 

Затем соберем как матрешку следующую структуру result_object.

В ней мы в основном подставляем константы, но отдельно стоит обратить внимание на этот фрагмент кода: followme_struct=[len(followme_struct),followme_struct]

len(followme_struct) – количество номеров, на которые будем делать вызов. 

Далее пакуем третий слой матрешки:

   exolve_response_object = ExolveResponse(            id=1,            jsonrpc="2.0",            result=result_object    )

Создаем запись в лог о попытке открыть шлагбаум(ы):

logger.info(f'barriers {barrier_phones} try to open for {user_phone}')

И возвращаем ответ:

return exolve_response_object

Осталось собрать все вместе в файле main.py и посмотреть код в деле

from fastapi import FastAPI from api import crud, access, log app = FastAPI( title="API for bariier control via MTC Exolve",    description="This API requires an API key in the X-API-Key header demo key is 12345",    version="1.0.0")  app.include_router(access.router) # Use the router app.include_router(log.router) # Use the router app.include_router(crud.router) # Use the router

По сути в данном файле мы создаем главную точку в API сервиса (app = FastAPI).

И далее подключаем к ней ранее созданные роутеры.

Пусть вас не смущает то, что мы еще не разобрали роутер «crud». Это опциональные функции для управления записями в базе данных, если они вам не нужны, вы можете просто удалить или закомментировать все, что связано с  «crud».

Итоги

Для запуска сервиса локально воспользуйтесь командой

uvicorn app.main:app

Если вы как и я первый раз встречаетесь с FastAPI, то рекомендую более подробно ознакомится со статьей на Хабре, в которой очень хорошо расписаны азы работы с ним.

После того как сервис запустится, перейдите по адресу

http://127.0.0.1:8000/docs//

Должен открыться Swagger.

Swagger сервиса

Swagger сервиса

Для теста создадим запись в логах:

Вызов метода для логирования.

Вызов метода для логирования.

В файле app.log должна появиться запись

2025-03-23 16:53:35,382 - myapp - INFO - log endpoint was called. body: Привет мир

Однако протестировать функцию звонка мы сможем только запустив сервис во внешний мир.

Лично я арендовал недорогой облачный сервер и настроил его согласно инструкции.

После этого если мы отправим на адрес

http:/<Ваш IP или домен>/access/ запрос со следующим body:

{    "id": "1",    "jsonrpc": "2.0",    "method": "getcontrolcallfollowme",    "params": {        "h323_conf_id": " BC5F236C 5AD211E9 81BA5CB9 01FED6FC",        "numberA": "79123456784",        "sip_id": "79123456785"    } }

Вернется следующий ответ:

{   "id": 1,   "jsonrpc": "2.0",   "result": {     "redirect_type": 3,     "event_URL": "http://62.113.44.250/logs",     "client_id": "1215882",     "event_extended": "N",     "masking": "Y",     "display_number": "79841860477",     "followme_struct": [       2,       [         {           "I_FOLLOW_ORDER": "1",           "ACTIVE": "Y",           "NAME": "BARRIER_PHONE",           "REDIRECT_NUMBER": "79123456782",           "PERIOD": "always",           "PERIOD_DESCRIPTION": "always",           "TIMEOUT": "30"         },         {           "I_FOLLOW_ORDER": "2",           "ACTIVE": "Y",           "NAME": "BARRIER_PHONE",           "REDIRECT_NUMBER": "79123456781",           "PERIOD": "always",           "PERIOD_DESCRIPTION": "always",           "TIMEOUT": "30"         }       ]     ]   } }

Это означает, что MTC Exolve позвонит на оба шлагбаума одновременно, и они оба откроются.

Если у вас еще остались силы, быстро пробежимся по функциям CRUD.

Бонус – CRUD API

Еще раз напомню, что все материалы доступны на GitHub.

В файле /api/crud.py представлена дополнительная логика, с помощью которой можно выполнять стандартные операции, чтения, удаления, добавления и обновления записей в таблицах базы данных.

Схема работы с API. Операции CRUD

Схема работы с API. Операции CRUD

Полный листинг модуля под спойлером.

Скрытый текст
from fastapi import APIRouter, HTTPException,  Depends,  Body from .config import * from pydantic import BaseModel, Field from typing import Optional, List, Annotated import sqlite3 from .log import logger from fastapi.security import APIKeyHeader   API_KEY = "12345"  # Replace with your key  api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True, description="API Key required for access")   async def get_api_key(api_key_header: Annotated[str, Depends(api_key_header)]):    """    Dependency to validate the API key from the header.    """    if api_key_header == API_KEY:        return api_key_header    else:        raise HTTPException(status_code=403, detail="Invalid API Key")  class Message(BaseModel):    message:str   class User(BaseModel):    id: int    phone: int    name: str    last_name: str    position: Optional[str] = None   class CreatedUser(BaseModel):    phone: int = Field(...,  description="User's phone.")    name: str = Field(...,  description="User's name.")    last_name: str = Field(...,  description="User's last name.")    last_name: str    position: Optional[str] = Field(None, description="user position (optional).")   class CreatedBarrier(BaseModel):    zone: int = Field(...,  description="Security zone (location of the barrier on the site plan).")    phone: int = Field(...,  description="Phone of GSM module for opening barrier.")    class Barrier(BaseModel):    id: int    zone: int    phone: int   class Access(BaseModel):    user_phone: int    barrier_zone: int   def read_tables(table, id=None):    """    Function for read users and barriers from db.    """    connection = sqlite3.connect(DB_NAME)    cursor = connection.cursor()    where = ""    if id:        where = f" WHERE `id` = {id}"    cursor.execute(f'SELECT * FROM {table} {where}')    column_names = [col[0] for col in cursor.description]    rows = cursor.fetchall()    response = [dict(zip(column_names, row)) for row in rows]    connection.close()    if not response:        return None    return response[0] if id else response   def update_tables(table, operation, data=None, id=None):    """    Function for read users, barriers, and accesses from DB.    """    try:        connection = sqlite3.connect(DB_NAME)        cursor = connection.cursor()        sql_data = ()        fields = {            "users":("phone", "name", "last_name", "position"),            "barriers":("zone", "phone"),            "users_access":("user_phone", "barrier_zone")            }        if data:            match table:                case "users":                    sql_data = (data.phone, data.name, data.last_name, data.position)                case "barriers":                    sql_data = (data.zone, data.phone)                case "users_access":                    sql_data = (data.user_phone, data.barrier_zone)        match operation:            case "create":                values=[]                for field in fields[table]:                     values.append(f'?')                result_string = ', '.join(values)                sql = f'UPDATE {table} SET {result_string} Where id = {id}'                sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})'                cursor.execute(sql, sql_data)                connection.commit()            case "delete":                sql =""                if id:                    sql = f'DELETE FROM {table}  WHERE id = {id}'                elif data:                    sql = f'DELETE FROM {table}  WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} '                cursor.execute(sql)                connection.commit()            case "update":                update_fields = []                for field in fields[table]:                     update_fields.append(f'{field} = ?')                result_string = ', '.join(update_fields)                sql = f'UPDATE {table} SET {result_string} Where id = {id}'                sql_data = tuple(data.model_dump().values())                cursor.execute(sql, sql_data)                connection.commit()            case None:                return None    except sqlite3.Error as e:            print(f"Error: {e}")            return None    finally:            if connection:                connection.close()    return cursor.lastrowid  router = APIRouter(tags=["CRUD"])  @router.get("/bariers/", response_model=List[Barrier]) async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]):    """    List all barriers.    """    response = read_tables(BARIERS_TABLE)    return response   @router.get("/barrier/{barrier_id}", response_model=Barrier) async def list_barrier(barrier_id: int, api_key: Annotated[str, Depends(get_api_key)]):    """    Get a barrier with specific id.    """    response = read_tables(BARIERS_TABLE, barrier_id)    if not response:        raise HTTPException(status_code=404, detail="Barrier not found")    return response  @router.post("/barriers/", response_model=Message) async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]):    """    Creates a new barrier.    """    result = update_tables(BARIERS_TABLE, "create", data=barrier)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'barrier created {barrier}')    return {"message": f"created barrier with id = {result}"}   @router.delete("/barriers/{barrier_id}", response_model=Message) async def delete_barrier(barrier_id:int, api_key: Annotated[str, Depends(get_api_key)]):    """    Delete a barrier with specific id.    """    result = update_tables(BARIERS_TABLE, "delete", None, barrier_id)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'barrier {barrier_id} deleted')    return {"message":"ok"}  @router.patch("/barriers/{barrier_id}", response_model=Message) async def edit_barrier(barrier_id:int, barrier:CreatedBarrier,api_key: Annotated[str, Depends(get_api_key)]):    """    Update a barrier with specific id (fill in all the request attributes).    """    result = update_tables(BARIERS_TABLE, "update", barrier, barrier_id)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'barrier {barrier_id} updated {barrier}')    return {"message":"ok"}   @router.get("/accesses/", response_model=List[Access]) async def list_accesses(api_key: Annotated[str, Depends(get_api_key)]):    """    List accesses (user to barrier).    """    response = read_tables(ACCESS_TABLE)    return response  @router.post("/accesses/", response_model=Message) async def create_access(access: Access, api_key: Annotated[str, Depends(get_api_key)]):    """    Creates a new access (user to barrier).    """    result = update_tables(ACCESS_TABLE, "create", data=access)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'user access created {access}')    return {"message": f"created access with id = {result}"}   @router.delete("/accesses/", response_model=Message) async def delete_user(access:Access, api_key: Annotated[str, Depends(get_api_key)]):    """    Delete access (user to barrier).    """    result = update_tables(ACCESS_TABLE, "delete",access, None)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'user access deleted {access}')    return {"message":"ok"}  @router.get("/users/", response_model=List[User]) async def list_users(api_key: Annotated[str, Depends(get_api_key)]):    """    List all users.    """    response = read_tables(USER_TABLE)    return response     @router.get("/users/{user_id}", response_model=User) async def list_user(user_id: int, api_key: Annotated[str, Depends(get_api_key)]):    """    Get a user with specific id.    """    response = read_tables(USER_TABLE, user_id)    if not response:        raise HTTPException(status_code=404, detail="User not found")    return response   @router.post("/users/", response_model=Message) async def create_user(user: CreatedUser, api_key: Annotated[str, Depends(get_api_key)]):    """    Creates a new user.    """    result = update_tables(USER_TABLE, "create", data=user)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'user created {user}')    return {"message": f"created  user with id {result}"}   @router.delete("/users/{user_id}", response_model=Message) async def delete_user(user_id:int, api_key: Annotated[str, Depends(get_api_key)]):    """    Delete a user with specific id.    """    result = update_tables(USER_TABLE, "delete", None, user_id)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'user_id = {user_id} deleted')    return {"message":"ok"}     @router.patch("/users/{user_id}", response_model=Message) async def edit_user(user_id:int, user:CreatedUser, api_key: Annotated[str, Depends(get_api_key)]):    """    Update a user      with specific id (fill in all the request attributes).    """    result = update_tables(USER_TABLE, "update", user, user_id)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'user_id = {user_id} updated {user}')    return {"message":"ok"}

Мы не будем разбирать всю логику подробно, разберем лишь функции чтения и изменения данных и пару типовых методов, которые к ним обращаются.

Функция read_tables формирует в БД запрос на получение данных из таблицы. Я постарался сделать ее поведение универсальным, чтобы выбирать из какой таблицы читать, а также указать id, для которого нужно вернуть данные (это пригодится в методах чтения одной записи из выборки). 

def read_tables(table, id=None):    """    Function for read users and barriers from db.    """    connection = sqlite3.connect(DB_NAME)    cursor = connection.cursor()    where = ""    if id:        where = f" WHERE `id` = {id}"    cursor.execute(f'SELECT * FROM {table} {where}')    column_names = [col[0] for col in cursor.description]    rows = cursor.fetchall()    response = [dict(zip(column_names, row)) for row in rows]    connection.close()    if not response:        return None    return response[0] if id else response

А вот функция update_tables более комплексная. Я тоже попытался сделать её универсальной, чтобы с помощью нее обрабатывать запросы на удаление, обновление и создание новых записей в таблицах.

def update_tables(table, operation, data=None, id=None):    """    Function for read users, barriers, and accesses from DB.    """    try:        connection = sqlite3.connect(DB_NAME)        cursor = connection.cursor()        sql_data = ()        fields = {            "users":("phone", "name", "last_name", "position"),            "barriers":("zone", "phone"),            "users_access":("user_phone", "barrier_zone")            }        if data:            match table:                case "users":                    sql_data = (data.phone, data.name, data.last_name, data.position)                case "barriers":                    sql_data = (data.zone, data.phone)                case "users_access":                    sql_data = (data.user_phone, data.barrier_zone)        match operation:            case "create":                values=[]                for field in fields[table]:                     values.append(f'?')                result_string = ', '.join(values)                sql = f'UPDATE {table} SET {result_string} Where id = {id}'                sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})'                cursor.execute(sql, sql_data)                connection.commit()            case "delete":                sql =""                if id:                    sql = f'DELETE FROM {table}  WHERE id = {id}'                elif data:                    sql = f'DELETE FROM {table}  WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} '                cursor.execute(sql)                connection.commit()            case "update":                update_fields = []                for field in fields[table]:                     update_fields.append(f'{field} = ?')                result_string = ', '.join(update_fields)                sql = f'UPDATE {table} SET {result_string} Where id = {id}'                sql_data = tuple(data.model_dump().values())                cursor.execute(sql, sql_data)                connection.commit()            case None:                return None    except sqlite3.Error as e:            print(f"Error: {e}")            return None    finally:            if connection:                connection.close()    return cursor.lastrowid

И рассмотрим код пары методов, в которых используются вышеуказанные функции:

Начнем с чтения всех данных о шлагбаумах. метода GET /barriers/. По сути он лишь обращается к базе данных и возвращает список с данными по всем шлагбаумам.

router = APIRouter(tags=["CRUD"])   @router.get("/bariers/", response_model=List[Barrier]) async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]):    """    List all barriers.    """    response = read_tables(BARIERS_TABLE)    return response

Обратите внимание на  api_key: Annotated[str, Depends(get_api_key), я решил  что методы CRUD должны быть с проверкой авторизации, поэтому для доступа к ним нужен будет код доступа 12345. Авторизацию сгенерировала нейросеть, поэтому я не буду её подробно разбирать, если вы изучите полный листинг, то быстро разберетесь в типовом коде.

Результат работы:

Запуск метода GET /barriers в Swagger

Запуск метода GET /barriers в Swagger

Пришло время метода для создания записи о барьере POST /barriers/. Метод принимает на вход номер зоны и телефон внутри GSM-модуля шлагбаума. В случае если запись успешно создана, вернет сообщение с id созданной записи в таблице barrires.

@router.post("/barriers/", response_model=Message) async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]):    """    Creates a new barrier.    """    result = update_tables(BARIERS_TABLE, "create", data=barrier)    if result == None:         raise HTTPException(status_code=400, detail="Bad request")    logger.info(f'barrier created {barrier}')    return {"message": f"created barrier with id = {result}"}

Результат работы:

Запуск метода POST /barriers в Swagger

Запуск метода POST /barriers в Swagger

Теперь у нас есть не только функционал для открытия шлагбаума, но и заготовка для админ-панели по управлению доступом.

Остальные методы вы сможете изучить самостоятельно, развернув сервис локально.

Спасибо, что дочитали до конца! Надеюсь, статья была интересной. Буду рад ознакомится с конструктивными комментариями.


ссылка на оригинал статьи https://habr.com/ru/articles/893432/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *