Решение проблемы «падения» процессов в приложении, работающее 24/7 в режиме мультипроцессинга

от автора

Постановка проблемы

Есть приложение, в котором выполняет несколько функций, например, сбора данных из различных источников, их обработки и помещения результатов в БД. Приложение, по задумке, должно работать 24/7, чтобы в любой момент можно было подключиться к БД и получить свежайшую информацию.

Но вот незадача… Вроде бы весь код отлажен, работа приложения стабильна, но в какие-то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно, а работа стоит и надо как-то запускаться. На отладку нет много времени.

Симуляция

Представим, что есть две простые функции, которые что-то делают (не важно, чем они заняты). Если мы говорим о мультипроцессинге, то мы говорим, что каждая функция запускается в системе отдельно, как отдельная задача, отследить которую можно в Диспетчере задач. Значит у процесса есть PID.

Представим, что третий процесс — «следилка». Следилка занимается тем, что она отслеживает существует ли ещё процесс с таким PID в системе. Если работа функции завершается или процесс самопроизвольно погибает, то её задача увидеть это (т.е. не увидеть такой PID) и перезапустить процесс, заново поместив туда функцию.

Очевидно, что нужно имитировать падение процесса, т.е. должна быть функция, которая искусственно убивает процесс.

Также хочется, чтобы уж быть максимально приближенным к реальности, не передавать переменные между процессами, чтобы все работали автономно..

Реализация

Вот пример двух простых функций, которые живут и что-то делают (можно не смотреть скрытый текст, т.н. функции будут представлены также в полных текстах решения)

Hidden text
def proc_1(timepass=1, repeat=20):     repeated = True     i = 0     while repeated:         x = random.uniform(0, timepass)         i += 1         if i < repeat:             print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))         else:             repeated = False             print("Proc_1 COMPLETED!")             break         time.sleep(x)   def proc_2(timepass=1, repeat=20):     repeated = True     i = 0     while repeated:         x = random.uniform(0, timepass)         i += 1         if i < repeat:             print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))         else:             repeated = False             print("Proc_2 COMPLETED!")             break         time.sleep(x)

Было реализовано два примера, которые отличаются тем, что функция, отвечающая за слежение и восстановление, в первом случае, живёт в основном процессе приложения, а во втором — запускается отдельным процессом. Второй случай усложняется тем, что перезапуск нового процесса производится из функции восстановления и если выполнить join в ней, то сама она также будет ждать. Решением это проблемы явилось запуск join в отдельном потоке.

Второй вариант, кстати, выглядит приятней и логичней.

Отдельным моментом является понимание того, в какой операционной системе вы запускаете работу приложения. Пришлось делать «оговорку» на Linux по понятным причинам.

Вот первый вариант реализации (полный текст решения):

# Работа с процессами - рабочая версия  import multiprocessing as mp import time import random import psutil import os import platform import fnmatch import signal import subprocess   class bcolors:     HEADER = '\033[95m'     OKBLUE = '\033[94m'     OKCYAN = '\033[96m'     OKGREEN = '\033[92m'     WARNING = '\033[93m'     FAIL = '\033[91m'     ENDC = '\033[0m'     BOLD = '\033[1m'     UNDERLINE = '\033[4m'      def proc_1(timepass=1, repeat=20):     repeated = True     i = 0     while repeated:         x = random.uniform(0, timepass)         i += 1         if i < repeat:             print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))         else:             repeated = False             print("Proc_1 COMPLETED!")             break         time.sleep(x)   def proc_2(timepass=1, repeat=20):     repeated = True     i = 0     while repeated:         x = random.uniform(0, timepass)         i += 1         if i < repeat:             print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))         else:             repeated = False             print("Proc_2 COMPLETED!")             break         time.sleep(x)   def process_killer(PID_list, timepass, exclusion_list=[]):     while True:         system = platform.system()         x = random.uniform(0, timepass)         time.sleep(x)         this_process_pid = os.getpid()         this_process = psutil.Process(this_process_pid)         this_process_info_as_dict = this_process.as_dict()         parent_process_pid = this_process.ppid()         parent_process = psutil.Process(parent_process_pid)         parent_process_as_dict = parent_process.as_dict()         parent_process_children = parent_process.children(recursive=True)         child_pid_list = []         for i in range(len(parent_process_children)):             child_info_as_dict = parent_process_children[i].as_dict()             if child_info_as_dict['pid'] != this_process_pid:                 child_pid_list.append(child_info_as_dict['pid'])         child_pid_list = list(set(child_pid_list) - set(exclusion_list))         child_pid_list = list(set(child_pid_list) - set([this_process_pid]))         # for i in range(len(child_pid_list)):         #     print(f'Process_{i+1} PID: {child_pid_list[i]}')         temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])         print(f"{temp_str}")          if len(child_pid_list) > 0:             if len(child_pid_list) > 1:                 number = random.randint(0, len(child_pid_list) - 1)             else:                 number = 0             kill_proc = psutil.Process(child_pid_list[number])             kill_process_info_as_dict = kill_proc.as_dict()             if psutil.pid_exists(kill_process_info_as_dict['pid']):                 if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):                     print("We kill the process with PID", kill_process_info_as_dict['pid'])                     try:                         process = psutil.Process(kill_process_info_as_dict['pid'])                     except psutil.NoSuchProcess:                         print(f"Process with PID {child_pid_list[number]} not found.")                         continue                     else:                         if system == "Windows":                             kill_proc.kill()                         elif system == "Linux":                             os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)                             # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)                             # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])                          print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")                         child_pid_list.remove(kill_process_info_as_dict['pid'])     if __name__ == "__main__":     print(f'{bcolors.OKGREEN}Начало работы программы!{bcolors.ENDC}')      process_name_list = ["process_1", "process_2"]      process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})     process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})      process_1.start()     process_2.start()      PID_list = [process_1.pid, process_2.pid]      # process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})     # process_recov.start()      process_kill = mp.Process(target=process_killer,                               kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []})     process_kill.start()      PID_list.append(process_kill.pid)      system = platform.system()      while True:         if system == "Linux":             os.wait()         for i in range(len(PID_list)):             try:                 if psutil.pid_exists(PID_list[i]):                     pass                     # print(f'Process with PID {process_pid} is alive')                 else:                     print(f'Process with PID {PID_list[0]} is dead')                     print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")                     if PID_list[i] == PID_list[0]:                         process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})                     if PID_list[i] == PID_list[1]:                         process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})                     process.start()                     old_pid = PID_list[i]                     PID_list[i] = process.pid                      temp_str=""                     for i in range(len(PID_list)):                         if  PID_list[i]==process.pid:                             temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {PID_list[i]}{bcolors.ENDC} (old: {old_pid})\n'                         else:                             temp_str += f'Process_{i + 1} PID: {PID_list[i]}\n'                     temp_str = temp_str.rstrip()                     # temp_str = '\n'.join(                     #     [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])                     print('Recovery result:\n' + temp_str)             except:                 pass         time.sleep(0.2)          print("Main PID:", os.getpid())     print("Process_1 PID:", process_1.pid)     print("Process_2 PID:", process_2.pid)     print("Process_killer PID:", process_kill.pid)      process_1.join()     process_2.join()     process_kill.join()     # process_recov.join()     process.join()      time.sleep(5)      print("Program completed") 

А вот второй:

# Работа с процессами - рабочая, но сомнительная версия import multiprocessing as mp import threading import time import random import psutil import os import platform import fnmatch import signal import subprocess  class bcolors:     HEADER = '\033[95m'     OKBLUE = '\033[94m'     OKCYAN = '\033[96m'     OKGREEN = '\033[92m'     WARNING = '\033[93m'     FAIL = '\033[91m'     ENDC = '\033[0m'     BOLD = '\033[1m'     UNDERLINE = '\033[4m'  def proc_1(timepass=1, repeat=20):     repeated = True     i = 0     while repeated:         x = random.uniform(0, timepass)         i += 1         if i < repeat:             print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))         else:             repeated = False             print("Proc_1 COMPLETED!")             break         time.sleep(x)   def proc_2(timepass=1, repeat=20):     repeated = True     i = 0     while repeated:         x = random.uniform(0, timepass)         i += 1         if i < repeat:             print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))         else:             repeated = False             print("Proc_2 COMPLETED!")             break         time.sleep(x)   def process_killer(PID_list, timepass, exclusion_list=[]):     while True:         system = platform.system()         x = random.uniform(0, timepass)         time.sleep(x)         this_process_pid = os.getpid()         this_process = psutil.Process(this_process_pid)         this_process_info_as_dict = this_process.as_dict()         parent_process_pid = this_process.ppid()         parent_process = psutil.Process(parent_process_pid)         parent_process_as_dict = parent_process.as_dict()         parent_process_children = parent_process.children(recursive=True)         child_pid_list = []         for i in range(len(parent_process_children)):             child_info_as_dict = parent_process_children[i].as_dict()             if child_info_as_dict['pid'] != this_process_pid:                 child_pid_list.append(child_info_as_dict['pid'])         child_pid_list = list(set(child_pid_list) - set(exclusion_list))         # for i in range(len(child_pid_list)):         #     print(f'Process_{i+1} PID: {child_pid_list[i]}')         temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])         print(f"{temp_str}")          if len(child_pid_list) > 0:             if len(child_pid_list) > 1:                 number = random.randint(0, len(child_pid_list) - 1)             else:                 number = 0             kill_proc = psutil.Process(child_pid_list[number])             kill_process_info_as_dict = kill_proc.as_dict()             if psutil.pid_exists(kill_process_info_as_dict['pid']):                 if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):                     print("We kill the process with PID", kill_process_info_as_dict['pid'])                     try:                         process = psutil.Process(kill_process_info_as_dict['pid'])                     except psutil.NoSuchProcess:                         print(f"Process with PID {child_pid_list[number]} not found.")                         continue                     else:                         if system == "Windows":                             kill_proc.kill()                         elif system == "Linux":                             os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)                             # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)                             # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])                          print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")                         child_pid_list.remove(kill_process_info_as_dict['pid'])   def process_recovery(process_pid_list):     PID_list = process_pid_list     this_process_pid = os.getpid()     this_process = psutil.Process(this_process_pid)     this_process_info_as_dict = this_process.as_dict()     parent_process_pid = this_process.ppid()     parent_process = psutil.Process(parent_process_pid)     parent_process_as_dict = parent_process.as_dict()     parent_process_children = parent_process.children(recursive=True)     child_pid_list = []     for i in range(len(parent_process_children)):         child_info_as_dict = parent_process_children[i].as_dict()         if child_info_as_dict['pid'] != this_process_pid:             child_pid_list.append(child_info_as_dict['pid'])      system = platform.system()      while True:         if system == "Linux":             try:                 os.wait()             except:                 pass         for i in range(len(child_pid_list)):             try:                 if psutil.pid_exists(child_pid_list[i]):                     pass                     # print(f'Process with PID {process_pid} is alive')                 else:                     print(f'Process with PID {child_pid_list[i]} is dead')                     print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")                     if child_pid_list[i] == PID_list[0]:                         process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})                     elif child_pid_list[i] == PID_list[1]:                         process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})                     else:                         process = mp.Process(target=process_killer,                                                   kwargs={'PID_list': PID_list, 'timepass': 10,                                                           'exclusion_list': [process_recov.pid, ]})                     process.start()                     old_pid = child_pid_list[i]                     child_pid_list[i] = process.pid                     if old_pid == PID_list[0]:                         PID_list[0] = process.pid                     if old_pid == PID_list[1]:                         PID_list[1] = process.pid                      temp_str = ""                     for i in range(len(child_pid_list)):                         if child_pid_list[i] == process.pid:                             temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {child_pid_list[i]}{bcolors.ENDC} (old: {old_pid})\n'                         else:                             temp_str += f'Process_{i + 1} PID: {child_pid_list[i]}\n'                     temp_str = temp_str.rstrip()                     # temp_str = '\n'.join(                     #     [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])                     print('Recovery result:\n' + temp_str)                     threading.Thread(target=process.join).start()             except:                 pass         time.sleep(0.2)   if __name__ == "__main__":     process_name_list = ["process_1", "process_2"]      process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})     process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})      process_1.start()     process_2.start()      PID_list = [process_1.pid, process_2.pid]      process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})     process_recov.start()      process_kill = mp.Process(target=process_killer,                               kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]})     #process_kill.start()        print("Main PID:", os.getpid())     print("Process_1 PID:", process_1.pid)     print("Process_2 PID:", process_2.pid)     print("Process_killer PID:", process_kill.pid)      process_1.join()     process_2.join()     #process_kill.join()     process_recov.join()      time.sleep(5)      print("Program completed") 

Надеюсь, что, если вдруг, вы столкнётесь с подобной проблемой и не захотите тратить время на поиска ошибки — фантома, то вам поможет такое решение проблемы.


ссылка на оригинал статьи https://habr.com/ru/articles/741774/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *