Сканер уязвимостей на Python или как написать сканер за 6 часов

Недавно мне довелось участвовать в хакатоне по информационной безопасности на научной конференции в прекрасном городе Санкт-Петербург в СПбГУТ. Одно из заданий представляло из себя написание собственного сканера уязвимостей на любом ЯП с условиями, что использование проприетарного ПО и фреймворков запрещено. Можно было пользоваться кодом и фреймворками существующих сканеров уязвимости с открытым кодом. Это задание и мое решение с моим коллегой мы и разберем в этой публикации.

Подготовительный этап

Так как нам дали на это задание буквально два дня, а эти дни были загружены до отвала, то решение писать данный сканер было принято на последнюю ночь перед сдачей отчета с исходным кодом (в принципе, как это всегда и делается). Решено было использовать для скана портов утилиту с открытым исходным кодом nmap, которая входит в kali. С помощью флага -sV мы узнали какие сервисы и их версии запущены на каждом порте хоста.

    nm = nmap.PortScanner()     # Настроить параметры сканирования nmap     scan_raw_result = nm.scan(hosts=network_prefix, arguments='-v -n -A')

Далее пришлось написать небольшой скрипт, чтобы он парсил эти названия сервисов с версиями. На выходе получаем списки на каждый порт у каждого хоста.

for host, result in scan_raw_result['scan'].items():         if result['status']['state'] == 'up':             print('#' * 17 + 'Host:' + host + '#' * 17)             idno = 1             for port in result['tcp']:                     print('-' * 17 + "Детали TCP-сервера" + '[' + str(idno) + ']' + '-' * 17)                 idno += 1                 print('Номер порта TCP:' + str(port))                                  print('положение дел:' + result['tcp'][port]['state'])                                  print('причина:' + result['tcp'][port]['reason'])                                  print('Дополнительная информация:' + result['tcp'][port]['extrainfo'])                                  print('Имя:' + result['tcp'][port]['name'])                                  cur_ver = result['tcp'][port]['version']                 print('версия:' + result['tcp'][port]['version'])                                  print('сервис:' + result['tcp'][port]['product'])                 cur_soft_title = result['tcp'][port]['product']                 if ' ' in cur_soft_title:                     cur_soft_title = cur_soft_title.split()[0].lower()                 if ('windows' in cur_soft_title) or ('linux' in cur_soft_title) or ('microsoft' in cur_soft_title):                     cur_soft_title = None                 print('3 '+cur_soft_title)                                  print('CPE:' + result['tcp'][port]['cpe'])                                  print("Сценарий:" + result['tcp'][port]['script'])                  if cur_ver != '' and cur_soft_title != '':                     os.system('python nist_scanner.py -s {} {}'.format(str(cur_soft_title), str(cur_ver)))                 if cur_ver and cur_soft_title:                     BDU_check(cur_soft_title, cur_ver)              idno = 1             for port in result['udp']:                 print('-' * 17 + "Детали сервера UDP" + '[' + str(idno) + ']' + '-' * 17)                 idno += 1                 print('Номер порта UDP:' + str(port))                 print('state:' + result['udp'][port]['state'])                 print('reason:' + result['udp'][port]['reason'])                 print('Дополнительная информация:' + result['udp'][port]['extrainfo'])                 print('Имя:' + result['udp'][port]['name'])                 print('версия:' + result['udp'][port]['version'])                 cur_ver =result['udp'][port]['version']                 cur_soft_title = result['udp'][port]['product']                 print('сервис:' + cur_soft_title)                 if ' ' in cur_soft_title:                     cur_soft_title = cur_soft_title.split()[0].lower()                 if 'windows' in cur_soft_title or 'linux' in cur_soft_title :                     cur_soft_title = None                 print('CPE:' + result['udp'][port]['cpe'])                 print("script:" + result['udp'][port]['script'])                 if cur_ver != '' and cur_soft_title != '':                     os.system('python nist_scanner.py -s {} {}'.format(str(cur_soft_title), str(cur_ver)))                 if cur_ver and cur_soft_title:                     BDU_check(cur_soft_title, cur_ver)

Шаблон: https://russianblogs.com/article/7503575156/

А как быть дальше?

Многие на этом моменте и остались, лишь научившись вызывать nmap из питона и выдавать красивый вывод строками. Конечно, кто-то пользовался скриптом для nmap, таким как Vulscan, но тоже не увенчалось успехом, так как (как показалось мне) данный скрипт выводит любое упоминание данного сервиса в описании к CVE, независимо от версии данного сервиса.

Подумав, мы решили, что развернуть зеркала баз уязвимостей для дальнейшего парсинга — это хорошая идея (как в конце и оказалось). Скачали json NIST (https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{YEAR}.json.gz, где вместо {YEAR} подставляется год из диапазона 2002-наш год) и БДУ ФСТЭК (https://bdu.fstec.ru/files/documents/vullist.xlsx). За основу был взят парсер CVE, где выводится все CVE, в которых упоминается строчка, которую ты укажешь (https://github.com/stratosphereips/nist-cve-search-tool). Переписав полностью под себя этот код, я столкнулся с проблемой, что мне нужно точно определять подходит ли мне данная CVE или нет, в том числе и по версии самого сервиса. Я порылся в зеркале NIST’а в json-файлах и обнаружил, что у каждой cve есть ключ «cpe23Uri», где всегда точно можно явно найти название сервиса, про которое идет речь в данной CVE, а также «versionEndExcluding» и «versionStartExcluding», где написаны с какой версии данная уязвимость и по какую соответственно.

Скрин одного из cve по osquery
Скрин одного из cve по osquery

Это и решено было парсить.

def search(j, s, v): # j-json, s-name of service, v-verison of service     i = 0     regex = re.compile(f'({s})', re.I)     for entry in j['CVE_Items']:         if 'cve' in entry:             desc = entry['configurations']['nodes']             for d in desc:                 for cpe in d['cpe_match']:                     if regex.search(cpe['cpe23Uri']) != None:                         if 'versionEndExcluding' in cpe and version.parse(v) < version.parse(cpe['versionEndExcluding']):                             if 'versionStartExcluding' in cpe and version.parse(v) > version.parse(cpe['versionStartExcluding']):                                 CVEs.append(entry)                                 i += 1                                 break                             else:                                 CVEs.append(entry)                                 i += 1                                 break         if i == count:             break

Пока я занимался парсингом json, мой коллега занимался парсингом csv-файла уязвимостей от ФСТЭК, чтобы на выходе были CVE как от БДУ ФСТЭК, так и от NIST.

Зайдя на сайт БДУ ФСТЭК находим возможность скачать данные в виде xlsx файла. Для удобства дальнейшей работы и парсинга переформатируем файл в формат csv. И используя уже имеющиеся библиотеки для работы с csv, разбираем БДУ и настраиваем логику, чтобы выводились верные потенциальные уязвимости.

def BDU_check(cur_soft_title, cur_ver):     with open('vullist_1.csv', encoding='utf-8') as csvfile:         # print(123)         reader = csv.DictReader(csvfile)         i = 0         for row in reader:             soft_title = str(row['Название ПО'])             versions = row['Версия ПО']             if cur_soft_title.lower() in soft_title.lower():                 cve_row = row['Идентификаторы других систем описаний уязвимости']                 for current_service_version in versions.split(','):                      # нижняя граница версии                     if 'от' in current_service_version:                         begin_version = re.search('[^\d.]?[\d.]+[^\d.]?', str(current_service_version)+' ')[0]                         while re.search('[\d]', begin_version[0]) is None:                             begin_version = begin_version[1:]                         while re.search("[\d]", begin_version[-1]) is None:                             begin_version = begin_version[:-1]                      if 'до' in current_service_version:                         end_version = re.search('[^\d.]?[\d.]+[^\d.]?', str(current_service_version)+ ' ')                         end_version = end_version[0]                          while re.search('[^\d]', end_version[0]):                             end_version = end_version[1:]                         while re.search('[^\d]', end_version[-1]):                             end_version = end_version[:-1]                          cur_ver = re.search('[^\d.]?[\d.]+[^\d.]?', str(cur_ver)+ ' ')                         cur_ver = cur_ver[0]                          while re.search('[^\d]', cur_ver[0]):                              end_version = end_version[1:]                         while re.search('[^\d]', cur_ver[-1]):                              cur_ver = cur_ver[:-1]                     flag_begin_vesion = (begin_version and (not end_version) and (version.parse(begin_version) <= version.parse(cur_ver)))                     flag_end_vesion = ((not begin_version) and (end_version) and (version.parse(cur_ver) <= version.parse(end_version)))                     flag_both_vesion = (begin_version and (end_version) and (version.parse(begin_version) <= version.parse(cur_ver)) and (version.parse(cur_ver) <= version.parse(end_version)))                                          if flag_begin_vesion or flag_end_vesion or flag_both_vesion:                         print('Идентификатор         : ' + str(row['Идентификатор']))                         print('CVE                   : ' + str(cve_row))                         print('Название ПО             : '+ str(row['Название ПО']))                         print('Версия ПО             : '+ str(row['Версия ПО']))                         print('Версия ПО общ. признак: '+ str(current_service_version))                         print('Описание уязвимости   : ' + str(row['Описание уязвимости']))                         print('----------------------------\n')                         break             i += 1

Логика проверки версии ПО уязвимости для найденного сервиса была не идеальна, но благодаря этому отбросили порядка 80-90% неподходящих и неактуальных уязвимостей

В конце, когда мы соединили наши модули (не без трудностей, конечно), у нас и вышел сканер, с помощью которого мы смогли победить в хакатоне.

Ссылки:

Git-репозиторий с исходниками: https://github.com/mksmp/vulnerability_scanner

Соавтор:

https://github.com/aleksey2101

@aleksey2_1


ссылка на оригинал статьи https://habr.com/ru/post/689942/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *