Всем привет, случалось такое, что вам надо поставить кучу агентов битрикса на крон, а потом сидеть и разбираться — сколько они отрабатывают, отрабатывают ли вообще, когда падают или зависают?
Ну конечно случалось. Так вот, чтобы получить визуальное представление о том, что там происходит, было принято решение, вынести агенты даже не на крон, а на apache airflow. Поведаю вам, как это было реализовано.
Агенты
По факту, агенты — это выполнение php команд по расписанию. Но мы пойдем дальше и выделим их в отдельную сущность, в классы с расширением Agents.php
Создадим интерфейс для агентов
<?php namespace App\Infrastructure\Contracts\Cli\Agents; interface AgentInterface { /** * @var string Возвращаемое значение для дагов в Airflow в случае успешного исполнения. */ const AIRFLOW_DONE = 'DONE FOR AIRFLOW'; /** * Метод выполняет код на агентах * @return string Возвращает строку вызова текущего метода. */ public static function execute(string $param = ''): string; /** * Метод для запуска из консоли. * @param string $param * @return string */ public static function console(string $param = ''): string; }
Далее создаем общий родитель для всех агентов
<?php namespace App\Infrastructure\Cli\Agents; use App\Infrastructure\Contracts\Cli\Agents\AgentInterface; class BaseAgent implements AgentInterface { /** * @inheritDoc */ public static function execute(string $param = ''): string { return static::execute($param); } /** * @inheritDoc */ public static function console(string $param = ''): string { $return = static::execute($param); echo self::AIRFLOW_DONE; // Тут добавляем вывод, если вызов прошел успешно return $return; } }
Ну и пример самого агента
<?php namespace App\Infrastructure\Cli\Agents; use App\Infrastructure\Contracts\Cli\Agents\AgentInterface; use App\Infrastructure\DTO\Cli\Agents\MethodDTO; use App\Infrastructure\Schemas\Logger\Critical\CatchSchema; use Bitrix\Main\Loader; use Bitrix\Main\LoaderException; use Exception; class TestImportAgent extends BaseAgent implements AgentInterface { /** * @param string $time * @return string * @throws Exception */ public static function execute(string $param = ''): string { try { if (Loader::includeModule("...")) { // Сам запрос (new TestImport())->moveToStage(); } } catch (LoaderException|Exception $e) { LoggerFacade::elk()->critical('Move import error', (new CatchSchema($e))->toArray()); } return 'It`s done, dude!'; } }
Что имеем: когда мы вызываем из агентов в админ-панели запись TestImportAgent::execute()
, то нам выводится сообщение ‘It`s done, dude!’
а если мы вызываем из терминала TestImportAgent::console(),
то нам возвращается
‘It`s done, dude!
DONE FOR AIRFLOW’
и это важно, чтобы AIRFLOW мог понять, что все выполнено корректно.
Консоль: Битрикс внедрили себе красивенькую, рабочую консольку, и можно было бы даже с ней поиграться, если бы она не была жестко завязана на модули, и не стояла устаревшая версия symfony/console
Для того, чтобы нам иметь свою красивую консольку, установим ее отдельно
Добавим local/console.php
<?php $_SERVER["DOCUMENT_ROOT"] = realpath(dirname(__FILE__) . "/../../../.."); require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/modules/main/cli/bootstrap.php'); require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/vendor/autoload.php'); use Symfony\Component\Console\Application; $obApplication = new Application(); $iterator = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($_SERVER["DOCUMENT_ROOT"] . '/local/')); $regex = new RegexIterator($iterator, '/^.+Command\.php$/i', RegexIterator::GET_MATCH); function my_autoloader($class): void { include $class; } foreach ($regex as $file => $value) { my_autoloader($file); } spl_autoload_register('my_autoloader'); foreach (get_declared_classes() as $item) { if (is_subclass_of($item, '\Symfony\Component\Console\Command\Command')) { $obApplication->add(new $item); } } try { $obApplication->run(); } catch (Exception $e) { }
Что мы тут делаем:
1 Итеративно проходим по всем директориям и ищем файлы, которые заканчиваются на *Command.php
2 Берем найденные файлы, и если, классы этих файлов наследуют '\Symfony\Component\Console\Command\Command'
, то регистрируем их в симфони консоли.
3 Далее файлы с маской Command можно использовать также как в обычном symfony/console
Ну и пишем команду, которая находит, по аналогии, все файлы с маской *Agent.php, наследующие базовый аент, и выполняет <AgentName>Agent::console($params);
<?php namespace App\Infrastructure\Cli\Airflow; use App\Shared\Services\Management\ManagementService; use Bitrix\Main\Loader; use Bitrix\Main\LoaderException; use Bitrix\Main\Localization\Loc; use Exception; use Symfony\Component\Console; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; #[AsCommand(name: 'agents:run')] class RunAgentsCommand extends Console\Command\Command { /** * Configures the current command. * @return void */ protected function configure(): void { $this // the command description shown when running "php bin/console list" ->setDescription('Запустить агент"') // the command help shown when running the command with the "--help" option ->setHelp('Запустить агент по имени класса') ->addArgument('className', InputArgument::OPTIONAL, 'Имя класса агента' ) ; } /** * Executes the current command. * @param InputInterface $input Console input steam. * @param OutputInterface $output Console output steam. * @return int */ protected function execute( Console\Input\InputInterface $input, Console\Output\OutputInterface $output ): int { try { if (!Loader::includeModule(moduleName: "...")) { throw new \RuntimeException(message: Loc::getMessage(code: '..._CRM_MODULE_NOT_INSTALLED') ); } $className = $input->getArgument('className'); // тут вынес сканирование по маске в отдельный метод $arItems = ManagementService::create()->filesystem()->modules()->psr()->actions()->registerSplByMask('Agent'); foreach ($arItems as $item) { if (is_subclass_of($item, 'App\Infrastructure\Cli\Agents\BaseAgent')) { if(strstr($item, $className)){ $findClass = $item; } } } if(!empty($findClass)) { // тут идет вызов метода (new $findClass)->console(); } else { throw new \InvalidArgumentException($className . ' is not exists'); } } catch (LoaderException|Exception $e) { $output->writeln($e->getMessage()); return Command::FAILURE; } return Command::SUCCESS; } }
Теперь нам доступен вызов любого агента, с помощью консоли
cd ./local && php ./console.php agents:run TestImportAgent
Вывод в эйрфлоу
Далее мы просто пишем даг для эйрфлоу
"""Запустить агент тестовый""" import datetime import pendulum from airflow import DAG from functions import create_task_ssh_command, dag_success_callback, container_root, php_command agent_name = "TestAgent" dag = DAG( dag_id=agent_name, schedule="0 */1 * * *", start_date=pendulum.datetime(2024, 11, 15, tz="Europe/Moscow"), catchup=False, doc_md=__doc__, max_active_runs=1, dagrun_timeout=datetime.timedelta(minutes=6), on_success_callback=dag_success_callback, tags=[ 'agent', 'test'], ) test_dag = create_task_ssh_command(dag, agent_name, f""" cd /path/ && docker compose exec -u user_name php sh -c \"{php_command} -f {container_root}/local/console.php agents:run {agent_name} 2>&1 | tee /var/log/airflow/{agent_name}.log\" """) if __name__ == "__main__": dag.run()
Скрытый текст
Ну и в functions.py пишем обработчик для результатов выполнения
from __future__ import annotations from airflow.exceptions import AirflowFailException from pprint import pprint import logging import datetime from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.operators.python import PythonOperator from airflow.models import Variable tz = datetime.timezone(datetime.timedelta(hours=3)) container_root = Variable.get("container_root") php_command = Variable.get("php_command") sshHook = SSHHook(ssh_conn_id='alconcrm', banner_timeout=60.0) output = '' DONE_STRING = 'DONE FOR AIRFLOW' FLAG_IS_EMPTY = 'EMPTY' FLAG_IS_DONE = 'DONE' FLAG_IS_NOT_EMPTY = 'NOT_EMPTY' def create_task_ssh_command(dag, task_id, task_command, task_success_callback=None): return PythonOperator( task_id=task_id, op_kwargs={"command": task_command}, python_callable=task_method, # on_success_callback=task_success_callback, dag=dag ) def task_method(**kwargs): run_ssh_command(kwargs['command']) def dag_success_callback(context): # logging.info("pprint(vars(context)) dag_success_callback") # logging.info(pprint(vars(context))) # send_to_telegram(f""" # \ud83d\ude00 Задача выполнена успешно. # ID группы задач: {context.get('dag').safe_dag_id} # Название группы задач: {context.get('dag').doc_md} # Время запуска: {context.get('dag_run').start_date.astimezone(tz)} # Время завершения: {context.get('dag_run').end_date.astimezone(tz)} # Тип запуска: {context.get('dag_run').run_type} # Время исполнения: {context.get('dag_run').end_date - context.get('dag_run').start_date} # """) return True def run_ssh_command(command): ssh_client = None try: ssh_client = sshHook.get_conn() ssh_client.load_system_host_keys() stdin, stdout, stderr = ssh_client.exec_command(command) return_result = stdout.read().decode('utf-8') logging.info(pprint(return_result)) check_output(return_result) return return_result finally: if ssh_client: ssh_client.close() def check_output(return_result_arg): flag = check_done_output(return_result_arg) if flag == FLAG_IS_NOT_EMPTY: raise AirflowFailException('Неожиданный ответ команды в терминале: '+"\n"+(output)) elif flag == FLAG_IS_EMPTY: raise AirflowFailException('Пустой ответ команды в терминале') def check_line(arg_line): return arg_line.strip() == DONE_STRING def check_done_output(arg_output): is_empty = True global output for line in arg_output.splitlines(): str = line.strip(); if check_line(str): return FLAG_IS_DONE elif str != '': is_empty = False output += str + "\n" flag = FLAG_IS_EMPTY if is_empty else FLAG_IS_NOT_EMPTY return flag
И наслаждаемся полученной картинкой
ссылка на оригинал статьи https://habr.com/ru/articles/868042/
Добавить комментарий