Недавно меня попросили помочь в определении источников утечки трафика в одной из организаций. Задачу усугубляло большое количество устройств в одном широковещательном домене, множество неуправляемых коммутаторов, отсутствие любой карты сети, а также старенький роутер на входе. В общем, это были настоящие «Авгиевы конюшни», но в итоге задача была решена, и данная статья посвящена методам, которые я использовал. Кто оказался виновником, я раскрою в конце статьи, чтобы не портить интригу.
Мой телеграмм канал — сообщество, где делятся опытом
В текущей задаче я придерживался следующих принципов:
Для определения источника аномального трафика я разбил задачу на несколько подзадач:
-
Круглосуточный мониторинг скорости доступа к интернету в различных частях организации.
-
Непрерывное сканирование всего трафика с целью выявления хостов, потребляющих его в наибольшем объеме.
-
Анализ хостов, генерирующих наибольший трафик, с последующим устранением выявленных проблем.
Задача 1: Круглосуточный мониторинг скорости доступа к интернету в разных уголках организации
На узлы я установил пользовательские экспортеры на Python, которые запускают Speedtest CLI в заданные промежутки времени и отправляют метрики в Prometheus.
Экспортер выдает для Prometheus такие метрики, как:
-
internet_download_speed_mbps # Скорость загрузки из интернета
-
internet_upload_speed_mbps # Скорость выгрузки в интернет
-
internet_ping_ms # Задержка пинга
Код экспортера представлен ниже — speedtest_exporter.py:
Скрытый текст
# metrics prometheus # Скорость загрузки из интернета # avg(internet_download_speed_mbps) by (instance) # Скорость выгрузки в интернет # avg(internet_upload_speed_mbps) by (instance) # Задержка пинга # avg(internet_ping_ms) by (instance) from flask import Flask, Response import speedtest import logging import time import threading app = Flask(__name__) # Настройка логирования logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Переменные для кеша metrics_data = { "download_speed": 0.0, "upload_speed": 0.0, "ping": 0.0 } last_test_time = 0 data_lock = threading.Lock() cache_duration = 60 # Измеряем раз в минуту def run_speedtest(): global metrics_data, last_test_time while True: try: logging.info("Запуск измерений интернет-соединения.") st = speedtest.Speedtest() st.get_best_server() download_speed = st.download() / 1e6 # Конвертируем в Мбит/с upload_speed = st.upload() / 1e6 # Конвертируем в Мбит/с ping = st.results.ping # Пинг в миллисекундах with data_lock: metrics_data["download_speed"] = download_speed metrics_data["upload_speed"] = upload_speed metrics_data["ping"] = ping last_test_time = time.time() logging.info(f"Измерения завершены: Загрузка: {download_speed:.2f} Mbps, " f"Выгрузка: {upload_speed:.2f} Mbps, " f"Пинг: {ping:.2f} ms.") except Exception as e: logging.error(f"Ошибка при измерениях: {str(e)}") time.sleep(cache_duration) @app.route('/metrics') def metrics(): with data_lock: response = f""" # HELP internet_download_speed_mbps Download speed in Mbps # TYPE internet_download_speed_mbps gauge internet_download_speed_mbps {metrics_data["download_speed"]} # HELP internet_upload_speed_mbps Upload speed in Mbps # TYPE internet_upload_speed_mbps gauge internet_upload_speed_mbps {metrics_data["upload_speed"]} # HELP internet_ping_ms Ping latency in milliseconds # TYPE internet_ping_ms gauge internet_ping_ms {metrics_data["ping"]} """ return Response(response, mimetype="text/plain") if __name__ == '__main__': # Запускаем тест скорости в отдельном потоке speedtest_thread = threading.Thread(target=run_speedtest, daemon=True) speedtest_thread.start() # Запускаем Flask app.run(host='0.0.0.0', port=9101)
Задача 2: Круглосуточное сканирование всего трафика и определение адресов хостов, наиболее сильно этот трафик расходующих
Мы проводим мониторинг трафика в местах его схождения. Удобнее всего это делать на шлюзе, а в нашем случае роутере, либо настраивая зеркалирование трафика на порт, к которому подключаем анализирующее устройство. Для этих целей я использовал Orange Pi, так как эти дешевые и многофункциональные одноплатные компьютеры просты в использовании и удобны в работе. О применении Orange Pi я писал в предыдущих статьях тут и тут.
Код анализатора трафика представлен ниже — traffic_monitor.py:
Скрытый текст
import time import logging from logging.handlers import RotatingFileHandler from collections import defaultdict from scapy.all import sniff, IP from threading import Thread import datetime import ipaddress # ======================= # Статичные переменные # ======================= # Задержка между логированиями в секундах (по умолчанию 5 минут) DELAY_SECONDS = 60 # 300 секунд = 5 минут # Список префиксов сетевых адресов для широковещательного трафика NETWORKS = ['192.168.2.0/24', '192.168.10.0/24'] # Добавьте необходимые префиксы # Вычисляем широковещательные адреса для каждой сети NETWORK_BROADCAST_ADDRS = [str(ipaddress.IPv4Network(net).broadcast_address) for net in NETWORKS] # Широковещательный адрес по умолчанию BROADCAST_IP = '255.255.255.255' # Хранение статистики трафика traffic_stats = defaultdict(int) broadcast_traffic_stats = defaultdict(int) # ======================= # Настройка логирования с ротацией # ======================= LOG_FILENAME = 'top_ip_traffic.log' # Создаем обработчик ротации логов rotating_handler = RotatingFileHandler( LOG_FILENAME, # Имя файла maxBytes=5 * 1024 * 1024, # Максимальный размер файла (5 MB) backupCount=3 # Максимальное количество резервных копий (3 файла) ) # Настраиваем формат логов rotating_handler.setFormatter(logging.Formatter('%(message)s')) # Получаем корневой логгер и устанавливаем ему обработчик ротации logger = logging.getLogger() logger.setLevel(logging.INFO) logger.addHandler(rotating_handler) def packet_callback(packet): """Callback функция для обработки пакетов.""" if IP in packet: src_ip = packet[IP].src dst_ip = packet[IP].dst packet_length = len(packet) # Проверяем, является ли адрес назначения широковещательным is_broadcast = False if dst_ip == BROADCAST_IP or dst_ip in NETWORK_BROADCAST_ADDRS: is_broadcast = True # else: # for prefix in NETWORK_PREFIXES: # if dst_ip.startswith(prefix): # is_broadcast = True # break if is_broadcast: # Увеличиваем трафик для источника и назначения для широковещательного трафика broadcast_traffic_stats[src_ip] += packet_length broadcast_traffic_stats[dst_ip] += packet_length else: # Увеличиваем трафик для обычного трафика traffic_stats[src_ip] += packet_length traffic_stats[dst_ip] += packet_length def log_top_ips(): """Логирует IP-адреса с наибольшим трафиком.""" while True: time.sleep(DELAY_SECONDS) try: # Получаем текущую метку времени now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Определяем топ IP для обычного трафика top_ips = sorted(traffic_stats.items(), key=lambda x: x[1], reverse=True)[:10] # Определяем топ IP для широковещательного трафика top_broadcast_ips = sorted(broadcast_traffic_stats.items(), key=lambda x: x[1], reverse=True)[:10] # Логируем информацию logging.info("." * 37) logging.info(f" Топ 10 IP на {now}") logging.info(" Общий трафик ") # Добавляем заголовки таблицы logging.info(f"{'IP-адрес':<20} {'Байты':>15}") logging.info("-" * 35) for ip, bytes_used in top_ips: logging.info(f"{ip:<20} {bytes_used:>15}") logging.info(" Широковещательный трафик ") logging.info(f"{'IP-адрес':<20} {'Байты':>15}") logging.info("-" * 35) for ip, bytes_used in top_broadcast_ips: logging.info(f"{ip:<20} {bytes_used:>15}") logging.info("." * 37) # Очищаем статистику обоих типов трафика traffic_stats.clear() broadcast_traffic_stats.clear() except Exception as e: logging.error(f"Произошла ошибка при логировании: {e}") def main(): """Основная функция для захвата пакетов и логирования трафика.""" try: # Запускаем поток для логирования logger_thread = Thread(target=log_top_ips, daemon=True) logger_thread.start() # Запускаем захват пакетов с оптимизированным фильтром # Фильтр захватывает только IP-пакеты для повышения эффективности sniff(iface=None, prn=packet_callback, store=0, filter="ip") except KeyboardInterrupt: logging.info("Завершение работы программы пользователем...") except Exception as e: logging.error(f"Произошла ошибка в основной функции: {e}") # Запуск программы if __name__ == "__main__": main()
Созданием виртуального окружения, установкой зависимостей и настройкой служб для указанных выше скриптов займется Ansible. Плейбук, который я применял, приведен ниже: ansible_speedtest.yml.
Скрытый текст
--- - name: Установка Python и настройка Speedtest Exporter hosts: localhost tasks: - name: Обновление списка пакетов apt: update_cache: true become: true - name: Установка необходимых пакетов apt: name: - curl - python3-venv - python3.11-dev - libffi-dev - libssl-dev - nmap state: present become: true - name: Проверка существования виртуального окружения stat: path: "/etc/apt/sources.list.d/ookla_speedtest-cli.list" register: speedtest_repo - name: Добавление репозитория speedtest-cli shell: curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.deb.sh | bash args: executable: /bin/bash become: true when: not speedtest_repo.stat.exists - name: Установка speedtest apt: name: speedtest state: present become: true - name: Проверка существования виртуального окружения stat: path: "{{ playbook_dir }}/.venv" register: venv_directory - name: Создание виртуального окружения command: python3 -m venv .venv args: chdir: "{{ playbook_dir }}" when: not venv_directory.stat.exists - name: Обновить pip и setuptools в виртуальном окружении pip: name: - pip - setuptools state: latest virtualenv: "{{ playbook_dir }}/.venv" - name: Установка зависимостей из requirements.txt pip: requirements: "{{ playbook_dir }}/requirements.txt" virtualenv: "{{ playbook_dir }}/.venv" # службы # speedtest exporter - name: Создание файла службы для speedtest exporter become: true copy: dest: /etc/systemd/system/speedtest_exporter.service content: | [Unit] Description=Служба Speedtest Exporter After=network.target [Service] User={{ ansible_env.USER }} WorkingDirectory={{ playbook_dir }} ExecStart={{ playbook_dir }}/.venv/bin/python3 {{ playbook_dir }}/speedtest_exporter.py Restart=always Environment=PYTHONUNBUFFERED=1 [Install] WantedBy=multi-user.target - name: Перезагрузка системных служб become: true command: systemctl daemon-reload - name: Запуск службы speedtest_exporter become: true systemd: name: speedtest_exporter state: started enabled: yes ## traffic monitor - name: Создание файла службы для traffic monitor (привилегированный режим) become: true copy: dest: /etc/systemd/system/traffic_monitor.service content: | [Unit] Description=Служба Traffic Monitor After=network.target [Service] User=root WorkingDirectory={{ playbook_dir }} ExecStart={{ playbook_dir }}/.venv/bin/python3 {{ playbook_dir }}/traffic_monitor.py Restart=always RestartSec=5s TimeoutSec=30 StandardOutput=journal StandardError=journal Environment=PYTHONUNBUFFERED=1 [Install] WantedBy=multi-user.target - name: Перезагрузка системных служб become: true command: systemctl daemon-reload - name: Запуск службы traffic_monitor become: true systemd: name: traffic_monitor state: restarted enabled: yes
Задача 3: Изучение хостов, генерирующего наибольших трафик, устранение проблем
Далее подключаем Grafana к Prometheus и изучаем периоды падения трафика на примере тестовой среды.
Также Alertmanager позволяет настроить предупреждения по пороговым значениям метрик. В моем случае, при падении трафика ниже допустимого, приходит оповещение, например, в телеграмм.
Теперь в моменты падения графика мы изучаем лог нашего анализатора трафика и определяем ip адреса узлов, наиболее активно использующих трафик:
После того, как подозрительный ip найден, мы должны определиться, что мы хотим узнать о нем:
Из инструментария, мы используем утилиту nmap, с помощью которой определим местоположение, выполним обычное, а затем агрессивное сканирование. Напишем нужный скрипт на python:
Скрытый текст
import sys import socket import requests from pythonping import ping import re import nmap import datetime import time import subprocess def validate_ip(ip): pattern = re.compile(r"^(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)$") return pattern.match(ip) is not None def reverse_dns(ip): try: return socket.gethostbyaddr(ip)[0] except socket.herror: return "Не удалось найти доменное имя (reverse DNS lookup)." def check_ping(ip): """ Проверяет доступность хоста по IP адресу с помощью команды ping. :param ip: IP-адрес хоста :return: Среднее время отклика в миллисекундах (float) если доступен, иначе False """ try: # Выполняем команду ping с 1 пакетом output = subprocess.check_output( ["ping", "-c", "1", "-W", "1", ip], stderr=subprocess.STDOUT, universal_newlines=True ) # Парсим вывод ping для получения среднего времени отклика for line in output.splitlines(): if "time=" in line: # Пример строки: "64 bytes from 8.8.8.8: icmp_seq=1 ttl=117 time=14.2 ms" time_ms = float(line.split("time=")[1].split(" ")[0]) return time_ms return False except subprocess.CalledProcessError: return False def wait_for_host(ip, timeout=3600, interval=60): """ Ждет, пока хост станет доступен или истечет таймаут. :param ip: IP-адрес хоста :param timeout: Таймаут в секундах (по умолчанию 1 час) :param interval: Интервал между проверками в секундах (по умолчанию 60 секунд) :return: True если хост стал доступен, False если таймаут истек """ start_time = datetime.datetime.now() end_time = start_time + datetime.timedelta(seconds=timeout) while datetime.datetime.now() < end_time: ping_result = check_ping(ip) if isinstance(ping_result, float): print(f"Хост {ip} доступен. Среднее время отклика: {ping_result} ms") return True else: remaining = end_time - datetime.datetime.now() remaining_seconds = int(remaining.total_seconds()) hours, remainder = divmod(remaining_seconds, 3600) minutes, seconds = divmod(remainder, 60) print(f"Хост {ip} недоступен. Ожидание... Осталось времени: {hours}ч {minutes}мин {seconds}с") time.sleep(interval) print(f"Таймаут ожидания хоста {ip} истек после {timeout / 3600} часа(ов).") return False def geolocation(ip): try: response = requests.get( f"http://ip-api.com/json/{ip}?fields=status,country,regionName,city,zip,lat,lon,timezone,isp,org,as", timeout=10 ) data = response.json() if data.get('status') == 'success': return data else: return {"error": "Не удалось получить геолокацию."} except Exception as e: return {"error": str(e)} def print_geolocation(geo): if 'error' in geo: print(f"Геолокация: {geo['error']}") else: print("Геолокация:") print(f" Страна: {geo.get('country')}") print(f" Регион: {geo.get('regionName')}") print(f" Город: {geo.get('city')}") print(f" Почтовый индекс: {geo.get('zip')}") print(f" Широта: {geo.get('lat')}") print(f" Долгота: {geo.get('lon')}") print(f" Часовой пояс: {geo.get('timezone')}") print(f" Провайдер: {geo.get('isp')}") print(f" Организация: {geo.get('org')}") print(f" Автономная система: {geo.get('as')}") def scan_ip(ip): """ Сканирует указанный IP-адрес и выводит результаты сканирования. Параметры: ip (str): IP-адрес для сканирования. """ scanner = nmap.PortScanner() try: # Проведение сканирования с опциями: # -sS : TCP SYN scan # -sV : Определение версий сервисов # -O : Определение операционной системы scanner.scan(ip, arguments='-sS -sV -O') result = {} host_info = scanner[ip] # Инициализация result['hostname'] = "Не определено" result['mac_address'] = "Не доступен" result['open_ports'] = [] result['services'] = {} result['os'] = "Не определена" # Получение имени хоста if 'hostnames' in host_info and host_info['hostnames']: hostname = host_info['hostnames'][0]['name'] result['hostname'] = hostname if hostname else "Не определено" # Получение MAC-адреса if 'addresses' in host_info: if 'mac' in host_info['addresses']: result['mac_address'] = host_info['addresses']['mac'] # Обработка TCP-портов if 'tcp' in host_info: for port in host_info['tcp']: state = host_info['tcp'][port]['state'] if state == 'open': result['open_ports'].append(port) service_name = host_info['tcp'][port]['name'] service_product = host_info['tcp'][port].get('product', 'Неизвестно') service_version = host_info['tcp'][port].get('version', 'Неизвестно') result['services'][port] = f"{service_name} ({service_product} {service_version})" # Обработка UDP-портов (опционально) if 'udp' in host_info: for port in host_info['udp']: state = host_info['udp'][port]['state'] if state == 'open': result['open_ports'].append(port) service_name = host_info['udp'][port]['name'] service_product = host_info['udp'][port].get('product', 'Неизвестно') service_version = host_info['udp'][port].get('version', 'Неизвестно') result['services'][port] = f"{service_name} ({service_product} {service_version})" # Определение операционной системы if 'osmatch' in host_info and host_info['osmatch']: result['os'] = host_info['osmatch'][0]['name'] # Вывод результатов print(f"Результаты сканирования для {ip}:") print(f" Имя хоста: {result['hostname']}") print(f" MAC-адрес: {result['mac_address']}") print(f" Операционная система: {result['os']}") if result["open_ports"]: # Сортировка портов для удобства open_ports_sorted = sorted(result["open_ports"]) print(" Открытые порты:", ", ".join(map(str, open_ports_sorted))) print(" Детали сервисов:") for port in open_ports_sorted: service_info = result["services"].get(port, "Информация недоступна") print(f" Порт {port}: {service_info}") else: print(" Открытых портов не обнаружено.") except Exception as e: print(f"Результаты сканирования для {ip}:") print(f" Ошибка: {str(e)}") def aggressive_scan(ip): """ Выполняет "агрессивное" сканирование ip с помощью Nmap. :param ip: IP-адрес (или диапазон), который необходимо просканировать. :return: Словарь с информацией об узле: { 'hostname': str (имя хоста или None), 'mac_address': str (MAC-адрес или None), 'open_ports': список открытых портов, 'services': словарь {порт: {протокол, сервис, продукт, версия}}, 'os': список возможных ОС (с их баллами совпадения) или None } """ scanner = nmap.PortScanner() # Аргументы: # -A — агрессивное сканирование (включает проверку версий и ОС) # -T4 — скорость сканирования (T4 обычно достаточно быстрая) # 1-65535 — диапазон всех портов (можно сузить при необходимости) scanner.scan(hosts=ip, ports="1-65535", arguments="-A -T4") results = { 'hostname': None, 'mac_address': None, 'open_ports': [], 'services': {}, 'os': None } if ip not in scanner.all_hosts(): return results # Хост не найден или не ответил host_info = scanner[ip] # Получаем имя хоста (если удалось определить) hostname = host_info.hostname() if hostname: results['hostname'] = hostname # MAC-адрес (если удалось определить) if 'addresses' in host_info and 'mac' in host_info['addresses']: results['mac_address'] = host_info['addresses']['mac'] # Определение ОС if 'osmatch' in host_info: results['os'] = [ { 'name': os_item['name'], 'accuracy': os_item['accuracy'], 'os_family': os_item.get('osclass', [{}])[0].get('osfamily') } for os_item in host_info['osmatch'] ] # Список протоколов (tcp, udp и т.д.) for proto in host_info.all_protocols(): ports = host_info[proto].keys() for port in ports: port_state = host_info[proto][port]['state'] if port_state == 'open': results['open_ports'].append(port) # Информация о сервисе service_name = host_info[proto][port].get('name', '') service_product = host_info[proto][port].get('product', '') service_version = host_info[proto][port].get('version', '') results['services'][port] = { 'protocol': proto, 'service': service_name, 'product': service_product, 'version': service_version } return results def print_scan_results(scan_data): """ Печатает результаты агрессивного сканирования в консоль. """ print("\nРезультаты агрессивного сканирования:") if not scan_data['hostname'] and not scan_data['open_ports']: print("Хост не ответил или не найден.") return print(f"Имя хоста: {scan_data['hostname']}") print(f"MAC-адрес: {scan_data['mac_address']}") if scan_data['os']: print("Возможные ОС:") for os_info in scan_data['os']: print(f" - {os_info['name']} (точность: {os_info['accuracy']}%), семейство: {os_info['os_family']}") else: print("Не удалось определить ОС.") if scan_data['open_ports']: print("\nОткрытые порты и сервисы:") for port in sorted(scan_data['open_ports']): service_info = scan_data['services'][port] print(f" Порт {port}/{service_info['protocol']}: {service_info['service']} " f"(продукт: {service_info['product']}, версия: {service_info['version']})") else: print("Открытых портов не найдено.") def main(): if len(sys.argv) != 2: print("Использование: python py_ip_monitor.py <IP-адрес>") sys.exit(1) ip = sys.argv[1] if not validate_ip(ip): print("Некорректный IP-адрес.") sys.exit(1) print(f"\nИнформация для IP: {ip}\n{'='*40}") # Обратный DNS dns = reverse_dns(ip) print(f"Обратный DNS: {dns}") # Геолокация geo = geolocation(ip) print_geolocation(geo) # Ping ping_result = check_ping(ip) if isinstance(ping_result, float): print(f"Среднее время отклика (ping): {ping_result} ms") else: print(f"Ping: {ping_result}") # Сканирование портов scan_ip(ip) # 4. Расширенное сканирование портов (aggressive_scan + вывод) scan_result = aggressive_scan(ip) print_scan_results(scan_result) if __name__ == "__main__": main()
Запустим сканирование и получим следующие данные:
mac-адрес
тип операционной системы
открытые порты
Все это позволит нам понять какие сервисы запущены на узле и определить его предназначение.
Если доступ к аномальным узлам есть, и мы можем на них авторизоваться, то изучаем запущенные процессы и ищем те, что активно используют сетевое соединение. Например c помощью утилиты nethogs.
Итог
Что теперь с этим делать? Зависит от ваших задач.
Если узел расходует трафик который не должен расходовать, то проще всего заблокировать MAC-адрес на роутере, добавив соответствующее правило, или внести необходимые изменения в файрвол (при его наличии).
Также изучая arp таблицы можно найти местоположение устройства.
В моем же случае виновником был смарт-телевизор, применение которого весьма чувствительно для сети, поставляемой через VPN. Хозяин устройства был найден, а телевизор изъят.
В данной статье мы рассмотрели простые способы создания пользовательских экспортеров для prometheus, способы мониторинга трафика в сети, а также методы быстрого развертывания наших скриптов на узлах с применением системы управления конфигурациями Ansible.
Всем спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/articles/870998/
Добавить комментарий