Записки разработчика: airflow->symfony-console->bitrix agents

от автора

Всем привет, случалось такое, что вам надо поставить кучу агентов битрикса на крон, а потом сидеть и разбираться — сколько они отрабатывают, отрабатывают ли вообще, когда падают или зависают?

Ну конечно случалось. Так вот, чтобы получить визуальное представление о том, что там происходит, было принято решение, вынести агенты даже не на крон, а на apache airflow. Поведаю вам, как это было реализовано.

Агенты

По факту, агенты — это выполнение php команд по расписанию. Но мы пойдем дальше и выделим их в отдельную сущность, в классы с расширением Agents.php

Создадим интерфейс для агентов

<?php  namespace App\Infrastructure\Contracts\Cli\Agents;  interface AgentInterface {     /**      * @var string Возвращаемое значение для дагов в Airflow в случае успешного исполнения.      */     const AIRFLOW_DONE = 'DONE FOR AIRFLOW';      /**      * Метод выполняет код на агентах      * @return string Возвращает строку вызова текущего метода.      */     public static function execute(string $param = ''): string;      /**      * Метод для запуска из консоли.      * @param string $param      * @return string      */     public static function console(string $param = ''): string; }

Далее создаем общий родитель для всех агентов

<?php  namespace App\Infrastructure\Cli\Agents;  use App\Infrastructure\Contracts\Cli\Agents\AgentInterface;  class BaseAgent implements AgentInterface {      /**      * @inheritDoc      */     public static function execute(string $param = ''): string     {         return static::execute($param);     }      /**      * @inheritDoc      */     public static function console(string $param = ''): string     {         $return = static::execute($param);         echo self::AIRFLOW_DONE; // Тут добавляем вывод, если вызов прошел успешно         return $return;     } }

Ну и пример самого агента

<?php  namespace App\Infrastructure\Cli\Agents;  use App\Infrastructure\Contracts\Cli\Agents\AgentInterface; use App\Infrastructure\DTO\Cli\Agents\MethodDTO; use App\Infrastructure\Schemas\Logger\Critical\CatchSchema; use Bitrix\Main\Loader; use Bitrix\Main\LoaderException; use Exception;  class TestImportAgent extends BaseAgent implements AgentInterface {     /**      * @param string $time      * @return string      * @throws Exception      */     public static function execute(string $param = ''): string     {         try {             if (Loader::includeModule("...")) {                 // Сам запрос                 (new TestImport())->moveToStage();             }         } catch (LoaderException|Exception $e) {             LoggerFacade::elk()->critical('Move import error', (new CatchSchema($e))->toArray());         }          return 'It`s done, dude!';     } }

Что имеем: когда мы вызываем из агентов в админ-панели запись TestImportAgent::execute(), то нам выводится сообщение ‘It`s done, dude!’
а если мы вызываем из терминала
TestImportAgent::console(), то нам возвращается
‘It`s done, dude!
DONE FOR AIRFLOW’
и это важно, чтобы AIRFLOW мог понять, что все выполнено корректно.

Консоль: Битрикс внедрили себе красивенькую, рабочую консольку, и можно было бы даже с ней поиграться, если бы она не была жестко завязана на модули, и не стояла устаревшая версия symfony/console

Для того, чтобы нам иметь свою красивую консольку, установим ее отдельно

Добавим local/console.php

<?php $_SERVER["DOCUMENT_ROOT"] = realpath(dirname(__FILE__) . "/../../../.."); require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/modules/main/cli/bootstrap.php'); require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/vendor/autoload.php');  use Symfony\Component\Console\Application;  $obApplication = new Application();  $iterator = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($_SERVER["DOCUMENT_ROOT"] . '/local/')); $regex = new RegexIterator($iterator, '/^.+Command\.php$/i', RegexIterator::GET_MATCH);  function my_autoloader($class): void {     include $class; }  foreach ($regex as $file => $value) {     my_autoloader($file); }  spl_autoload_register('my_autoloader');  foreach (get_declared_classes() as $item) {     if (is_subclass_of($item, '\Symfony\Component\Console\Command\Command')) {         $obApplication->add(new $item);     } }  try {     $obApplication->run(); } catch (Exception $e) { }

Что мы тут делаем:
1 Итеративно проходим по всем директориям и ищем файлы, которые заканчиваются на *Command.php

2 Берем найденные файлы, и если, классы этих файлов наследуют '\Symfony\Component\Console\Command\Command', то регистрируем их в симфони консоли.

3 Далее файлы с маской Command можно использовать также как в обычном symfony/console

Ну и пишем команду, которая находит, по аналогии, все файлы с маской *Agent.php, наследующие базовый аент, и выполняет <AgentName>Agent::console($params);

<?php  namespace App\Infrastructure\Cli\Airflow;  use App\Shared\Services\Management\ManagementService; use Bitrix\Main\Loader; use Bitrix\Main\LoaderException; use Bitrix\Main\Localization\Loc; use Exception; use Symfony\Component\Console; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface;  #[AsCommand(name: 'agents:run')] class RunAgentsCommand extends Console\Command\Command {     /**      * Configures the current command.      * @return void      */     protected function configure(): void     {         $this             // the command description shown when running "php bin/console list"             ->setDescription('Запустить агент"')             // the command help shown when running the command with the "--help" option             ->setHelp('Запустить агент по имени класса')             ->addArgument('className', InputArgument::OPTIONAL, 'Имя класса агента' )         ;     }      /**      * Executes the current command.      * @param InputInterface $input Console input steam.      * @param OutputInterface $output Console output steam.      * @return int      */     protected function execute(         Console\Input\InputInterface $input,         Console\Output\OutputInterface $output     ): int     {         try {             if (!Loader::includeModule(moduleName: "...")) {                 throw new \RuntimeException(message:                     Loc::getMessage(code: '..._CRM_MODULE_NOT_INSTALLED')                 );             }              $className = $input->getArgument('className');           // тут вынес сканирование по маске в отдельный метод             $arItems = ManagementService::create()->filesystem()->modules()->psr()->actions()->registerSplByMask('Agent');              foreach ($arItems as $item) {                 if (is_subclass_of($item, 'App\Infrastructure\Cli\Agents\BaseAgent')) {                     if(strstr($item, $className)){                         $findClass = $item;                     }                 }             }             if(!empty($findClass)) {               // тут идет вызов метода                 (new $findClass)->console();             } else {                 throw new \InvalidArgumentException($className . ' is not exists');             }          } catch (LoaderException|Exception $e) {             $output->writeln($e->getMessage());             return Command::FAILURE;         }          return Command::SUCCESS;     } }

Теперь нам доступен вызов любого агента, с помощью консоли

cd ./local && php ./console.php agents:run TestImportAgent

Вывод в эйрфлоу

Устанавливаем систему

Знакомство с Apache Airflow: установка и запуск первого DAGа
Привет! Меня зовут Алексей Карпов, я прикладной администратор (MLOps) отдела сопровождения моделей м…

habr.com

Далее мы просто пишем даг для эйрфлоу

"""Запустить агент тестовый""" import datetime import pendulum from airflow import DAG from functions import create_task_ssh_command, dag_success_callback, container_root, php_command  agent_name = "TestAgent"  dag = DAG(     dag_id=agent_name,     schedule="0 */1 * * *",     start_date=pendulum.datetime(2024, 11, 15, tz="Europe/Moscow"),     catchup=False,     doc_md=__doc__,     max_active_runs=1,     dagrun_timeout=datetime.timedelta(minutes=6),     on_success_callback=dag_success_callback,     tags=[ 'agent', 'test'], )  test_dag = create_task_ssh_command(dag, agent_name, f""" cd /path/ && docker compose exec -u user_name php sh -c \"{php_command} -f {container_root}/local/console.php agents:run {agent_name} 2>&1 | tee /var/log/airflow/{agent_name}.log\" """)  if __name__ == "__main__":     dag.run() 
Скрытый текст

Ну и в functions.py пишем обработчик для результатов выполнения

from __future__ import annotations from airflow.exceptions import AirflowFailException  from pprint import pprint import logging import datetime from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.operators.python import PythonOperator from airflow.models import Variable  tz = datetime.timezone(datetime.timedelta(hours=3)) container_root = Variable.get("container_root") php_command = Variable.get("php_command")  sshHook = SSHHook(ssh_conn_id='alconcrm', banner_timeout=60.0) output = '' DONE_STRING = 'DONE FOR AIRFLOW' FLAG_IS_EMPTY = 'EMPTY' FLAG_IS_DONE = 'DONE' FLAG_IS_NOT_EMPTY = 'NOT_EMPTY'  def create_task_ssh_command(dag, task_id, task_command, task_success_callback=None):     return PythonOperator(         task_id=task_id,         op_kwargs={"command": task_command},         python_callable=task_method,         # on_success_callback=task_success_callback,         dag=dag     )   def task_method(**kwargs):     run_ssh_command(kwargs['command'])  def dag_success_callback(context):     # logging.info("pprint(vars(context)) dag_success_callback")     # logging.info(pprint(vars(context))) #     send_to_telegram(f""" #         \ud83d\ude00 Задача выполнена успешно. #         ID группы задач: {context.get('dag').safe_dag_id} #         Название группы задач: {context.get('dag').doc_md} #         Время запуска: {context.get('dag_run').start_date.astimezone(tz)} #         Время завершения: {context.get('dag_run').end_date.astimezone(tz)} #         Тип запуска: {context.get('dag_run').run_type} #         Время исполнения: {context.get('dag_run').end_date - context.get('dag_run').start_date} #     """) return True  def run_ssh_command(command):     ssh_client = None     try:         ssh_client = sshHook.get_conn()         ssh_client.load_system_host_keys()         stdin, stdout, stderr = ssh_client.exec_command(command)         return_result = stdout.read().decode('utf-8')         logging.info(pprint(return_result))         check_output(return_result)         return return_result     finally:         if ssh_client:             ssh_client.close()   def check_output(return_result_arg):     flag = check_done_output(return_result_arg)     if flag == FLAG_IS_NOT_EMPTY:         raise AirflowFailException('Неожиданный ответ команды в терминале: '+"\n"+(output))     elif flag == FLAG_IS_EMPTY:         raise AirflowFailException('Пустой ответ команды в терминале')   def check_line(arg_line):     return arg_line.strip() == DONE_STRING   def check_done_output(arg_output):     is_empty = True     global output     for line in arg_output.splitlines():         str = line.strip();         if check_line(str):             return FLAG_IS_DONE         elif str != '':             is_empty = False             output += str + "\n"     flag = FLAG_IS_EMPTY if is_empty else FLAG_IS_NOT_EMPTY     return flag 

И наслаждаемся полученной картинкой


ссылка на оригинал статьи https://habr.com/ru/articles/868042/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *