Разработка собственной простой системы управления задачами по расписанию на Python

от автора

Начало

Давненько я ничего не публиковал на Хабре — пора это исправлять.

В этот раз хочу поделиться темой, которая кажется простой, но на деле вызывает интерес у многих разработчиков и системных администраторов: как создать свою легковесную систему планирования задач на Python. Что-то вроде мини-аналога cron, но под свои задачи и со своими фишками.

Ведь часто бывает так: хочется, чтобы какие-то проверки или скрипты запускались в определённое время — например, в обеденный перерыв у сотрудников можно поставить автоматическую проверку всех машин на наличие вредоносного ПО. Или наоборот — распределить рутинные проверки так, чтобы они не мешали основной работе.

Сегодня я расскажу:

  • 1.Этап: Теория

  • 2.Этап: Теоретическая разработка такой программы

  • 3.Этап: Техническая разработка ПО

  • 4.Этап: Итоги

Итак, поехали!

1.Этап: Теория

Что такое планирование задач и зачем оно нужно?

В самой основе планирование задач – это механизм, который позволяет запустить функцию, скрипт или команду не прямо сейчас, а позже, основываясь на некотором правиле:

  • В определенное время (например, «каждый день в 3:00 ночи»).

  • Через определенный интервал (например, «каждые 5 минут»).

  • В определенную дату и время (например, «31 декабря 2023 года в 23:59»).

  • При наступлении определенного события (хотя это уже более продвинутые планировщики).

Зачем это нужно?

  • Автоматизация: Избавление от рутинных, повторяющихся действий.

  • Выполнение задач вне рабочего времени: Запуск ресурсоемких процессов (обработка данных, отчеты) ночью, когда нагрузка на систему минимальна.

  • Периодические проверки: Мониторинг состояния, проверка обновлений.

  • Отложенное выполнение: Запуск задач, которые зависят от готовности данных или наступления определенного момента.

Теория под капотом: Как работают планировщики?

Представим, что у нас есть список задач, каждая из которых должна выполниться по своему расписанию. Как планировщик решает, когда и что запускать? Существует несколько базовых подходов:

  1. Polling (Опрос): Самый простой метод. Планировщик постоянно (или через короткие промежутки времени) проверяет список всех задач. Для каждой задачи он смотрит: «Пришло ли время ее выполнять?». Если да, он запускает задачу.

    • Плюсы: Простота реализации.

    • Минусы: Неэффективность. Если задач много или интервал проверки слишком частый, это может потреблять много CPU. Если интервал проверки слишком редкий, задачи могут запускаться с опозданием. Также может быть проблема с «busy-waiting» (активное ожидание), если используется простой цикл while True.

  2. Event-Driven (На основе событий/таймеров): Более изящный подход. Вместо постоянного опроса, планировщик вычисляет время до следующей запланированной задачи. Затем он «засыпает» (блокируется) на это время. Когда таймер срабатывает, планировщик просыпается, выполняет задачу и вычисляет время до следующей задачи (возможно, это та же задача, если она периодическая, или другая).

    • Плюсы: Эффективность. Планировщик не тратит ресурсы впустую, когда нет задач для выполнения. Более точное планирование.

    • Минусы: Чуть сложнее в реализации, нужно правильно управлять таймерами.

  3. Использование системных планировщиков: Делегирование работы ОС (cron, systemd timers, Windows Task Scheduler). В этом случае ваше приложение или скрипт просто запускается самой операционной системой по расписанию.

    • Плюсы: Надежность (системные планировщики отказоустойчивы), не нужно писать свою логику расписаний.

    • Минусы: Меньше гибкости (сложно передавать контекст выполнения, обмениваться данными между запусками), нет централизованного управления из вашего приложения.

Выполнение задач:
Что происходит, когда приходит время запустить задачу?

  • Последовательное выполнение: Планировщик запускает задачу и ждет ее завершения, прежде чем перейти к проверке или запуску следующих задач.

    • Проблема: Если одна задача зависнет или выполняется долго, это заблокирует выполнение всех остальных задач.

  • Параллельное выполнение (в потоках или процессах): Планировщик запускает задачу в отдельном потоке (thread) или процессе. Сам планировщик при этом продолжает работать, проверяя и запуская другие задачи.

    • Плюсы: Долго выполняющиеся задачи не блокируют планировщик и другие задачи.

    • Минусы: Нужно управлять параллелизмом, избегать состояния гонки, следить за потреблением ресурсов (слишком много потоков/процессов могут нагрузить систему).

Для нашего простого планировщика на Python мы выберем комбинацию: polling (простой цикл проверки) для определения, когда запускать задачи, и параллельное выполнение в потоках (threading), чтобы сами задачи не блокировали главный цикл планировщика. Это хороший баланс между простотой и практичностью для базовых нужд.
Лично по своему опыту скажу: когда сталкиваешься с подобной теорией впервые, всё кажется сложным и запутанным. Но стоит один раз вникнуть в базовые принципы, и перед вами открывается огромный простор — можно строить целые центры автоматизации процессов.

2.Этап: Алгоритм нашей ПО

Наш планировщик будет уметь:

  • Добавлять задачи.

  • Запускать задачи по двум типам расписания:

    • С заданной периодичностью (интервал в секундах).

    • Один раз в определенное время.

  • Выполнять задачи в отдельных потоках.

  • Работать в одном процессе.

Он НЕ будет уметь (что отличает его от «взрослых» решений):

  • Сохранять список задач между перезапусками (нет персистентности).

  • Иметь гибкий синтаксис расписаний (как cron).

  • Обрабатывать ошибки выполнения задач.

  • Распределяться на несколько машин.

  • Учитывать часовые пояса.

Это будет именно простой планировщик для понимания принципов и выполнения базовых локальных задач.

Как же выглядит его алгоритм?

Алгоритм данной программы

Алгоритм данной программы

3.Этап: Техническая разработка ПО

Для создания нашего планировщика задач мы будем использовать только стандартные средства Python, без сторонних зависимостей. Это делает проект максимально лёгким и переносимым.

Нам понадобятся:

  • Python (подойдёт версия 3.8 и выше).

  • Библиотеки стандартной библиотеки Python:

    • datetime

    • uuid

    • threading

    • time

Давайте чуть подробнее рассмотрим две ключевые библиотеки, которые сыграют важную роль в работе нашего планировщика:

datetime

Модуль datetime — это стандартный инструмент для работы с датой и временем в Python.
Он позволяет:

  • получать текущую дату и время (datetime.datetime.now()),

  • выполнять арифметику дат (например, прибавлять интервалы),

  • сравнивать моменты времени,

  • форматировать дату и время в разные строки.

Почему нам важен datetime?
Мы будем с его помощью определять, когда пора запускать задачи — сравнивая текущее время с временем, назначенным для задачи.

uuid

Модуль uuid позволяет генерировать уникальные идентификаторы.
UUID расшифровывается как Universally Unique Identifier.

Почему он нужен в нашем проекте?
Каждой задаче мы будем присваивать уникальный ID. Это удобно для:

  • внутренней идентификации задач,

  • безопасного удаления или поиска конкретной задачи,

  • предотвращения конфликтов между задачами.

Мы будем использовать uuid.uuid4(), чтобы создавать случайные уникальные идентификаторы.
Теперь, когда все подготовительные шаги пройдены, можно переходить к написанию первой версии планировщика.

import threading import time from datetime import datetime import uuid  class Task:     """Класс для представления задачи"""     def __init__(self, name, function, execution_time=None, interval=None):         self.id = str(uuid.uuid4())         self.name = name         self.function = function         self.execution_time = execution_time         self.interval = interval         self.is_running = False         self.is_completed = False         self.thread = None      def should_run(self):         """Проверяет, должна ли задача быть запущена"""         if self.is_completed and not self.interval:             return False                      current_time = datetime.now()                  if self.execution_time and current_time >= self.execution_time:             if self.interval:                 # Обновляем время следующего запуска для периодических задач                 while current_time >= self.execution_time:                     self.execution_time = datetime.fromtimestamp(                         self.execution_time.timestamp() + self.interval                     )             return True         return False      def run(self):         """Запускает задачу в отдельном потоке"""         if self.is_running:             return                      self.is_running = True         self.thread = threading.Thread(target=self._execute)         self.thread.daemon = True         self.thread.start()      def _execute(self):         """Выполняет функцию задачи"""         try:             print(f"[{datetime.now()}] Выполняется задача: {self.name}")             self.function()             print(f"[{datetime.now()}] Задача {self.name} завершена")         except Exception as e:             print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}")         finally:             self.is_running = False             if not self.interval:                 self.is_completed = True   class TaskScheduler:     """Планировщик задач"""     def __init__(self, polling_interval=1):         """         Инициализация планировщика         :param polling_interval: Интервал проверки задач в секундах         """         self.tasks = []         self.polling_interval = polling_interval         self.is_running = False         self.scheduler_thread = None      def add_task(self, task):         """Добавляет задачу в список задач"""         self.tasks.append(task)         print(f"Задача '{task.name}' добавлена в планировщик")         return task.id      def schedule_task(self, name, function, execution_time=None, interval=None):         """         Создает и добавляет новую задачу         :param name: Название задачи         :param function: Функция для выполнения         :param execution_time: Время выполнения (datetime объект)         :param interval: Интервал повторения в секундах         :return: ID задачи         """         task = Task(name, function, execution_time, interval)         return self.add_task(task)      def remove_task(self, task_id):         """Удаляет задачу по ID"""         for i, task in enumerate(self.tasks):             if task.id == task_id:                 self.tasks.pop(i)                 print(f"Задача с ID {task_id} удалена")                 return True         return False      def start(self):         """Запускает планировщик задач"""         if self.is_running:             print("Планировщик уже запущен")             return          self.is_running = True         self.scheduler_thread = threading.Thread(target=self._main_loop)         self.scheduler_thread.daemon = True         self.scheduler_thread.start()         print("Планировщик задач запущен")      def stop(self):         """Останавливает планировщик задач"""         self.is_running = False         if self.scheduler_thread:             self.scheduler_thread.join(timeout=self.polling_interval*2)         print("Планировщик задач остановлен")      def _main_loop(self):         """Основной цикл планировщика"""         while self.is_running:             # Проверяем и запускаем задачи, которые должны быть выполнены             for task in self.tasks:                 if task.should_run():                     task.run()                          # Удаляем завершенные задачи             self.tasks = [task for task in self.tasks if not task.is_completed]                          # Минимальная задержка в основном цикле             time.sleep(self.polling_interval)   # Пример использования if __name__ == "__main__":     def task1():         print("Выполняется задача 1")         time.sleep(2)  # Имитация работы              def task2():         print("Выполняется задача 2")         time.sleep(1)  # Имитация работы              def task3():         print("Выполняется периодическая задача")          # Создаем планировщик с интервалом опроса 0.5 секунды     scheduler = TaskScheduler(polling_interval=0.5)          # Добавляем задачи     # Задача, которая выполнится через 3 секунды     scheduler.schedule_task(         "Отложенная задача",          task1,          execution_time=datetime.now().replace(microsecond=0) +                         datetime.timedelta(seconds=3)     )          # Задача, которая выполнится сразу     scheduler.schedule_task(         "Мгновенная задача",          task2,          execution_time=datetime.now()     )          # Периодическая задача, которая будет выполняться каждые 5 секунд     scheduler.schedule_task(         "Периодическая задача",          task3,          execution_time=datetime.now(),          interval=5     )          # Запускаем планировщик     scheduler.start()          try:         # Даем планировщику поработать 30 секунд         time.sleep(30)     except KeyboardInterrupt:         print("Программа прервана пользователем")     finally:         # Останавливаем планировщик         scheduler.stop()         print("Программа завершена")

И так теперь смотрим что у на по итогу получилось?

Итак, наш базовый планировщик работает: задачи добавляются, запускаются по расписанию, всё хорошо.

Но возникает логичный вопрос:
Как это может помочь системным и сетевым администраторам в реальной жизни?

Ответ прост: используя планировщик, можно автоматизировать рутинные задачи, которые раньше приходилось выполнять вручную. Например:

  • Мониторинг использования ресурсов (CPU, память, диск).

  • Проверка доступности серверов и сервисов.

  • Запуск диагностических скриптов или утилит.

  • Автоматическое выполнение профилактических процедур.

Чтобы добавить такие возможности, нам понадобятся ещё две мощные библиотеки:

psutil

psutil (process and system utilities) — сторонняя библиотека для Python, которая позволяет получать информацию о состоянии системы:

  • загрузка процессора,

  • использование оперативной памяти,

  • статистика по дискам и сетевым интерфейсам,

  • процессы, запущенные в системе.

Почему она важна?
С её помощью можно написать задачи, которые будут следить за состоянием машины и вовремя предупреждать о проблемах.

subprocess

subprocess — стандартный модуль Python для работы с внешними процессами.

Позволяет:

  • запускать системные команды,

  • взаимодействовать с выводом консоли,

  • обрабатывать ошибки при запуске.

Почему это важно?
С помощью subprocess можно, например:

  • запустить скрипт антивирусной проверки,

  • проверить доступность сервера через ping,

  • перезапустить службы,

  • выполнить резервное копирование через системные утилиты.

Теперь давайте усложним наш планировщик и добавим несколько реальных примеров использования этих библиотек.

import threading import time from datetime import datetime, timedelta import uuid import psutil  # Для мониторинга системных ресурсов import subprocess  # Для выполнения системных команд  class Task:     """Класс для представления задачи"""     def __init__(self, name, function, execution_time=None, interval=None):         self.id = str(uuid.uuid4())         self.name = name         self.function = function         self.execution_time = execution_time         self.interval = interval         self.is_running = False         self.is_completed = False         self.thread = None      def should_run(self):         """Проверяет, должна ли задача быть запущена"""         if self.is_completed and not self.interval:             return False                      current_time = datetime.now()                  if self.execution_time and current_time >= self.execution_time:             if self.interval:                 # Обновляем время следующего запуска для периодических задач                 while current_time >= self.execution_time:                     self.execution_time = datetime.fromtimestamp(                         self.execution_time.timestamp() + self.interval                     )             return True         return False      def run(self):         """Запускает задачу в отдельном потоке"""         if self.is_running:             return                      self.is_running = True         self.thread = threading.Thread(target=self._execute)         self.thread.daemon = True         self.thread.start()      def _execute(self):         """Выполняет функцию задачи"""         try:             print(f"[{datetime.now()}] Выполняется задача: {self.name}")             self.function()             print(f"[{datetime.now()}] Задача {self.name} завершена")         except Exception as e:             print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}")         finally:             self.is_running = False             if not self.interval:                 self.is_completed = True   class TaskScheduler:     """Планировщик задач"""     def __init__(self, polling_interval=1):         """         Инициализация планировщика         :param polling_interval: Интервал проверки задач в секундах         """         self.tasks = []         self.polling_interval = polling_interval         self.is_running = False         self.scheduler_thread = None      def add_task(self, task):         """Добавляет задачу в список задач"""         self.tasks.append(task)         print(f"Задача '{task.name}' добавлена в планировщик")         return task.id      def schedule_task(self, name, function, execution_time=None, interval=None):         """         Создает и добавляет новую задачу         :param name: Название задачи         :param function: Функция для выполнения         :param execution_time: Время выполнения (datetime объект)         :param interval: Интервал повторения в секундах         :return: ID задачи         """         task = Task(name, function, execution_time, interval)         return self.add_task(task)      def remove_task(self, task_id):         """Удаляет задачу по ID"""         for i, task in enumerate(self.tasks):             if task.id == task_id:                 self.tasks.pop(i)                 print(f"Задача с ID {task_id} удалена")                 return True         return False      def start(self):         """Запускает планировщик задач"""         if self.is_running:             print("Планировщик уже запущен")             return          self.is_running = True         self.scheduler_thread = threading.Thread(target=self._main_loop)         self.scheduler_thread.daemon = True         self.scheduler_thread.start()         print("Планировщик задач запущен")      def stop(self):         """Останавливает планировщик задач"""         self.is_running = False         if self.scheduler_thread:             self.scheduler_thread.join(timeout=self.polling_interval*2)         print("Планировщик задач остановлен")      def _main_loop(self):         """Основной цикл планировщика"""         while self.is_running:             # Проверяем и запускаем задачи, которые должны быть выполнены             for task in self.tasks:                 if task.should_run():                     task.run()                          # Удаляем завершенные задачи             self.tasks = [task for task in self.tasks if not task.is_completed]                          # Минимальная задержка в основном цикле             time.sleep(self.polling_interval)   # Функции для мониторинга системы def monitor_cpu_usage():     """Мониторинг загрузки ЦП компьютера"""     cpu_percent = psutil.cpu_percent(interval=1)     print(f"Текущая загрузка ЦП: {cpu_percent}%")          # Дополнительная информация о ЦП     cpu_count = psutil.cpu_count(logical=False)     cpu_count_logical = psutil.cpu_count(logical=True)     print(f"Количество физических ядер ЦП: {cpu_count}")     print(f"Количество логических ядер ЦП: {cpu_count_logical}")          # Информация о загрузке каждого ядра     per_cpu = psutil.cpu_percent(interval=1, percpu=True)     for i, percent in enumerate(per_cpu):         print(f"Загрузка ядра {i}: {percent}%")  def monitor_network_packets():     """Мониторинг сетевых пакетов пользователя"""     try:         # Получаем информацию о сетевых интерфейсах         net_io = psutil.net_io_counters(pernic=True)                  print("Информация о сетевых пакетах:")         for interface, stats in net_io.items():             print(f"\nИнтерфейс: {interface}")             print(f"Отправлено байт: {stats.bytes_sent}")             print(f"Получено байт: {stats.bytes_recv}")             print(f"Отправлено пакетов: {stats.packets_sent}")             print(f"Получено пакетов: {stats.packets_recv}")             print(f"Ошибки при отправке: {stats.errin}")             print(f"Ошибки при получении: {stats.errout}")             print(f"Пакеты отброшены при отправке: {stats.dropin}")             print(f"Пакеты отброшены при получении: {stats.dropout}")                  # Дополнительно можно использовать netstat для Windows         print("\nАктивные сетевые соединения:")         result = subprocess.run(["netstat", "-n"], capture_output=True, text=True)         connections = result.stdout.split('\n')         # Выводим только первые 10 строк, чтобы не перегружать вывод         for line in connections[:10]:             print(line)                  except Exception as e:         print(f"Ошибка при мониторинге сетевых пакетов: {e}")  # Пример использования if __name__ == "__main__":     # Создаем планировщик с интервалом опроса 0.5 секунды     scheduler = TaskScheduler(polling_interval=0.5)          # Добавляем задачи     # Задача 1: Мониторинг загрузки ЦП (каждые 5 секунд)     scheduler.schedule_task(         "Мониторинг загрузки ЦП",          monitor_cpu_usage,          execution_time=datetime.now(),          interval=5     )          # Задача 2: Мониторинг сетевых пакетов (каждые 10 секунд)     scheduler.schedule_task(         "Мониторинг сетевых пакетов",          monitor_network_packets,          execution_time=datetime.now() + timedelta(seconds=2),          interval=10     )          # Запускаем планировщик     scheduler.start()          try:         # Даем планировщику поработать 60 секунд         print("Планировщик будет работать 60 секунд. Нажмите Ctrl+C для досрочного завершения.")         time.sleep(60)     except KeyboardInterrupt:         print("Программа прервана пользователем")     finally:         # Останавливаем планировщик         scheduler.stop()         print("Программа завершена")

и что мы получаем в ответ

4.Этап: Итоги

Что можно сделать на основе такого простого планировщика задач?
Да практически всё, что угодно!

  • Реализовать удалённую доставку задач для корпоративных компьютеров.

  • Построить центры автоматизации для системных администраторов.

  • Развить проект до полноценного решения для управления инфраструктурой.

  • Или даже запустить коммерческий продукт на базе этой идеи.

Код, который мы написали, — это фундамент, на котором можно строить сложные и надёжные системы. Он лёгкий, расширяемый и даёт отличную отправную точку для собственных экспериментов и разработок.

Спасибо, что дочитали статью до конца! Надеюсь, материал был для вас полезным и вдохновляющим.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Полезно ли оказалась вам статья?

33.33% Да3
66.67% Нет6

Проголосовали 9 пользователей. Воздержавшихся нет.

ссылка на оригинал статьи https://habr.com/ru/articles/904800/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *