Бесплатная защита от спама на почте с помощью ИИ фильтрации без VPN: многоуровневый метод с BERT и 550 МБ RAM

от автора

Вас тоже достаёт спам и реклама? Рекламу я блокирую через свой DNS сервер и локальными CSS фильтрами, а вот для почты пришлось придумать что-то другое.

Бесплатная защита от спама на почте с помощью ИИ фильтрации

Бесплатная защита от спама на почте с помощью ИИ фильтрации

Почта у меня устроена немного необычно (я так думаю). Все мои ящики — Яндекс, Mail, старые адреса — настроены на пересылку на мой домен. С домена всё летит в Gmail. Протоколом IMAP не пользуюсь много лет (да ещё какие то платные мутки в РФ недавно придумали). А вот интерфейс Google меня полностью устраивает, поиск мгновенный, удобные метки и сортировка. Есть мелочи, которые не нравятся, но лучше варианта, который закрывал бы мои потребности, пока не встречал. В итоге получается единая точка, где видна вся переписка.

Схема многоступенчатая. Первыми срабатывают фильтры Яндекса и Mail — что-то они отсеивают сами, ещё до пересылки. То, что прошло через них, падает на мой сервер, где стоит SpamAssassin. Ловит ещё часть. Но после двух уровней всё равно что-то просачивается, спамеры же не сидят без дела. И вот этот остаток доезжает до Gmail и что-то оседает в папке Спам, а что-то попадает во входящие и приходит раздражающее уведомление. Хотелось, чтобы со временем не накапливался мусор в папках, который надо разгребать вручную. Особенно важно заблокировать то, что не является полностью спамом: приглашения на конференции, партнёрские предложения, кредиты — формально не нарушение, поэтому байесовский фильтр такие вещи плохо ловит.

Сначала думал взять LLM через API. Llama или GPT отлично разбирают текст, с ними гибко настраиваются критерии. Но это внешняя зависимость. В наше нестабильное сетевое время желательно уменьшать такие зависимости. Сервис может поменять условия, отключиться временно, геоблокировки, да и много чего ещё.

Локальная BERT-модель закрыла обе проблемы. Взял ruBert-base-antispam с HuggingFace — файн-тюн на базе DeepPavlov/rubert-base-cased-conversational. 177 миллионов параметров, 12 слоёв трансформера, 768 hidden size. Физически не принимает больше 512 токенов на вход. В памяти занимает около 550 МБ, ответ приходит за 100-200 миллисекунд. Бинарный классификатор — текст на входе, 0 или 1 на выходе, никаких промптов и reasoning. Идеально!

Встроить вызов скрипта прямо в SMTP-конвейер Exim оказалось непросто. Механизмы вроде ACL или transport_filter имеют свои особенности, особенно с мудреной панелью сервера, которая сама обновляет многие файлы. А то будет так: где-то письмо недоступно целиком, где-то прав не хватает, где-то модифицированный текст не передаётся дальше, где-то файл обновился. Пошёл другим путём — post-delivery обработкой.

Получилась трёхуровневая серверная защита. Первый уровень — pre-rules, набор регулярок для очевидного спама. Срабатывают за миллисекунду, без обращения к нейросети. Второй уровень — сама BERT-модель, обрабатывает всё, что не отсекли регулярки, это около 70% потока. Третий уровень работает параллельно — SpamAssassin со своим байесовским фильтром. Если он ставит score выше порога, Dovecot через Sieve перемещает письмо в .Spam. Это страховка на случай, если первые два уровня что-то пропустили. При этом AI-скрипт всё равно проверяет письмо: если SpamAssassin ошибся и пометил нормальное письмо, а AI считает его чистым, оно всё равно уйдёт на Gmail. Fail-open стратегия в AI-скрипте спасает от параноидального SpamAssassin и без сильных заморочек с коэффициентами.

Схема работает так. Exim принимает письмо и складывает его в Maildir, в папку new/. Отдельный скрипт каждые полминуты обходит все директории. Из MIME вытаскивает текст и заголовки — поддерживает и plain, и HTML, теги чистит стандартным HTMLParser. Сначала гонит через pre-rules, причём смотрит не только в текст, но и в заголовки. Что не отсеялось — идёт в постоянно работающий daemon через Unix-сокет. Спам перекладывается в .Spam/, нормальные письма идут в cur/ и параллельно улетают на Gmail через sendmail.

Задержка на доставку письма в 30–60 секунд меня не беспокоит. В почте я в реальном времени не сижу, это ж не чат, да и Gmail подтягивает не прям мгновенно. Почта нужна для спокойных переписок, а не для гонок кто быстрее ответил.

Привожу пример daemon (/opt/spam_filter/daemon.py):

#!/usr/bin/env python3import socket, sys, os, re, signal, loggingimport torchfrom transformers import AutoTokenizer, AutoModelForSequenceClassificationMODEL_NAME = "assskelad/ruBert-base-antispam"  # любая BertForSequenceClassificationSOCKET_PATH = "/tmp/spam_filter.sock"MAX_LEN = 512      # лимит BERTMAX_TEXT = 6000    # символы из письмаlogging.basicConfig(level=logging.INFO)logger = logging.getLogger("spam_daemon")torch.set_num_threads(1)tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)model.eval()def predict(text: str) -> int:    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)    with torch.inference_mode():        out = model(**inputs)        return int(torch.argmax(out.logits, dim=1).item())def handle(conn):    try:        data = conn.recv(1024 * 1024)        text = data.decode("utf-8", errors="replace")[:MAX_TEXT]        result = predict(text) if text else 0        conn.sendall(str(result).encode())    except Exception as e:        logger.error(f"Error: {e}")        conn.sendall(b"0")    finally:        conn.close()def shutdown(signum, frame):    try:        os.unlink(SOCKET_PATH)    except:        pass    sys.exit(0)signal.signal(signal.SIGTERM, shutdown)signal.signal(signal.SIGINT, shutdown)srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)srv.bind(SOCKET_PATH)srv.listen(10)while True:    conn, _ = srv.accept()    handle(conn)

Pre-rules (/opt/spam_filter/pre_rules.py) — паттерны:

#!/usr/bin/env python3import redef check_pre_rules(text, headers=None):    text_lower = text.lower()    headers = headers or {}        # паттерны    spam_patterns = [        r'рассылк[аи]\s+по\s+баз|миллион\s+адрес',        r'внедрени.{0,15}crm|бот\s+продвижени.{0,20}(?:telegram|whatsapp)',        r'диплом.{0,20}(?:профпереподготовк|допобразован)',        r'бухгалтерск.{0,30}сопровожден|нулевой.{0,20}ндс',        r'спарсить.{0,30}базу|парсинг.{0,30}клиентов',        r'подтвердите.{0,30}данны.{0,200}предложение\s+истекает',        r'(?:[а-я]\s){4,}',        r'деньги\s+под\s+залог.{0,20}(?:авто|машин)',        r'сео\s+оптимизац.{0,20}недорого',    ]    for pattern in spam_patterns:        if re.search(pattern, text_lower):            return "SPAM"        # рассылки    bulk_headers = any(        k.lower() in ['list-unsubscribe', 'precedence', 'feedback-id', 'x-rpcampaign']        for k in headers    )    if bulk_headers and re.search(r'предлагаем|приглашаем|бесплатн.{0,20}консультац', text_lower):        return "SPAM"        # спам    if re.search(r'кето|метаболизм|инсулин|жиросжигани[ею]|жкт', text_lower) and \       re.search(r'ты\s+устал|энерги[яи]\s+на\s+нуле|мы\s+да[её]м\s+систему', text_lower):        return "SPAM"        # нужные письма    ham_patterns = [        r'код\s+(?:подтвержден|для\s+вход|доступа):\s*\d{4,6}',        r'ваш\s+заказ\s+№?\d+|order\s+#?\d+',        r'сброс\s+парол|password\s+reset',        r'платеж\s+прошел|payment\s+received',        r'встреч[аи]\s+(?:перенесен|назначен|состоится)',    ]    for pattern in ham_patterns:        if re.search(pattern, text_lower):            return "HAM"        return None

Правило про массовые рассылки оказалось очень полезным. Нейросеть часто пропускает такие письма, потому что они грамматически корректны и не содержат явного мусора. А связка заголовок List-Unsubscribe + слово приглашаем в тексте даёт 100% точности без ложных срабатываний на личные письма. Правило про медицинские термины — пример составной регулярки: по отдельности слова могут употребляться в переписке, но в сочетании с некоторыми это однозначный спам. Так файл pre_rules.py со временем обрастает специфичными правилами под ваш поток, который иногда можно пополнять.

Сокращенный Post-delivery скрипт:

#!/usr/bin/env python3import os, sys, socket, datetime, time, subprocessfrom email import policyfrom email.parser import BytesParserfrom html.parser import HTMLParserimport reSOCKET = "/tmp/spam_filter.sock"MAIL_BASE = "/home/admin/mail"         # путьFORWARD_TO = "your@gmail.com"          # куда присылатьWAIT_SECONDS = 5                       # ожиданиеLOCK_FILE = "/tmp/ai_spam_check.lock"def check_spam(email_bytes):    text, headers = extract_text_and_headers(email_bytes)        pre_verdict = check_pre_rules(text, headers)    if pre_verdict == "SPAM":        return True, text[:100], "pre_rule"    elif pre_verdict == "HAM":        return False, text[:100], "pre_rule"        text_bytes = text.encode('utf-8', errors='ignore')    try:        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)        s.settimeout(2)        s.connect(SOCKET)        s.settimeout(5)        s.sendall(text_bytes)        s.shutdown(socket.SHUT_WR)        r = s.recv(8).decode("ascii", errors="ignore").strip()        s.close()        return r == "1", text[:100], "ai"    except Exception as e:        log(f"DAEMON_ERROR: {e}")        return False, text[:100], "error"def forward_to_gmail(email_bytes, filename=""):    modified = b"X-Forwarded-By: AI-Filter\n" + email_bytes    p = subprocess.Popen(        ["/usr/sbin/sendmail", "-i", FORWARD_TO],        stdin=subprocess.PIPE,        stdout=subprocess.DEVNULL,        stderr=subprocess.DEVNULL    )    p.stdin.write(modified)    p.stdin.close()    p.wait()def process_new_dir(new_dir, spam_new_dir, cur_dir):    for filename in os.listdir(new_dir):        filepath = os.path.join(new_dir, filename)        if time.time() - os.path.getmtime(filepath) < WAIT_SECONDS:            continue                with open(filepath, "rb") as f:            email_bytes = f.read()                if b"X-Forwarded-By: AI-Filter" in email_bytes:            os.rename(filepath, os.path.join(cur_dir, filename + ":2,"))            continue                is_spam, preview, method = check_spam(email_bytes)        if is_spam:            os.rename(filepath, os.path.join(spam_new_dir, filename))            log(f"SPAM [{method}]: {filename} | {preview}")        else:            os.rename(filepath, os.path.join(cur_dir, filename + ":2,"))            forward_to_gmail(email_bytes, filename)            log(f"NOT_SPAM [{method}]: {filename} | {preview}")

Самое интересное — гонки с MTA. Если читать файл сразу после создания, можно получить недописанное письмо пока оно сохраняется на носитель. Отсюда WAIT_SECONDS: скрипт пропускает файлы моложе заданного возраста. Это плата за post-delivery.

Петли пересылки тоже пришлось учесть. Если случайно отправить самому себе или замкнуть цепочку, начнётся бесконечный цикл. Защита — заголовок X-Forwarded-By: скрипт видит его и пропускает такие письма, просто перемещая в cur/.

По RAM памяти: модель занимает около 550 МБ, скрипт проверки при запуске ещё около 50 МБ, Exim 25 МБ, Dovecot 10 МБ, SpamAssassin 80 МБ. Итого около 716 МБ на всю систему. У меня сервер на 2 гига, в итоге занято около одного. Всё стабильно, без свопа. Для более слабых машин подошёл бы DistilBERT, он вдвое легче, но у меня запаса хватает.

Если daemon упал, сокет недоступен или произошла любая ошибка — письмо считается нормальным и идёт дальше. Лучше получить пару спам-писем, чем пропустить важное.

Для мониторинга есть bash-скрипт SPAM, который показывает статус всех компонентов, статистику за день, состояние ящиков и trend за последний час прямо в терминале. Команда SPAM -w даёт live-обновление каждые 3 секунды, SPAM -t тестирует модель на 10 примерах, SPAM -r — pre-rules на 21 примере. Для админа почтового сервера удобный CLI-мониторинг оказывается полезнее любого веб-интерфейса. Логи пишу в /var/log/ai_spam.log с пометками [pre_rule] или [ai], чтобы было видно, какой уровень сработал.

Спам перестал доезжать до Gmail. Что раньше оседало в папке и требовало периодической чистки, теперь остаётся на сервере в .Spam/. На Gmail попадают только письма, прошедшие все три фильтра. Иногда модель ошибается — на редких темах или специфическом сленге. В таких случаях добавляю паттерн в pre_rules.py, и проблема больше не повторяется. Раз в месяц можно глянуть в .Spam/, просматриваю заголовки — не потерялось ли чего. Обычно нет. А если что и попадет по ошибке — пара строк в конфиге, и проблема закрыта.

Нагрузка на сервер минимальная, при сотне писем в день daemon большую часть времени спит, CPU почти не ест. Раз настроил и работает, иногда только добавляю новые регулярки, когда замечаю в логах повторяющийся мусор. Проблема со спамом закрыта!

А как вы боретесь со спамом на своих серверах? Если будут нужны исходники, то выложу на GitHub.

ссылка на оригинал статьи https://habr.com/ru/articles/1055518/