Постановка проблемы
Есть приложение, в котором выполняет несколько функций, например, сбора данных из различных источников, их обработки и помещения результатов в БД. Приложение, по задумке, должно работать 24/7, чтобы в любой момент можно было подключиться к БД и получить свежайшую информацию.
Но вот незадача… Вроде бы весь код отлажен, работа приложения стабильна, но в какие-то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно, а работа стоит и надо как-то запускаться. На отладку нет много времени.
Симуляция
Представим, что есть две простые функции, которые что-то делают (не важно, чем они заняты). Если мы говорим о мультипроцессинге, то мы говорим, что каждая функция запускается в системе отдельно, как отдельная задача, отследить которую можно в Диспетчере задач. Значит у процесса есть PID.
Представим, что третий процесс — «следилка». Следилка занимается тем, что она отслеживает существует ли ещё процесс с таким PID в системе. Если работа функции завершается или процесс самопроизвольно погибает, то её задача увидеть это (т.е. не увидеть такой PID) и перезапустить процесс, заново поместив туда функцию.
Очевидно, что нужно имитировать падение процесса, т.е. должна быть функция, которая искусственно убивает процесс.
Также хочется, чтобы уж быть максимально приближенным к реальности, не передавать переменные между процессами, чтобы все работали автономно..
Реализация
Вот пример двух простых функций, которые живут и что-то делают (можно не смотреть скрытый текст, т.н. функции будут представлены также в полных текстах решения)
Hidden text
def proc_1(timepass=1, repeat=20): repeated = True i = 0 while repeated: x = random.uniform(0, timepass) i += 1 if i < repeat: print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x)) else: repeated = False print("Proc_1 COMPLETED!") break time.sleep(x) def proc_2(timepass=1, repeat=20): repeated = True i = 0 while repeated: x = random.uniform(0, timepass) i += 1 if i < repeat: print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x)) else: repeated = False print("Proc_2 COMPLETED!") break time.sleep(x)
Было реализовано два примера, которые отличаются тем, что функция, отвечающая за слежение и восстановление, в первом случае, живёт в основном процессе приложения, а во втором — запускается отдельным процессом. Второй случай усложняется тем, что перезапуск нового процесса производится из функции восстановления и если выполнить join в ней, то сама она также будет ждать. Решением это проблемы явилось запуск join в отдельном потоке.
Второй вариант, кстати, выглядит приятней и логичней.
Отдельным моментом является понимание того, в какой операционной системе вы запускаете работу приложения. Пришлось делать «оговорку» на Linux по понятным причинам.
Вот первый вариант реализации (полный текст решения):
# Работа с процессами - рабочая версия import multiprocessing as mp import time import random import psutil import os import platform import fnmatch import signal import subprocess class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' def proc_1(timepass=1, repeat=20): repeated = True i = 0 while repeated: x = random.uniform(0, timepass) i += 1 if i < repeat: print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x)) else: repeated = False print("Proc_1 COMPLETED!") break time.sleep(x) def proc_2(timepass=1, repeat=20): repeated = True i = 0 while repeated: x = random.uniform(0, timepass) i += 1 if i < repeat: print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x)) else: repeated = False print("Proc_2 COMPLETED!") break time.sleep(x) def process_killer(PID_list, timepass, exclusion_list=[]): while True: system = platform.system() x = random.uniform(0, timepass) time.sleep(x) this_process_pid = os.getpid() this_process = psutil.Process(this_process_pid) this_process_info_as_dict = this_process.as_dict() parent_process_pid = this_process.ppid() parent_process = psutil.Process(parent_process_pid) parent_process_as_dict = parent_process.as_dict() parent_process_children = parent_process.children(recursive=True) child_pid_list = [] for i in range(len(parent_process_children)): child_info_as_dict = parent_process_children[i].as_dict() if child_info_as_dict['pid'] != this_process_pid: child_pid_list.append(child_info_as_dict['pid']) child_pid_list = list(set(child_pid_list) - set(exclusion_list)) child_pid_list = list(set(child_pid_list) - set([this_process_pid])) # for i in range(len(child_pid_list)): # print(f'Process_{i+1} PID: {child_pid_list[i]}') temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))]) print(f"{temp_str}") if len(child_pid_list) > 0: if len(child_pid_list) > 1: number = random.randint(0, len(child_pid_list) - 1) else: number = 0 kill_proc = psutil.Process(child_pid_list[number]) kill_process_info_as_dict = kill_proc.as_dict() if psutil.pid_exists(kill_process_info_as_dict['pid']): if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"): print("We kill the process with PID", kill_process_info_as_dict['pid']) try: process = psutil.Process(kill_process_info_as_dict['pid']) except psutil.NoSuchProcess: print(f"Process with PID {child_pid_list[number]} not found.") continue else: if system == "Windows": kill_proc.kill() elif system == "Linux": os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM) # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL) # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])]) print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}") child_pid_list.remove(kill_process_info_as_dict['pid']) if __name__ == "__main__": print(f'{bcolors.OKGREEN}Начало работы программы!{bcolors.ENDC}') process_name_list = ["process_1", "process_2"] process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30}) process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30}) process_1.start() process_2.start() PID_list = [process_1.pid, process_2.pid] # process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list}) # process_recov.start() process_kill = mp.Process(target=process_killer, kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []}) process_kill.start() PID_list.append(process_kill.pid) system = platform.system() while True: if system == "Linux": os.wait() for i in range(len(PID_list)): try: if psutil.pid_exists(PID_list[i]): pass # print(f'Process with PID {process_pid} is alive') else: print(f'Process with PID {PID_list[0]} is dead') print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}") if PID_list[i] == PID_list[0]: process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30}) if PID_list[i] == PID_list[1]: process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30}) process.start() old_pid = PID_list[i] PID_list[i] = process.pid temp_str="" for i in range(len(PID_list)): if PID_list[i]==process.pid: temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {PID_list[i]}{bcolors.ENDC} (old: {old_pid})\n' else: temp_str += f'Process_{i + 1} PID: {PID_list[i]}\n' temp_str = temp_str.rstrip() # temp_str = '\n'.join( # [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))]) print('Recovery result:\n' + temp_str) except: pass time.sleep(0.2) print("Main PID:", os.getpid()) print("Process_1 PID:", process_1.pid) print("Process_2 PID:", process_2.pid) print("Process_killer PID:", process_kill.pid) process_1.join() process_2.join() process_kill.join() # process_recov.join() process.join() time.sleep(5) print("Program completed")
А вот второй:
# Работа с процессами - рабочая, но сомнительная версия import multiprocessing as mp import threading import time import random import psutil import os import platform import fnmatch import signal import subprocess class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' def proc_1(timepass=1, repeat=20): repeated = True i = 0 while repeated: x = random.uniform(0, timepass) i += 1 if i < repeat: print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x)) else: repeated = False print("Proc_1 COMPLETED!") break time.sleep(x) def proc_2(timepass=1, repeat=20): repeated = True i = 0 while repeated: x = random.uniform(0, timepass) i += 1 if i < repeat: print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x)) else: repeated = False print("Proc_2 COMPLETED!") break time.sleep(x) def process_killer(PID_list, timepass, exclusion_list=[]): while True: system = platform.system() x = random.uniform(0, timepass) time.sleep(x) this_process_pid = os.getpid() this_process = psutil.Process(this_process_pid) this_process_info_as_dict = this_process.as_dict() parent_process_pid = this_process.ppid() parent_process = psutil.Process(parent_process_pid) parent_process_as_dict = parent_process.as_dict() parent_process_children = parent_process.children(recursive=True) child_pid_list = [] for i in range(len(parent_process_children)): child_info_as_dict = parent_process_children[i].as_dict() if child_info_as_dict['pid'] != this_process_pid: child_pid_list.append(child_info_as_dict['pid']) child_pid_list = list(set(child_pid_list) - set(exclusion_list)) # for i in range(len(child_pid_list)): # print(f'Process_{i+1} PID: {child_pid_list[i]}') temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))]) print(f"{temp_str}") if len(child_pid_list) > 0: if len(child_pid_list) > 1: number = random.randint(0, len(child_pid_list) - 1) else: number = 0 kill_proc = psutil.Process(child_pid_list[number]) kill_process_info_as_dict = kill_proc.as_dict() if psutil.pid_exists(kill_process_info_as_dict['pid']): if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"): print("We kill the process with PID", kill_process_info_as_dict['pid']) try: process = psutil.Process(kill_process_info_as_dict['pid']) except psutil.NoSuchProcess: print(f"Process with PID {child_pid_list[number]} not found.") continue else: if system == "Windows": kill_proc.kill() elif system == "Linux": os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM) # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL) # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])]) print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}") child_pid_list.remove(kill_process_info_as_dict['pid']) def process_recovery(process_pid_list): PID_list = process_pid_list this_process_pid = os.getpid() this_process = psutil.Process(this_process_pid) this_process_info_as_dict = this_process.as_dict() parent_process_pid = this_process.ppid() parent_process = psutil.Process(parent_process_pid) parent_process_as_dict = parent_process.as_dict() parent_process_children = parent_process.children(recursive=True) child_pid_list = [] for i in range(len(parent_process_children)): child_info_as_dict = parent_process_children[i].as_dict() if child_info_as_dict['pid'] != this_process_pid: child_pid_list.append(child_info_as_dict['pid']) system = platform.system() while True: if system == "Linux": try: os.wait() except: pass for i in range(len(child_pid_list)): try: if psutil.pid_exists(child_pid_list[i]): pass # print(f'Process with PID {process_pid} is alive') else: print(f'Process with PID {child_pid_list[i]} is dead') print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}") if child_pid_list[i] == PID_list[0]: process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30}) elif child_pid_list[i] == PID_list[1]: process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30}) else: process = mp.Process(target=process_killer, kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]}) process.start() old_pid = child_pid_list[i] child_pid_list[i] = process.pid if old_pid == PID_list[0]: PID_list[0] = process.pid if old_pid == PID_list[1]: PID_list[1] = process.pid temp_str = "" for i in range(len(child_pid_list)): if child_pid_list[i] == process.pid: temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {child_pid_list[i]}{bcolors.ENDC} (old: {old_pid})\n' else: temp_str += f'Process_{i + 1} PID: {child_pid_list[i]}\n' temp_str = temp_str.rstrip() # temp_str = '\n'.join( # [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))]) print('Recovery result:\n' + temp_str) threading.Thread(target=process.join).start() except: pass time.sleep(0.2) if __name__ == "__main__": process_name_list = ["process_1", "process_2"] process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30}) process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30}) process_1.start() process_2.start() PID_list = [process_1.pid, process_2.pid] process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list}) process_recov.start() process_kill = mp.Process(target=process_killer, kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]}) #process_kill.start() print("Main PID:", os.getpid()) print("Process_1 PID:", process_1.pid) print("Process_2 PID:", process_2.pid) print("Process_killer PID:", process_kill.pid) process_1.join() process_2.join() #process_kill.join() process_recov.join() time.sleep(5) print("Program completed")
Надеюсь, что, если вдруг, вы столкнётесь с подобной проблемой и не захотите тратить время на поиска ошибки — фантома, то вам поможет такое решение проблемы.
ссылка на оригинал статьи https://habr.com/ru/articles/741774/
Добавить комментарий