Если кто-то не в курсе, Nautobot — форк Netbox, его продвигает широко известный в узких кругах провайдер сетевой автоматизации NTC (Network to code). Возможно, порывшись в памяти, вы вспомните не очень красивую историю начала прошлого года про затирание истории коммитов Netbox — дело как раз касалось Nautobot и NTC. Но речь пойдет не о правилах приличия опенсорсного сообщества, а о том, как легко и непринужденно написать свой плагин тому, кто выбрал в качестве источника истины и платформы автоматизации Nautobot, а не Netbox. Хотя я уверен, что реализация плагина, о котором пойдет речь дальше, так же легко может быть выполнена и для Netbox.
Постановка задачи
В нашей не очень большой сети имеется ~50 филиалов, в каждом из которых установлены коммутаторы с общим количество портов от 300 до 500. На самом деле филиалов больше, но начнем мы именно с этого количества, потому что во всех из них установлено оборудование Cisco. Это важно, так как наш плагин будет пока одновендорным.
По каждому порту каждого коммутатора нам необходимо знать:
-
когда последний раз было подключение к этому порту
-
устройство с каким MAC/IP адресом и в каком VLAN подключено к этому порту
“Постой-ка, автор” — скажете вы — “такой функционал я уже где-то видел”. Совершенно точно, это switchmap (гуглится по словосочетанию cisco switchmap). Что ж, никто не обещал, что в статье будет совершенно новая идея для плагина. Но эта идея очень хорошо ложится в концепцию использования Nautobot как платформы автоматизации и SoT.
Сначала я хотел отдать дань уважения разработчикам switchmap и увековечить их творение в названии своего плагина. Но в итоге остановился на более объясняющем функционал названии — nautobot-porthistory-plugin (Sorry guys)
Стек
Ну тут всё просто. Раз мы используем Nautobot, будем писать на Python и активно использовать модули Django. Никаких внешних скриптов, все внутри Nautobot. Забирать данные с сетевых устройств будем по SNMP. Коммутаторы и маршрутизаторы — Cisco, все заведены в Nautobot, у каждого устройства учитываются primary address и все интерфейсы, для каждого филиала учитывается список префиксов и VLAN.
Что насчет документации?
У Nautobot замечательная документация. Еще бы, ведь она почти полностью скопирована у Netbox. Но все изменения и отличия от Netbox документируются не менее хорошо, за это разработчикам отдельное спасибо. Раздел про разработку плагинов достаточно большой — https://nautobot.readthedocs.io/en/stable/plugins/development/
Но вот только первое прочтение официально туториала не дало ответы на многие вопросы (впрочем, как и последующие). Многие знания пришлось выдергивать из исходного кода самого Nautobot, а также плагинов, коих становится все больше и больше.
Итак, начнем
В соответствии поставленной задаче разделим разработку плагина на две части. В первой части сделаем что полегче — научим плагин определять время последнего использования портов коммутатора, хранить и выводить эту информацию пользователям.
По окончании этого этапа ожидается, что наш плагин выполнит следующее:
-
создаст в БД таблицу для хранения информации о неиспользуемых интерфейсах;
-
создаст job, который будет с нужной нам периодичностью заполнять созданную таблицу;
-
изменит страницу вывода информации о коммутаторе так, чтобы было видно, какие порты не используются больше заданного количества дней.
Пока действуем строго по документации
Создадим в каталоге nautobot-plugin-porthistory скелет плагина на сервере (работаем под пользователем nautobot).
Nautobot includes a command to help create the plugin directory: nautobot-server startplugin [app_name]
Команда nautobot-server startplugin nautobot_porthistory_plugin
создаст следующий набор папок и файлов:
. └── nautobot_porthistory_plugin ├── __init__.py ├── migrations │ └── __init__.py ├── models.py ├── navigation.py ├── tests │ ├── __init__.py │ ├── test_models.py │ └── test_views.py ├── urls.py └── views.py
Ну что, файлы есть, давайте наполним их работающим кодом.
Файл пакета __init__.py уже содержит большинство нужных параметров. Всё, что нам требуется, указать какие опции конфига обязательны для заполнения, а также настройки плагина по умолчанию.
__init__.py """nautobot_porthistory_plugin Plugin Initilization.""" from nautobot.extras.plugins import PluginConfig class NautobotPorthistoryPluginConfig(PluginConfig): """Plugin configuration for the nautobot_porthistory_plugin plugin.""" name = "nautobot_porthistory_plugin" # Raw plugin name; same as the plugin's source directory. verbose_name = "nautobot_porthistory_plugin" # Human-friendly name for the plugin. base_url = "nautobot_porthistory_plugin" # (Optional) Base path to use for plugin URLs. Defaulting to app_name. required_settings = [] # A list of any configuration parameters that must be defined by the user. min_version = "1.0.0" # Minimum version of Nautobot with which the plugin is compatible. max_version = "1.999" # Maximum version of Nautobot with which the plugin is compatible. default_settings = {} # A dictionary of configuration parameters and their default values. caching_config = {} # Plugin-specific cache configuration. # А вот и нужная нам конфигурация required_settings = ['switches_role_slug'] default_settings = { 'min_idle_days': 14, 'snmp_community': 'public', 'workers': 50, } config = NautobotPorthistoryPluginConfig
Параметры, которые будем передавать в плагин:
-
switches_role_slug
— Роль, по которой будем фильтровать коммутаторы -
min_idle_days
— Если порт не используется меньше дней, он нас не интересует -
workers
— количество параллельных асинхронных запросов к оборудованию
Модель определим в models.py. В таблице UnusedPorts будем хранить время обновления, время последнего использования интерфейса и собственно ID самого интерфейса. Как видите, пока вообще ничего сложного.
models.py """Model definition for nautobot_porthistory_plugin.""" from django.db import models from nautobot.core.models import BaseModel from nautobot.dcim.fields import MACAddressCharField class UnusedPorts(BaseModel): # Дата/время последнего output на порту коммутатора updated = models.DateTimeField(auto_now=True) last_output = models.DateTimeField() interface = models.ForeignKey( to="dcim.Interface", on_delete=models.CASCADE, blank=False, ) def __str__(self): return f'{self.interface.name} - {self.last_output}'
Аргумент auto_now=True
указывает, что при каждом сохранении данных в таблицу поле updated автоматически обновляется до текущего времени
Файлы navigation.py, urls.py и views.py пока нам не нужны, оставим их в первозданном виде. Эти три файла понадобятся на следующем этапе, когда мы сделаем отдельную ссылку на плагин в меню. Но, как обычно, есть нюанс. Импорты в views.py “из коробки” ссылаются на несуществующие модули django, поэтому чтобы не словить исключение при установке, закомментим строчку from django.views.generic import views
Итак, у нас в БД есть таблица, но в ней нет данных. Создадим джоб (job в Nautobot — как custom script и report в Netbox), который соберет информацию с коммутаторов, обработает ее и сохранит в нашей таблице.
Добавим файл jobs.py с классом (джобом) UnusedPortsUpdate
Логика его работы проста:
-
Сгенерировать список коммутаторов — используем фильтр по device_role.slug = switches_role_slug из конфига
-
Асинхронно опросить все коммутаторы по SNMP — uptime устройства
-
Асинхронно опросить все коммутаторы по SNMP — соответствие ifindex и названий портов
-
Асинхронно опросить все коммутаторы по SNMP — Last output на портах
-
Если Last output отрицательный смотрим, есть ли в таблице информация по данному порту. Если отсутствует — добавляем дату последнего использования равной дате загрузке (п.2). Если данные есть, просто обновляем дату проверки
-
Если Last output, переведенный в дни, меньше, чем min_idle_days из конфига, удаляем запись о порте из таблицы. Иначе создаем/обновляем запись.
На всякий случай в необязательные входные параметры добавим Site, чтобы иметь возможность обновлять информацию по конкретному филиалу.
Код jobs.py
jobs.py from nautobot.dcim.models import Device, DeviceRole, Site, Interface from nautobot.extras.jobs import Job, ObjectVar from nautobot.extras.models import Status from django.conf import settings from nautobot_porthistory_plugin.models import UnusedPorts import asyncio import aiosnmp from collections import defaultdict from netutils.interface import canonical_interface_name from datetime import datetime, timedelta class UnusedPortsUpdate(Job): class Meta: name = "Обновление информации о неподключенных интерфейсах" site = ObjectVar( model=Site, label='БЮ', required=False ) async def bulk_snmp(self, device, oid_list, community): oid_results = {} try: async with aiosnmp.Snmp( host=device, port=161, community=community, timeout=5, retries=3, max_repetitions=10, ) as snmp: oid_bulk_result = {} for oid in oid_list: reply = await snmp.bulk_walk(oid) for index in reply: oid_bulk_result[index.oid] = index.value oid_results[oid] = oid_bulk_result return (device, oid_results) except Exception as error: return (device, error) return (device, None) async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs): async with semaphore: return await function(*args, **kwargs) async def async_bulk_snmp(self, devices, oid_list, community, workers): semaphore = asyncio.Semaphore(workers) coroutines = [ self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community) for device in devices ] result = [] for future in asyncio.as_completed(coroutines): result.append(await future) return result def round_datetime(self, date): date_tuple = date.timetuple() return datetime(year=date_tuple.tm_year, month=date_tuple.tm_mon, day=date_tuple.tm_mon, hour=date_tuple.tm_hour, minute=0, second=0, microsecond=0 ) def run(self, data, commit): # запускать job могут только пользователи is_superuser if not self.request.user.is_superuser: self.log_info(message='Неавторизованный запуск') return PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin'] COMMUNITY = PLUGIN_CFG['snmp_community'] MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14) SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug'] WORKERS = PLUGIN_CFG['workers'] STATUS_ACTIVE = Status.objects.get(slug='active') # сгенерируем справочник устройств devices = [] #этот список передадим в модуль snmp device_dict = defaultdict(dict) device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG) if data['site']: nb_devices = Device.objects.filter(site=data['site'], device_role__in=device_role, status=STATUS_ACTIVE) else: nb_devices = Device.objects.filter(device_role__in=device_role, status=STATUS_ACTIVE) for nb_device in nb_devices: if nb_device.platform and nb_device.platform.napalm_driver and nb_device.platform.napalm_driver == 'cisco_iosxe' and nb_device.primary_ip4: primary_ip = str(nb_device.primary_ip4).split('/')[0] devices.append(primary_ip) device_dict[primary_ip]['device'] = nb_device device_dict[primary_ip]['interfaces'] = {} device_dict[primary_ip]['ifindexes'] = {} device_interfaces = Interface.objects.filter(device_id=nb_device) for intf in device_interfaces: device_dict[primary_ip]['interfaces'][intf.name] = [intf] # получим uptime оборудования по SNMP (в секундах) # и занесем эту информацию в справочник oid_list = ['.1.3.6.1.6.3.10.2.1.3'] results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS)) for device_ip, device_result in results: if type(device_result) != dict: self.log_warning(obj=device_dict[device_ip]['device'],message=f'не удалось получить информацию по SNMP - {device_result}') continue for oid, oid_result in device_result.items(): for uptime in oid_result.values(): device_dict[device_ip]['uptime'] = uptime boottime = datetime.now() - timedelta(seconds=uptime) device_dict[device_ip]['boottime'] = boottime # получим названия интерфейсов и их индексы с оборудования по SNMP # и занесем эту информацию в справочник oid_list = ['.1.3.6.1.2.1.31.1.1.1.1'] results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS)) for device_ip, device_result in results: if type(device_result) != dict or 'uptime' not in device_dict[device_ip]: continue for oid, oid_result in device_result.items(): for index, index_result in oid_result.items(): ifindex = index.split('.')[-1] canonical_intf_name = canonical_interface_name(index_result.decode("utf-8")) if canonical_intf_name in device_dict[device_ip]['interfaces']: device_dict[device_ip]['ifindexes'][ifindex] = canonical_intf_name # получим время последнего output по SNMP oid_list = ['.1.3.6.1.4.1.9.2.2.1.1.4'] results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS)) output = '' for device_ip, device_result in results: if type(device_result) != dict or 'uptime' not in device_dict[device_ip]: continue nb_device = device_dict[device_ip]['device'] boottime = device_dict[device_ip]['boottime'] uptime = device_dict[device_ip]['uptime'] output += f'{nb_device.name} - power on {boottime}\n' unused_port_count = 0 for oid, oid_result in device_result.items(): for index, time_from_last_output in oid_result.items(): ifindex = index.split('.')[-1] if ifindex in device_dict[device_ip]['ifindexes']: intf_name = device_dict[device_ip]['ifindexes'][ifindex] nb_interface = device_dict[device_ip]['interfaces'][intf_name][0] if time_from_last_output < 0 or time_from_last_output / 1000 > uptime - 300: unused_port_count += 1 unused_port, created = UnusedPorts.objects.get_or_create( interface=nb_interface, defaults={ 'last_output': boottime } ) unused_port.save() else: last_output = datetime.now() - timedelta(seconds=round(time_from_last_output/1000)) if 1000 * 60 * 60 * 24 * MIN_IDLE_DAYS > time_from_last_output: # прошло меньше MIN_IDLE_DAYS дней UnusedPorts.objects.filter(interface=nb_interface).delete() else: unused_port_count += 1 unused_port, created = UnusedPorts.objects.get_or_create( interface=nb_interface, defaults={ 'last_output': last_output } ) if not created: unused_port.last_output = last_output unused_port.save() output += f'неиспользуемых в течении {MIN_IDLE_DAYS} дн. портов - {unused_port_count}\n' return output jobs = [UnusedPortsUpdate]
В дальнейшем можно будет запланировать регулярный запуск джоба, чтобы всегда иметь актуальный список неиспользуемых портов. Напомню, что в Nautobot для этого есть встроенный шедулер джобов. Всего лишь надо при запуске джоба указать периодичность.
Теперь научим плагин выводить полученные данные на странице коммутатора. Для этого добавим в корень плагина файлы template_content.py и templates/unused_ports.html
Названия говорят сами за себя — подготовить содержимое шаблона и отрендерить блок HTML. Отрендеренный блок разместим на странице Device, в правой части.
template_content.py from nautobot.extras.plugins import PluginTemplateExtension from django.conf import settings from .models import UnusedPorts class DeviceUnusedPorts(PluginTemplateExtension): """Template extension to display unused ports on the right side of the page.""" model = 'dcim.device' def right_page(self): PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin'] SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug'] MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14) device = self.context['object'] if device.device_role.slug in SWITCHES_ROLE_SLUG: device_intefaces = device.interfaces.values_list("id", flat=True) unused_ports = UnusedPorts.objects.filter(interface_id__in=device_intefaces) unused_ports_with_delta = [] for port in unused_ports: unused_ports_with_delta.append({ 'interface_name': port.interface.name, 'last_output': port.last_output.strftime("%d.%m.%Y %H:%M"), 'updated': port.updated.strftime("%d.%m.%Y %H:%M"), 'delta': str(port.updated - port.last_output).split()[0] }) return self.render('unused_ ports.html', extra_context={ 'unused_ports': unused_ports_with_delta, 'min_idle_days': MIN_IDLE_DAYS }) else: return '' template_extensions = [DeviceUnusedPorts]
templates/unused_ports.html <div class="panel panel-default"> <div class="panel-heading"><strong>Неиспользуемые порты (в течение последних {{ min_idle_days }} дн.)</strong></div> <table class="table table-hover panel-body attr-table"> <tr> <td>Порт</td> <td>Время последней активности</td> <td>Обновлено (UTC)</td> </tr> {% for item in unused_ports %} <tr> <td>{{ item.interface_name }}</td> <td>{{ item.last_output }} (прошло ~{{ item.delta }} дн.)</td> <td>{{ item.updated }}</td> </tr> {% endfor %} </table> </div>
Осталось смешать все ингредиенты и попробовать, что получилось:
-
Создадим миграции — стандартный механизм Django для обеспечения версионности БД в обертке Nautobot.
nautobot-server makemigrations
(выполняется из папки плагина) -
добавим файлы setup.py и MANIFEST.in
-
установим плагин простым
pip3 install .
из каталога плагина -
включим плагин в конфигурационном файле nautobot и там же укажем параметры плагина
PLUGINS = [
'nautobot_porthistory_plugin',
]
PLUGINS_CONFIG = {
'nautobot_porthistory_plugin': {
'switches_role_slug': ['Access-switch'],
'min_idle_days': 14,
}
} -
запустим пост-инсталляционный скрипт
nautobot-server post_upgrade
. Он обновит БД и скопирует HTML файлы в соответствующую папку. -
Перезапустим сервисы:
sudo systemctl restart nautobot nautobot-worker
Готово! Можно запустить джоб и проверить результаты его выполнения:
Полный код плагина, созданный на этом этапе, можно подсмотреть по ссылке — https://github.com/iontzev/nautobot-porthistory-plugin/tree/part_1
Этап второй — прокачка плагина
Вторая часть реализации будет посложнее. Тем интереснее, ведь это позволит понять, что еще могут плагины в Nautobot. Сперва добавим еще одну модель в БД — таблицу, в которой будем хранить MAC и IP адреса с привязкой к интерфейсу. Способ уже знакомый — обновить models.py и создать новую миграцию nautobot-server makemigrations
from nautobot.dcim.fields import MACAddressCharField class MAConPorts(BaseModel): # MAC и IP на порту коммутатора updated = models.DateTimeField(auto_now=True) mac = MACAddressCharField(blank=False, verbose_name="MAC Address") vlan = models.ForeignKey( to="ipam.VLAN", on_delete=models.CASCADE, blank=False, ) ipaddress = models.ForeignKey( to="ipam.IPAddress", on_delete=models.SET_NULL, default=None, blank=True, null=True, ) interface = models.ForeignKey( to="dcim.Interface", on_delete=models.CASCADE, blank=False, ) device = models.ForeignKey( to="dcim.Device", on_delete=models.CASCADE, blank=False, ) def __str__(self): return f'{self.intervace} - VLAN {seld.vlan.vid} MAC {self.mac}' class Meta: verbose_name_plural = 'MAC and IP on switches ports'
В jobs.py добавим еще один класс — он же джоб. Алгоритм его работы относительно несложный:
-
асинхронно опросить по SNMP все устройства с ролью device_role.slug = switches_role_slug из конфига — получить mac address table и соответствия MAC именам портов.
-
асинхронно опросить по SNMP все устройства с ролью device_role.slug = routers_role_slug из конфига — получить ARP таблицу. Не забудем, что конфиг тоже надо подправить — он находится в __init__.py
-
попробовать получить hostname по IP
-
если IP адрес не внесен в Nautobot, добавить
-
обновить информацию о привязке MAC и IP к интерфейсам устройств
Обновление построено таким образом, что если MAC на порту пропадет физически, мы сможем его найти в истории подключений, если на этом порту коммутатора больше ничего не появлялось.
Новый джоб в jobs.py
class MAConPortsUpdate(Job): class Meta: name = "Обновление информации о подключенных устройствах" site = ObjectVar( model=Site, label='БЮ', required=False ) async def bulk_snmp(self, device, oid_list, community): oid_results = {} try: async with aiosnmp.Snmp( host=device, port=161, community=community, timeout=5, retries=3, max_repetitions=10, ) as snmp: oid_bulk_result = {} for oid in oid_list: reply = await snmp.bulk_walk(oid) for index in reply: oid_bulk_result[index.oid] = index.value oid_results[oid] = oid_bulk_result return (device, oid_results) except Exception as error: return (device, error) return (device, None) async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs): async with semaphore: return await function(*args, **kwargs) async def async_bulk_snmp(self, devices, oid_list, community, workers): semaphore = asyncio.Semaphore(workers) coroutines = [ self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community) for device in devices ] result = [] for future in asyncio.as_completed(coroutines): result.append(await future) return result def run(self, data, commit): # запускать job могут только пользователи is_superuser if not self.request.user.is_superuser: self.log_info(message='Неавторизованный запуск') return PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin'] COMMUNITY = PLUGIN_CFG['snmp_community'] SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug'] ROUTERS_ROLE_SLUG = PLUGIN_CFG['routers_role_slug'] WORKERS = PLUGIN_CFG['workers'] STATUS_ACTIVE = Status.objects.get(slug='active') STATUS_STATIC = Status.objects.get(slug='static') STATUS_DHCP = Status.objects.get(slug='dhcp') device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG) devices = defaultdict(dict) devices_list = [] vlans = defaultdict(list) # построим список всех связей, чтобы потом исключить из результатов линки между свичами cable_set = defaultdict(set) all_cables = Cable.objects.all() for cable in all_cables: if cable.termination_a_type == ContentType.objects.get(app_label='dcim', model='interface'): if not data['site'] or cable.termination_a.device.site == data['site']: cable_set[cable.termination_a.device.name].add(cable.termination_a.name) if cable.termination_b_type == ContentType.objects.get(app_label='dcim', model='interface'): if not data['site'] or cable.termination_b.device.site == data['site']: cable_set[cable.termination_b.device.name].add(cable.termination_b.name) # сгенерируем справочник вланов с разбивкой по сайтам vlans_by_site = defaultdict(list) if data['site']: nb_vlans = VLAN.objects.filter(site=data['site'], status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True}) else: nb_vlans = VLAN.objects.filter(status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True}) for nb_vlan in nb_vlans: vlans_by_site[nb_vlan.site.name].append(nb_vlan.vid) # сгенерируем справочник устройств for site in vlans_by_site: site_id = Site.objects.get(name=site) nb_devices_in_site = Device.objects.filter( site=site_id, device_role__in=device_role, status=STATUS_ACTIVE, ) for nb_device in nb_devices_in_site: if (nb_device.platform and nb_device.platform.napalm_driver and nb_device.platform.napalm_driver == 'cisco_iosxe' and nb_device.primary_ip4): primary_ip = str(nb_device.primary_ip4).split('/')[0] devices_list.append(primary_ip) device = devices[primary_ip] = {} device['device'] = nb_device device['site'] = nb_device.site device['interfaces'] = {} device['ifindexes'] = {} device['bridge_ports'] = {} device['vlans'] = vlans_by_site[site] for intf in Interface.objects.filter(device_id=nb_device): device['interfaces'][intf.name] = intf for vlan in vlans_by_site[site]: vlans[vlan].append(primary_ip) # получим названия интерфейсов и их индексы с оборудования по SNMP oid_list = ['.1.3.6.1.2.1.31.1.1.1.1'] results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, COMMUNITY, WORKERS)) for device_ip, device_result in results: if type(device_result) != dict: self.log_warning(obj=devices[device_ip]['device'],message=f'не удалось получить информацию по SNMP') del devices[device_ip] devices_list.remove(device_ip) continue for oid, oid_result in device_result.items(): for index, index_result in oid_result.items(): ifindex = index.split('.')[-1] canonical_intf_name = canonical_interface_name(index_result.decode("utf-8")) if canonical_intf_name in devices[device_ip]['interfaces']: devices[device_ip]['ifindexes'][ifindex] = canonical_intf_name # пройдемся по списку вланов и получим с устройства таблицу MAC адресов для каждого влана # MAC адреса в десятичном формате port_mac_relation = defaultdict(list) for vlan, devices_dict in vlans.items(): self.log_info(message=f'Получаем информацию по VLAN {vlan}') community_with_vlan = f'{COMMUNITY}@{vlan}' devices_list = [device for device in devices_dict if device in devices_list] # получим bridge ports с оборудования по SNMP (зависит от VLAN) oid_list = ['.1.3.6.1.2.1.17.1.4.1.2'] results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS)) for device_ip, device_result in results: if type(device_result) != dict: # скорее всего, такого VLAN нет на этом устройстве continue for oid, oid_result in device_result.items(): for index, index_result in oid_result.items(): bridge_port = index.split('.')[-1] ifindex = str(index_result) if ifindex in devices[device_ip]['ifindexes']: ifname = devices[device_ip]['ifindexes'][ifindex] nb_interface = devices[device_ip]['interfaces'][ifname] devices[device_ip]['bridge_ports'][bridge_port] = nb_interface else: oid_list = ['.1.3.6.1.2.1.17.4.3.1.2'] results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS)) for device_ip, device_result in results: nb_device = devices[device_ip]['device'] nb_vlan = VLAN.objects.get(vid=vlan, site_id=nb_device.site.id) if type(device_result) != dict: continue for oid, oid_result in device_result.items(): for mac_dec, bridge_port in oid_result.items(): if str(bridge_port) in devices[device_ip]['bridge_ports']: if (devices[device_ip]['bridge_ports'][str(bridge_port)].name not in cable_set[nb_device.name] and not devices[device_ip]['bridge_ports'][str(bridge_port)]._custom_field_data.get('flag-ignore-mac')): # преобразуем MAC из десятичного формата в шестнадцатеричный mac_hex = ''.join(['{0:x}'.format(int(i)).zfill(2) for i in mac_dec.split('.')[-6:]]).upper() port_mac_relation[devices[device_ip]['bridge_ports'][str(bridge_port)].id].append({ 'vlan': nb_vlan, 'mac': mac_hex, }) # подготовим список L3 устройств routers = defaultdict(dict) routers_list = [] device_role = DeviceRole.objects.filter(slug__in=ROUTERS_ROLE_SLUG) for site in vlans_by_site: site_id = Site.objects.get(name=site) nb_devices_in_site = Device.objects.filter( site=site_id, device_role__in=device_role, status=STATUS_ACTIVE, ) for nb_device in nb_devices_in_site: if (nb_device.platform and nb_device.platform.napalm_driver and nb_device.platform.napalm_driver == 'cisco_iosxe' and nb_device.primary_ip4): primary_ip = str(nb_device.primary_ip4).split('/')[0] routers_list.append(primary_ip) router = routers[primary_ip] = {} router['site'] = nb_device.site.name router['device'] = nb_device arp = defaultdict(dict) # получим ARP-таблицу с оборудования по SNMP oid_list = ['.1.3.6.1.2.1.3.1.1.2'] results = asyncio.run(self.async_bulk_snmp(routers_list, oid_list, COMMUNITY, WORKERS)) for device_ip, device_result in results: site = routers[device_ip]['site'] arp[site] = defaultdict(list) if type(device_result) != dict: self.log_warning(obj=routers[device_ip]['device'],message=f'не удалось получить информацию по SNMP') continue for oid, oid_result in device_result.items(): for index, index_result in oid_result.items(): snmp_address = '.'.join(index.split('.')[-4:]) snmp_mac = ''.join(["{0:x}".format(int(i)).zfill(2) for i in index_result]).upper() arp[site][snmp_mac].append(snmp_address) output = '' for device in devices.values(): nb_device = device['device'] site = nb_device.site.name output += f'device {nb_device} :' mac_on_device = ip_on_device = name_on_device = 0 for intf in device['interfaces'].values(): if len(port_mac_relation[intf.id]) > 0: MAConPorts.objects.filter(interface=intf).delete() for vlan_and_mac in port_mac_relation[intf.id]: mac_on_device += 1 nb_prefixes = Prefix.objects.filter(vlan_id=vlan_and_mac['vlan'].id) addresses = arp[site].get(vlan_and_mac['mac']) address_with_prefix = '' if nb_prefixes and addresses: for nb_prefix in nb_prefixes: for address in addresses: if IPv4Address(address) in IPv4Network(str(nb_prefix)): prefixlen = str(nb_prefix).split('/')[-1] address_with_prefix = f'{address}/{prefixlen}' break else: continue break if address_with_prefix: ip_on_device += 1 try: hostname, aliaslist, ipaddrlist = socket.gethostbyaddr(address) name_on_device += 1 except: hostname='' nb_address, created = IPAddress.objects.get_or_create( address=address_with_prefix, vrf=nb_prefix.vrf, defaults={ 'status': STATUS_STATIC, 'dns_name': hostname } ) if created: self.log_success(obj=nb_address, message=f'Добавлен IP адрес {hostname}') elif nb_address.status != STATUS_DHCP and hostname and nb_address.dns_name != hostname: old_hostname = nb_address.dns_name nb_address.dns_name = hostname nb_address.save() self.log_success(obj=nb_address, message=f'Обновлено DNS name "{old_hostname}" -> "{hostname}"') else: nb_address = None mac, created = MAConPorts.objects.get_or_create( vlan=vlan_and_mac['vlan'], mac=vlan_and_mac['mac'], defaults={ 'interface': intf, 'device': nb_device, 'ipaddress': nb_address, } ) if not created: updated = False if nb_address and mac.ipaddress != nb_address: self.log_info(obj=nb_address, message=f'Устройство с MAC {mac.mac} поменяло IP {mac.ipaddress} -> {nb_address}') mac.ipaddress = nb_address updated = True if mac.interface != intf: self.log_info(obj=intf, message=f'MAC {mac.mac} переехал с порта "{mac.interface}"') mac.interface = intf mac.device = nb_device updated = True if updated: mac.save() output += f" MAC count - {mac_on_device}, IP count - {ip_on_device}, resolved to hostname - {name_on_device}\n" return output
Важно! Особенность получения по SNMP таблицы MAC с коммутаторов Cisco в том, что необходимо указывать в строке community номер VLAN, в котором живут MAC адреса.
Еще учтем, что не про все VLAN надо знать. Например, если в каком-то VLAN расположены беспроводные клиенты, в них бессмысленно отслеживать MAC. То же самое касается портов. Значит нам нужен механизм фильтрации VLAN и портов коммутаторов. Сюда отлично подойдут custom_fields, но использовать в скриптах названия полей, которые надо добавить пользователю, значит надеяться на маленькое чудо.
Так давайте будем реалистами, добавим custom_fileds при установке плагина. Поможет нам в этом функционал Django signals, опять же в обертке Nautobot.
Добавим вызов сигнала в __init__.py, а сам сигнал в signals.py.
__init__.py class NautobotPorthistoryPluginConfig(PluginConfig): ............ def ready(self): super().ready() nautobot_database_ready.connect(create_custom_fields_for_porthistory, sender=self)
signals.py from nautobot.extras.choices import CustomFieldTypeChoices def create_custom_fields_for_porthistory(sender, apps, **kwargs): """Create a custom field flag_porthistory for VLAN if it doesn't already exist.""" # Use apps.get_model to look up Nautobot core models ContentType = apps.get_model("contenttypes", "ContentType") CustomField = apps.get_model("extras", "CustomField") VLAN = apps.get_model("ipam", "VLAN") Interface = apps.get_model("dcim", "Interface") # Create custom fields cf_for_vlan, created = CustomField.objects.update_or_create( name="flag-porthistory", defaults={ "label": "Search MACs on ports in this VLAN", "type": CustomFieldTypeChoices.TYPE_BOOLEAN, }, ) cf_for_vlan.content_types.set([ContentType.objects.get_for_model(VLAN)]) cf_for_interface, created = CustomField.objects.update_or_create( name="flag-ignore-mac", defaults={ "label": "Ignore MACs on this port", "type": CustomFieldTypeChoices.TYPE_BOOLEAN, }, ) cf_for_interface.content_types.set([ContentType.objects.get_for_model(Interface)])
Теперь при установке плагина автоматически добавятся новые поля к моделям VLAN и Device.interface. Отметив на странице редактирования VLAN галочку Search MACs on ports in this VLAN мы тем самым разрешаем нашему джобу искать MAC адреса в этом VLAN.
Итак, у нас есть джоб и настроенные VLAN, а значит можно заполнять БД нужными данными. Но как посмотреть эти данные? Первый путь нам уже знаком — используем шаблоны для страниц интерфейса и IP-адреса. Таким образом можно точечно извлекать информацию из БД и показывать пользователю:
Второй путь — создать отдельную страницу плагина с возможностью вывода и поиска полученных данных. Этот путь более тернист, чем первый, но и результаты более интересны.
В подготовке страницы плагина участвуют 6 составляющих. Число их можно сократить, если использовать HTML-шаблоны, но это немного другая история
Итак, по порядку:
-
navigation.py — добавляет строку в выпадающее меню Plugins
-
urls.py — вешает на url (добавленной строки в п.1) обработчик
-
views.py — содержит обработчики (View). Обработчик собирает в одно целое данные, параметры вывода (таблицы), форму и алгоритмы фильтрации
-
tables.py — выводит таблицу из БД
-
forms.py — формирует HTML форму для фильтрации данных в таблице
-
filters.py — содержит алгоритмы фильтрации данных
navigation.py from nautobot.extras.plugins import PluginMenuItem menu_items = ( PluginMenuItem( link = 'plugins:nautobot_porthistory_plugin:history', # A reverse compatible link to follow. link_text = 'MAC and IP on switches ports', # Text to display to user. ), )
urls.py from django.urls import path from nautobot_porthistory_plugin import views urlpatterns = [ path('history/', views.PortHistoryView.as_view(), name='history'), ]
views.py from django.shortcuts import render from nautobot.core.views import generic from nautobot_porthistory_plugin import models, tables, filters, forms class PortHistoryView(generic.ObjectListView): """Показывает MAC и IP адреса на портах""" queryset = models.MAConPorts.objects.all() table = tables.PortHistoryTable filterset = filters.PortHistoryFilterSet filterset_form = forms.PortHistoryFilterForm action_buttons = ()
tables.py import django_tables2 as tables from django_tables2.utils import A from nautobot.utilities.tables import BaseTable, ToggleColumn from nautobot_porthistory_plugin import models class PortHistoryTable(BaseTable): pk = ToggleColumn() device = tables.Column(linkify=True) interface = tables.LinkColumn(orderable=False) vlan = tables.LinkColumn() ipaddress = tables.Column(linkify=True, verbose_name="IPv4 Address") class Meta(BaseTable.Meta): # pylint: disable=too-few-public-methods """Meta attributes.""" model = models.MAConPorts fields = ( 'pk', 'device', 'interface', 'vlan', 'mac', 'ipaddress', 'updated', )
forms.py from django import forms from nautobot.dcim.models import Region, Site, Device from nautobot.ipam.models import VLAN from nautobot.utilities.forms import BootstrapMixin, DynamicModelMultipleChoiceField from nautobot.extras.forms import CustomFieldFilterForm from nautobot_porthistory_plugin.models import MAConPorts class PortHistoryFilterForm(BootstrapMixin, forms.Form): """Filter form to filter searches for MAC.""" model = MAConPorts field_order = ["q", "site", "device_id", "vlan"] q = forms.CharField(required=False, label="Search MAC") site = DynamicModelMultipleChoiceField( queryset=Site.objects.all(), to_field_name="slug", required=False, ) device_id = DynamicModelMultipleChoiceField( queryset=Device.objects.all(), required=False, label="Device", query_params={"site": "$site"}, ) vlan = DynamicModelMultipleChoiceField( queryset=VLAN.objects.all(), required=False, label="VLAN", query_params={"site": "$site"}, )
filters.py import django_filters from nautobot.dcim.models import Device from nautobot.utilities.filters import BaseFilterSet, MultiValueCharFilter from django.db.models import Q from nautobot_porthistory_plugin.models import MAConPorts class PortHistoryFilterSet(BaseFilterSet): """Filter for MAConPorts""" q = django_filters.CharFilter(method="search", label="Search MAC") site = MultiValueCharFilter( method="filter_site", field_name="pk", label="site", ) device_id = MultiValueCharFilter( method="filter_device_id", field_name="pk", label="Device (ID)", ) vlan = MultiValueCharFilter( method="filter_vlan", field_name="pk", label="VLAN", ) class Meta: """Meta attributes for filter.""" model = MAConPorts fields = [ 'vlan' ] def search(self, queryset, mac, value): if not value.strip(): return queryset mac = ''.join(ch for ch in value if ch.isalnum()) mac = ':'.join(mac[i:i+2] for i in range(0,len(mac),2)) return queryset.filter(Q(mac__icontains=mac)) def filter_site(self, queryset, name, id_list): if not id_list: return queryset return queryset.filter(Q(device__site__slug__in=id_list) ) def filter_device_id(self, queryset, name, id_list): if not id_list: return queryset return queryset.filter(Q(device__id__in=id_list) ) def filter_vlan(self, queryset, name, id_list): if not id_list: return queryset return queryset.filter(Q(vlan__id__in=id_list) )
Вот такая получилась страница плагина. Можно фильтровать по сайту, по устройству, по VLAN. Можно искать по части MAC, причем формат ввода в поле поиска абсолютно не важен, хоть через нижние подчеркивания, хоть капсом через одну.
Какие выводы я для себя сделал
Написать свой плагин не так и сложно, как мне казалось в самом начале, когда я только изучал тему Netbox и Nautobot. Nautobot/Netbox представляют собой отличную платформу для автоматизации. Прикладывая минимум усилий можно достичь отличных результатов в деле адаптации Nautobot/Netbox к реалиям своей организации.
На этом всё, спасибо за внимание.
Ну и для тех, кому эта тема действительно интересна и кто дочитал до конца, ссылка на код плагина — https://github.com/iontzev/nautobot-porthistory-plugin
ссылка на оригинал статьи https://habr.com/ru/post/661281/
Добавить комментарий