Начало
Давненько я ничего не публиковал на Хабре — пора это исправлять.
В этот раз хочу поделиться темой, которая кажется простой, но на деле вызывает интерес у многих разработчиков и системных администраторов: как создать свою легковесную систему планирования задач на Python. Что-то вроде мини-аналога cron, но под свои задачи и со своими фишками.
Ведь часто бывает так: хочется, чтобы какие-то проверки или скрипты запускались в определённое время — например, в обеденный перерыв у сотрудников можно поставить автоматическую проверку всех машин на наличие вредоносного ПО. Или наоборот — распределить рутинные проверки так, чтобы они не мешали основной работе.
Сегодня я расскажу:
-
1.Этап: Теория
-
2.Этап: Теоретическая разработка такой программы
-
3.Этап: Техническая разработка ПО
-
4.Этап: Итоги
Итак, поехали!
1.Этап: Теория
Что такое планирование задач и зачем оно нужно?
В самой основе планирование задач – это механизм, который позволяет запустить функцию, скрипт или команду не прямо сейчас, а позже, основываясь на некотором правиле:
-
В определенное время (например, «каждый день в 3:00 ночи»).
-
Через определенный интервал (например, «каждые 5 минут»).
-
В определенную дату и время (например, «31 декабря 2023 года в 23:59»).
-
При наступлении определенного события (хотя это уже более продвинутые планировщики).
Зачем это нужно?
-
Автоматизация: Избавление от рутинных, повторяющихся действий.
-
Выполнение задач вне рабочего времени: Запуск ресурсоемких процессов (обработка данных, отчеты) ночью, когда нагрузка на систему минимальна.
-
Периодические проверки: Мониторинг состояния, проверка обновлений.
-
Отложенное выполнение: Запуск задач, которые зависят от готовности данных или наступления определенного момента.
Теория под капотом: Как работают планировщики?
Представим, что у нас есть список задач, каждая из которых должна выполниться по своему расписанию. Как планировщик решает, когда и что запускать? Существует несколько базовых подходов:
-
Polling (Опрос): Самый простой метод. Планировщик постоянно (или через короткие промежутки времени) проверяет список всех задач. Для каждой задачи он смотрит: «Пришло ли время ее выполнять?». Если да, он запускает задачу.
-
Плюсы: Простота реализации.
-
Минусы: Неэффективность. Если задач много или интервал проверки слишком частый, это может потреблять много CPU. Если интервал проверки слишком редкий, задачи могут запускаться с опозданием. Также может быть проблема с «busy-waiting» (активное ожидание), если используется простой цикл while True.
-
-
Event-Driven (На основе событий/таймеров): Более изящный подход. Вместо постоянного опроса, планировщик вычисляет время до следующей запланированной задачи. Затем он «засыпает» (блокируется) на это время. Когда таймер срабатывает, планировщик просыпается, выполняет задачу и вычисляет время до следующей задачи (возможно, это та же задача, если она периодическая, или другая).
-
Плюсы: Эффективность. Планировщик не тратит ресурсы впустую, когда нет задач для выполнения. Более точное планирование.
-
Минусы: Чуть сложнее в реализации, нужно правильно управлять таймерами.
-
-
Использование системных планировщиков: Делегирование работы ОС (cron, systemd timers, Windows Task Scheduler). В этом случае ваше приложение или скрипт просто запускается самой операционной системой по расписанию.
-
Плюсы: Надежность (системные планировщики отказоустойчивы), не нужно писать свою логику расписаний.
-
Минусы: Меньше гибкости (сложно передавать контекст выполнения, обмениваться данными между запусками), нет централизованного управления из вашего приложения.
-
Выполнение задач:
Что происходит, когда приходит время запустить задачу?
-
Последовательное выполнение: Планировщик запускает задачу и ждет ее завершения, прежде чем перейти к проверке или запуску следующих задач.
-
Проблема: Если одна задача зависнет или выполняется долго, это заблокирует выполнение всех остальных задач.
-
-
Параллельное выполнение (в потоках или процессах): Планировщик запускает задачу в отдельном потоке (thread) или процессе. Сам планировщик при этом продолжает работать, проверяя и запуская другие задачи.
-
Плюсы: Долго выполняющиеся задачи не блокируют планировщик и другие задачи.
-
Минусы: Нужно управлять параллелизмом, избегать состояния гонки, следить за потреблением ресурсов (слишком много потоков/процессов могут нагрузить систему).
-
Для нашего простого планировщика на Python мы выберем комбинацию: polling (простой цикл проверки) для определения, когда запускать задачи, и параллельное выполнение в потоках (threading), чтобы сами задачи не блокировали главный цикл планировщика. Это хороший баланс между простотой и практичностью для базовых нужд.
Лично по своему опыту скажу: когда сталкиваешься с подобной теорией впервые, всё кажется сложным и запутанным. Но стоит один раз вникнуть в базовые принципы, и перед вами открывается огромный простор — можно строить целые центры автоматизации процессов.
2.Этап: Алгоритм нашей ПО
Наш планировщик будет уметь:
-
Добавлять задачи.
-
Запускать задачи по двум типам расписания:
-
С заданной периодичностью (интервал в секундах).
-
Один раз в определенное время.
-
-
Выполнять задачи в отдельных потоках.
-
Работать в одном процессе.
Он НЕ будет уметь (что отличает его от «взрослых» решений):
-
Сохранять список задач между перезапусками (нет персистентности).
-
Иметь гибкий синтаксис расписаний (как cron).
-
Обрабатывать ошибки выполнения задач.
-
Распределяться на несколько машин.
-
Учитывать часовые пояса.
Это будет именно простой планировщик для понимания принципов и выполнения базовых локальных задач.
Как же выглядит его алгоритм?
3.Этап: Техническая разработка ПО
Для создания нашего планировщика задач мы будем использовать только стандартные средства Python, без сторонних зависимостей. Это делает проект максимально лёгким и переносимым.
Нам понадобятся:
-
Python (подойдёт версия 3.8 и выше).
-
Библиотеки стандартной библиотеки Python:
-
datetime -
uuid -
threading -
time
-
Давайте чуть подробнее рассмотрим две ключевые библиотеки, которые сыграют важную роль в работе нашего планировщика:
datetime
Модуль datetime — это стандартный инструмент для работы с датой и временем в Python.
Он позволяет:
-
получать текущую дату и время (
datetime.datetime.now()), -
выполнять арифметику дат (например, прибавлять интервалы),
-
сравнивать моменты времени,
-
форматировать дату и время в разные строки.
Почему нам важен datetime?
Мы будем с его помощью определять, когда пора запускать задачи — сравнивая текущее время с временем, назначенным для задачи.
uuid
Модуль uuid позволяет генерировать уникальные идентификаторы.
UUID расшифровывается как Universally Unique Identifier.
Почему он нужен в нашем проекте?
Каждой задаче мы будем присваивать уникальный ID. Это удобно для:
-
внутренней идентификации задач,
-
безопасного удаления или поиска конкретной задачи,
-
предотвращения конфликтов между задачами.
Мы будем использовать uuid.uuid4(), чтобы создавать случайные уникальные идентификаторы.
Теперь, когда все подготовительные шаги пройдены, можно переходить к написанию первой версии планировщика.
import threading import time from datetime import datetime import uuid class Task: """Класс для представления задачи""" def __init__(self, name, function, execution_time=None, interval=None): self.id = str(uuid.uuid4()) self.name = name self.function = function self.execution_time = execution_time self.interval = interval self.is_running = False self.is_completed = False self.thread = None def should_run(self): """Проверяет, должна ли задача быть запущена""" if self.is_completed and not self.interval: return False current_time = datetime.now() if self.execution_time and current_time >= self.execution_time: if self.interval: # Обновляем время следующего запуска для периодических задач while current_time >= self.execution_time: self.execution_time = datetime.fromtimestamp( self.execution_time.timestamp() + self.interval ) return True return False def run(self): """Запускает задачу в отдельном потоке""" if self.is_running: return self.is_running = True self.thread = threading.Thread(target=self._execute) self.thread.daemon = True self.thread.start() def _execute(self): """Выполняет функцию задачи""" try: print(f"[{datetime.now()}] Выполняется задача: {self.name}") self.function() print(f"[{datetime.now()}] Задача {self.name} завершена") except Exception as e: print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}") finally: self.is_running = False if not self.interval: self.is_completed = True class TaskScheduler: """Планировщик задач""" def __init__(self, polling_interval=1): """ Инициализация планировщика :param polling_interval: Интервал проверки задач в секундах """ self.tasks = [] self.polling_interval = polling_interval self.is_running = False self.scheduler_thread = None def add_task(self, task): """Добавляет задачу в список задач""" self.tasks.append(task) print(f"Задача '{task.name}' добавлена в планировщик") return task.id def schedule_task(self, name, function, execution_time=None, interval=None): """ Создает и добавляет новую задачу :param name: Название задачи :param function: Функция для выполнения :param execution_time: Время выполнения (datetime объект) :param interval: Интервал повторения в секундах :return: ID задачи """ task = Task(name, function, execution_time, interval) return self.add_task(task) def remove_task(self, task_id): """Удаляет задачу по ID""" for i, task in enumerate(self.tasks): if task.id == task_id: self.tasks.pop(i) print(f"Задача с ID {task_id} удалена") return True return False def start(self): """Запускает планировщик задач""" if self.is_running: print("Планировщик уже запущен") return self.is_running = True self.scheduler_thread = threading.Thread(target=self._main_loop) self.scheduler_thread.daemon = True self.scheduler_thread.start() print("Планировщик задач запущен") def stop(self): """Останавливает планировщик задач""" self.is_running = False if self.scheduler_thread: self.scheduler_thread.join(timeout=self.polling_interval*2) print("Планировщик задач остановлен") def _main_loop(self): """Основной цикл планировщика""" while self.is_running: # Проверяем и запускаем задачи, которые должны быть выполнены for task in self.tasks: if task.should_run(): task.run() # Удаляем завершенные задачи self.tasks = [task for task in self.tasks if not task.is_completed] # Минимальная задержка в основном цикле time.sleep(self.polling_interval) # Пример использования if __name__ == "__main__": def task1(): print("Выполняется задача 1") time.sleep(2) # Имитация работы def task2(): print("Выполняется задача 2") time.sleep(1) # Имитация работы def task3(): print("Выполняется периодическая задача") # Создаем планировщик с интервалом опроса 0.5 секунды scheduler = TaskScheduler(polling_interval=0.5) # Добавляем задачи # Задача, которая выполнится через 3 секунды scheduler.schedule_task( "Отложенная задача", task1, execution_time=datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=3) ) # Задача, которая выполнится сразу scheduler.schedule_task( "Мгновенная задача", task2, execution_time=datetime.now() ) # Периодическая задача, которая будет выполняться каждые 5 секунд scheduler.schedule_task( "Периодическая задача", task3, execution_time=datetime.now(), interval=5 ) # Запускаем планировщик scheduler.start() try: # Даем планировщику поработать 30 секунд time.sleep(30) except KeyboardInterrupt: print("Программа прервана пользователем") finally: # Останавливаем планировщик scheduler.stop() print("Программа завершена")
И так теперь смотрим что у на по итогу получилось?

Итак, наш базовый планировщик работает: задачи добавляются, запускаются по расписанию, всё хорошо.
Но возникает логичный вопрос:
Как это может помочь системным и сетевым администраторам в реальной жизни?
Ответ прост: используя планировщик, можно автоматизировать рутинные задачи, которые раньше приходилось выполнять вручную. Например:
-
Мониторинг использования ресурсов (CPU, память, диск).
-
Проверка доступности серверов и сервисов.
-
Запуск диагностических скриптов или утилит.
-
Автоматическое выполнение профилактических процедур.
Чтобы добавить такие возможности, нам понадобятся ещё две мощные библиотеки:
psutil
psutil (process and system utilities) — сторонняя библиотека для Python, которая позволяет получать информацию о состоянии системы:
-
загрузка процессора,
-
использование оперативной памяти,
-
статистика по дискам и сетевым интерфейсам,
-
процессы, запущенные в системе.
Почему она важна?
С её помощью можно написать задачи, которые будут следить за состоянием машины и вовремя предупреждать о проблемах.
subprocess
subprocess — стандартный модуль Python для работы с внешними процессами.
Позволяет:
-
запускать системные команды,
-
взаимодействовать с выводом консоли,
-
обрабатывать ошибки при запуске.
Почему это важно?
С помощью subprocess можно, например:
-
запустить скрипт антивирусной проверки,
-
проверить доступность сервера через
ping, -
перезапустить службы,
-
выполнить резервное копирование через системные утилиты.
Теперь давайте усложним наш планировщик и добавим несколько реальных примеров использования этих библиотек.
import threading import time from datetime import datetime, timedelta import uuid import psutil # Для мониторинга системных ресурсов import subprocess # Для выполнения системных команд class Task: """Класс для представления задачи""" def __init__(self, name, function, execution_time=None, interval=None): self.id = str(uuid.uuid4()) self.name = name self.function = function self.execution_time = execution_time self.interval = interval self.is_running = False self.is_completed = False self.thread = None def should_run(self): """Проверяет, должна ли задача быть запущена""" if self.is_completed and not self.interval: return False current_time = datetime.now() if self.execution_time and current_time >= self.execution_time: if self.interval: # Обновляем время следующего запуска для периодических задач while current_time >= self.execution_time: self.execution_time = datetime.fromtimestamp( self.execution_time.timestamp() + self.interval ) return True return False def run(self): """Запускает задачу в отдельном потоке""" if self.is_running: return self.is_running = True self.thread = threading.Thread(target=self._execute) self.thread.daemon = True self.thread.start() def _execute(self): """Выполняет функцию задачи""" try: print(f"[{datetime.now()}] Выполняется задача: {self.name}") self.function() print(f"[{datetime.now()}] Задача {self.name} завершена") except Exception as e: print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}") finally: self.is_running = False if not self.interval: self.is_completed = True class TaskScheduler: """Планировщик задач""" def __init__(self, polling_interval=1): """ Инициализация планировщика :param polling_interval: Интервал проверки задач в секундах """ self.tasks = [] self.polling_interval = polling_interval self.is_running = False self.scheduler_thread = None def add_task(self, task): """Добавляет задачу в список задач""" self.tasks.append(task) print(f"Задача '{task.name}' добавлена в планировщик") return task.id def schedule_task(self, name, function, execution_time=None, interval=None): """ Создает и добавляет новую задачу :param name: Название задачи :param function: Функция для выполнения :param execution_time: Время выполнения (datetime объект) :param interval: Интервал повторения в секундах :return: ID задачи """ task = Task(name, function, execution_time, interval) return self.add_task(task) def remove_task(self, task_id): """Удаляет задачу по ID""" for i, task in enumerate(self.tasks): if task.id == task_id: self.tasks.pop(i) print(f"Задача с ID {task_id} удалена") return True return False def start(self): """Запускает планировщик задач""" if self.is_running: print("Планировщик уже запущен") return self.is_running = True self.scheduler_thread = threading.Thread(target=self._main_loop) self.scheduler_thread.daemon = True self.scheduler_thread.start() print("Планировщик задач запущен") def stop(self): """Останавливает планировщик задач""" self.is_running = False if self.scheduler_thread: self.scheduler_thread.join(timeout=self.polling_interval*2) print("Планировщик задач остановлен") def _main_loop(self): """Основной цикл планировщика""" while self.is_running: # Проверяем и запускаем задачи, которые должны быть выполнены for task in self.tasks: if task.should_run(): task.run() # Удаляем завершенные задачи self.tasks = [task for task in self.tasks if not task.is_completed] # Минимальная задержка в основном цикле time.sleep(self.polling_interval) # Функции для мониторинга системы def monitor_cpu_usage(): """Мониторинг загрузки ЦП компьютера""" cpu_percent = psutil.cpu_percent(interval=1) print(f"Текущая загрузка ЦП: {cpu_percent}%") # Дополнительная информация о ЦП cpu_count = psutil.cpu_count(logical=False) cpu_count_logical = psutil.cpu_count(logical=True) print(f"Количество физических ядер ЦП: {cpu_count}") print(f"Количество логических ядер ЦП: {cpu_count_logical}") # Информация о загрузке каждого ядра per_cpu = psutil.cpu_percent(interval=1, percpu=True) for i, percent in enumerate(per_cpu): print(f"Загрузка ядра {i}: {percent}%") def monitor_network_packets(): """Мониторинг сетевых пакетов пользователя""" try: # Получаем информацию о сетевых интерфейсах net_io = psutil.net_io_counters(pernic=True) print("Информация о сетевых пакетах:") for interface, stats in net_io.items(): print(f"\nИнтерфейс: {interface}") print(f"Отправлено байт: {stats.bytes_sent}") print(f"Получено байт: {stats.bytes_recv}") print(f"Отправлено пакетов: {stats.packets_sent}") print(f"Получено пакетов: {stats.packets_recv}") print(f"Ошибки при отправке: {stats.errin}") print(f"Ошибки при получении: {stats.errout}") print(f"Пакеты отброшены при отправке: {stats.dropin}") print(f"Пакеты отброшены при получении: {stats.dropout}") # Дополнительно можно использовать netstat для Windows print("\nАктивные сетевые соединения:") result = subprocess.run(["netstat", "-n"], capture_output=True, text=True) connections = result.stdout.split('\n') # Выводим только первые 10 строк, чтобы не перегружать вывод for line in connections[:10]: print(line) except Exception as e: print(f"Ошибка при мониторинге сетевых пакетов: {e}") # Пример использования if __name__ == "__main__": # Создаем планировщик с интервалом опроса 0.5 секунды scheduler = TaskScheduler(polling_interval=0.5) # Добавляем задачи # Задача 1: Мониторинг загрузки ЦП (каждые 5 секунд) scheduler.schedule_task( "Мониторинг загрузки ЦП", monitor_cpu_usage, execution_time=datetime.now(), interval=5 ) # Задача 2: Мониторинг сетевых пакетов (каждые 10 секунд) scheduler.schedule_task( "Мониторинг сетевых пакетов", monitor_network_packets, execution_time=datetime.now() + timedelta(seconds=2), interval=10 ) # Запускаем планировщик scheduler.start() try: # Даем планировщику поработать 60 секунд print("Планировщик будет работать 60 секунд. Нажмите Ctrl+C для досрочного завершения.") time.sleep(60) except KeyboardInterrupt: print("Программа прервана пользователем") finally: # Останавливаем планировщик scheduler.stop() print("Программа завершена")
и что мы получаем в ответ

4.Этап: Итоги
Что можно сделать на основе такого простого планировщика задач?
Да практически всё, что угодно!
-
Реализовать удалённую доставку задач для корпоративных компьютеров.
-
Построить центры автоматизации для системных администраторов.
-
Развить проект до полноценного решения для управления инфраструктурой.
-
Или даже запустить коммерческий продукт на базе этой идеи.
Код, который мы написали, — это фундамент, на котором можно строить сложные и надёжные системы. Он лёгкий, расширяемый и даёт отличную отправную точку для собственных экспериментов и разработок.
Спасибо, что дочитали статью до конца! Надеюсь, материал был для вас полезным и вдохновляющим.
ссылка на оригинал статьи https://habr.com/ru/articles/904800/
Добавить комментарий