Всё об устройстве FT8/FT4 с примерами на Python

от автора

FT8 — цифровой радиолюбительский протокол, разработанный Джо Тейлором (K1JT) и Стивом Франке (K9AN) в 2017 году. В этой статье будут рассмотрены подробности работы протокола.

Статья может быть интересна радиолюбителям, как знакомым, так и не знакомым с протоколами FT8 и FT4, а также тем, кто хочет в подробностях понять устройство этих протоколов.

Введение

Протокол FT8 был разработан радиолюбителями Джо Тейлором (K1JT) и Стивом Франке (K9AN) в 2017 году, чьи инициалы были увековечены в названии протокола.

Разработчики протокола преследовали цель обеспечить связь при очень низком уровне сигнала, в том числе ниже порога слышимости человека; в плохих условиях прохождения радиосигнала; при использовании передатчиков низкой мощности; с использованием узких полос частот для приема и передачи сигнала.

Для обеспечения связи при вышеописанных условиях, формируемый сигнал сильно растянут во времени (порядка 15 секунд на передачу) и, как следствие, имеет низкий битрейт. Исходя из этого, протокол был спроектирован для работы в тайм-слотах (временные интервалы), поэтому работа протокола должна быть синхронизирована с точностью до секунд с мировым временем.

FT4 был разработан в 2019 году и является более быстрой версией протокола FT8. Основные цели разработки протокола: увеличения количества QSO за счёт уменьшения времени передачи одного сообщения; проведения радиосвязей в условиях соревнований.

Из-за повышенной скорости протокол имеет более широкую полосу частот.

Протоколы используют GFSK (Gaussian Frequency Shift Keying) — частотную манипуляцию с использованием фильтра Гаусса. Цифры в названии протокола определяют количество тонов, используемых при FSK.

Общие технические характеристики

FT8

  • Цикл передачи 15 секунд;

  • Скорость передачи 6,09 бит/с;

  • Ширина полосы 50 Гц;

  • Размер сообщения 77 бит + 12-бит CRC-14;

  • Механизм коррекции ошибок LDPC (174,87);

  • Модуляция 8-GFSK с шагом 6.25Гц;

  • Порог декодирования сигналов до -24…-26 дБ относительно шума.

FT4

  • Цикл передачи 7.5 секунд;

  • Ширина полосы 90 Гц;

  • Размер сообщения 77 бит + 12-бит CRC-14;

  • Механизм коррекции ошибок LDPC (174,87);

  • Модуляция 4-GFSK с шагом 23.4Гц;

  • Порог декодирования сигналов до -24…-26 дБ относительно шума.

Структура протокола

Как и любой протокол передачи данных, FT8 и FT4 представляю собой «матрешку», в которой данные одного уровня преобразуют данные для передачи уровнем ниже.

Общая схема протокола приведена на рисунке 1. Разделение на уровни модели OSI условное.

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).

На рисунке 1.А приведена схема взаимодействия алгоритмов кодирования при формировании исходящего сигнала; на рисунке 1.B — схема обработки и декодирования принятого сигнала.

Протокол прикладного уровня

При проведении связей в FT8/FT4 участники разделяются на принимающих и передающих.

Так как протокол привязан к временным интервалам, прием и передача осуществляются в четные и нечетные четверти (⅛ в случае FT4) минут, передающие корреспонденты излучают сигнал, в то время как в этот временной промежуток осуществляется прием и накопление сигнала на стороне слушающих; в следующем временном промежутке происходит смена ролей, принимавшие сигнал передают ответ (если посчитают нужным), а передавшие свой сигнал становятся слушателями и т.д. цикл повторяется. Модульная привязка к секундам необходима для избежания ситуации, когда корреспонденты начинают передавать в произвольном порядке и расшифровка сигнала становится невозможной.

Обмен данными сильно похож на обычный любительский радиообмен, когда часть корреспондентов дает общий вызов, а остальные слушают и принимают решение о попытке проведения связи с этим участником.

Сценарий обычного сеанса связи:

Временной отрезок

Данные

Комментарий

00:00-00:15

CQ R3AAA KO85

Корреспондент из Москвы (локатор KO85), с позывным R3AAA приглашает для установления связи

00:15-00:30

R3AAA R9FXX LO87

Отвечает корреспондент из Перми (локатор LO87) с позывным R9FXX

00:30-00:45

R9FXX R3AAA -1

R3AAA дает рапорт для R9FXX в -1 dB по SNR

00:45-01:00

R3AAA R9FXX R-4

R9FXX дает рапорт для R3AAA в -4 dB по SNR

01:00-01:15

R9FXX R3AAA RR73

R3AAA подтверждает прием метаданных и прощается

01:15-01:30

R3AAA R9FXX 73

R9FXX взаимно отправляет 73

Общее время проведения связи наняло 1 минуту 30 секунд.

Сценарий короткого сеанса связи:

Временной отрезок

Данные

Комментарий

00:00-00:15

CQ R3AAA KO85

Корреспондент из Москвы (локатор KO85), с позывным R3AAA приглашает для установления связи

00:15-00:30

R3AAA R9FXX R-4

R9FXX дает рапорт для R3AAA в -4 dB по SNR

00:30-00:45

R9FXX R3AAA R-1

R3AAA дает рапорт для R9FXX в -1 dB по SNR

00:45-01:00

R3AAA R9FXX RR73

R9FXX подтверждает прием метаданных и прощается

01:00-01:15

R9FXX R3AAA 73

R3AAA взаимно отправляет 73

Сокращенные сеансы применяются в случаях когда у дальнего корреспондента большая очередь желающих провести связь.

В случаях когда кто-то из участников не может с первого раза получить ответ, сообщение может быть отослано повторно и сеанс связи потребует большего времени.

Пример сеанса радиосвязи R9FEU с R1CDY:

Время UTC

SNR

Δt

Частота

Данные

141945

Tx

1000

CQ R9FEU LO87

142000

-3

0.1

1001

R9FEU R1CDY KP40

142015

Tx

1000

R1CDY R9FEU -03

142030

1

0.1

1001

R9FEU R1CDY R-17

142045

Tx

1000

R1CDY R9FEU RR73

142100

-9

0.1

1001

R9FEU R1CDY 73

Пример сокращенного сеанса радиосвязи R9FEU c UA3DOI:

Время UTC

SNR

Δt

Частота

Данные

141745

Tx

1000

CQ R9FEU LO87

141800

8

0.1

1001

R9FEU UA3DOI -01

141815

Tx

1000

UA3DOI R9FEU R+08

141830

-5

0.1

1001

R9FEU UA3DOI RR73

141845

Tx

1000

UA3DOI R9FEU 73

Уровень представления

На уровне представления происходит подготовка данных, в частности перекодирование информации и расчет контрольной суммы сообщения CRC-14.

Формат сообщений

В основном, сообщения, передаваемые протоколами семейства FTX делятся на:

  • радиолюбительский обмен (QSO);

  • телеметрия;

  • произвольный текст.

Все радиолюбительские сообщения состоят из групп: вызываемый (call_to), вызывающий (call_de), данные (extra). В качестве данных может передаваться SNR-рапорт, четырех-символьный квадрат локации и сигналы roger/73.

Кодирование сообщения

Большинство сообщений в протоколе состоят из трех полей: вызываемый, вызывающий и доп. данные (например рапорт или гео данные).

Кодирование заключается в том, чтобы максимально плотно уместить данные исходного сообщения в 77 бит.

Определение констант и утилит, необходимых для кодирования:

import string   FTX_CHAR_TABLE_NUMERIC = string.digits FTX_CHAR_TABLE_LETTERS = string.ascii_uppercase FTX_CHAR_TABLE_ALPHANUM = f"{FTX_CHAR_TABLE_NUMERIC}{FTX_CHAR_TABLE_LETTERS}" FTX_CHAR_TABLE_LETTERS_SPACE = f" {FTX_CHAR_TABLE_LETTERS}" FTX_CHAR_TABLE_ALPHANUM_SPACE = f" {FTX_CHAR_TABLE_ALPHANUM}" FTX_CHAR_TABLE_ALPHANUM_SPACE_SLASH = f"{FTX_CHAR_TABLE_ALPHANUM_SPACE}/" FTX_CHAR_TABLE_FULL = f"{FTX_CHAR_TABLE_ALPHANUM_SPACE}+-./?"   def charn(c: int, table: str) -> str:    return table[c]   def nchar(c: str, table: str) -> int:    return table.find(c) 

В протоколах семейства FTX алфавит состоит из ASCII символов «ABCDEFGHIJKLMNOPQRSTUVWXYZ», цифр «0123456789» и символов «+-./?», включая знак пробела.

Полный алфавит содержится в FTX_CHAR_TABLE_FULL.

В алфавите FTX символу A соответствует значение в 1, B — 2 и т.д.. Для перевода из ANSI в алфавит FTX используется функция nchar, которая по своей сути сопоставляет индекс кодируемого символа c в таблице table, которой является одно из представлений значений FTX_CHAR_TABLE_*. В случае обработки неподдерживаемого символа, функция возвращает значение -1.

Для декодирования из формата FTX используется функция charn, которая решает обратную задачу, возвращает ANSI символ, соответствующей позиции c в table.

Кодирование позывного сигнала

Для кодирования позывного сигнала его необходимо нормализовать до 6-и символьного. Короткие позывные дополняются пробелами в начале строки, а длинные приводятся к другому виду. Швейцарские позывные из формата 3DA0XYZ нормализуются в 3D0XYZ, африканские 3XA0XYZ в QA0XYZ.

Функция кодирования позывного сигнала:

FTX_BASECALL_CHAR_MAP = [    FTX_CHAR_TABLE_ALPHANUM_SPACE,    FTX_CHAR_TABLE_ALPHANUM,    FTX_CHAR_TABLE_NUMERIC,    FTX_CHAR_TABLE_LETTERS_SPACE,    FTX_CHAR_TABLE_LETTERS_SPACE,    FTX_CHAR_TABLE_LETTERS_SPACE ]   def pack_basecall(callsign: str) -> int:    if (length := len(callsign)) > 2:        if callsign.startswith("3DA0") and 4 < length <= 7:            cs_6 = f"3D0{callsign[4:]}"        elif callsign.startswith("3X") and callsign[2].isalpha() and length <= 7:            cs_6 = f"Q{callsign[2:]}"        elif callsign[2].isdigit() and length <= 6:            cs_6 = callsign        elif callsign[1].isdigit() and length <= 5:            cs_6 = f" {callsign}"        else:            cs_6 = " " * 6         cs_6 = cs_6 + " " * (6 - len(cs_6))         n_chars = list(map(nchar, cs_6, FTX_BASECALL_CHAR_MAP))         if all(nc >= 0 for nc in n_chars):            n = reduce(lambda a, it: a * len(it[0]) + it[1], zip(FTX_BASECALL_CHAR_MAP, n_chars), 0)            return n    return -1 

В FTX_BASECALL_CHAR_MAP определяется последовательность алфавитов, на основе которых будет производиться кодирование позывного сигнала, таким образом, первая цифро-буквенная литера является опциональной; третьим символом обязательно должно быть цифровое значение, далее следуют буквенные символы, включая пробел.

Функция pack_basecall на вход принимает строку с позывным сигналом, длина которого должна быть минимум 3 знака. Далее позывной нормализуется до 6-и символьного. В n_chars содержатся закодированные по схеме FTX_BASECALL_CHAR_MAP символы; затем, если перекодирование всех символов прошло успешно, выполняется битовая упаковка данных, путем умножения результирующего значения на мощности используемых алфавитов.

При кодировании позывного R1ABC происходит нормализация к виду ‘ R1ABC‘, Индексы символов: [0, 27, 1, 1, 2, 3]; результат: 5334879 (0b10100010110011101011111).

Декодирование позывного сигнала

Операция декодирования позывного сигнала выполняет в обратном порядке все те же самые операции, что и операция кодирования.

NTOKENS = 2063592 MAX22 = 4194304   FTX_BASECALL_SUFFIX_FMT = {    1: "{cs}/R",    2: "{cs}/P", } FTX_TOKEN_STR = {    v: k for k, v in FTX_TOKEN_CODE.items() }   def unpack_callsign(cs_28: int, flags: bool, suffix: int) -> typing.Optional[str]:    if cs_28 < NTOKENS:        if cs_28 <= 2:            return FTX_TOKEN_STR.get(cs_28)        if cs_28 <= 1002:            return f"CQ_{cs_28 - 3:03}"        if cs_28 <= 532443:            n = cs_28 - 1003            aaaa = ""            for i in range(4):                ct = FTX_CHAR_TABLE_LETTERS_SPACE                ct_l = len(ct)                aaaa = charn(n % ct_l, ct) + aaaa                n //= ct_l             return f"CQ_{aaaa.strip()}"        return None     cs_28 -= NTOKENS    n = cs_28 - MAX22     callsign = ""    for ct in reversed(FTX_BASECALL_CHAR_MAP):        ct_l = len(ct)        callsign = charn(n % ct_l, ct) + callsign        n //= ct_l     callsign = callsign.strip()     if callsign.startswith("3D0") and callsign[3] != " ":        result = f"3DA0{callsign[3:]}"    elif callsign[0] == "Q" and callsign[1].isalpha():        result = f"3X{callsign[1:]}"    else:        result = callsign     if len(result) < 3:        return None     if flags:        if fmt := FTX_BASECALL_SUFFIX_FMT.get(suffix):            return fmt.format(cs=result)        raise ValueError     return result 

Функция unpack_callsign выполняет распаковку бит в строку, в соответствии с таблицами символов. Декодирование символов происходит в обратном порядке, reversed(FTX_BASECALL_CHAR_MAP). Функция также проверяет, являются ли переданные данные шаблонными значениями, такими как CQ/DE/QRZ, перечисленными в FTX_TOKEN_STR.

Также дополнительно происходит добавление суффиксов, если указан параметр flags (суффиксы /R, /P из FTX_CALLSIGN_SUFFIX_FMT). Длинные позывные, подвергшиеся сокращению на этапе кодирования, также восстанавливаются до первоначального вида.

Результатом работы функции является строка, содержащая позывной сигнал в текстовом виде.

Кодирование локации

При установлении связи, корреспонденты обмениваются геолокационными квадратами (QTH Loc), представляющие собой строку из чередующихся пар букв и цифр, каждая пара уточняет границы расположения географической точки.

Расчет значений:

  1. пара букв:

    • долгота делится на 20, целая часть от 0 до 17 кодируется буквами от A до R;

    • широта делится на 10, целая часть от 0 до 17 кодируется буквами от A до R;

  2. пара цифр:

    • остаток долготы от деления на 20 делится на 2;

    • остаток широты от деления на 10 делится на 1.

FTX_CHAR_TABLE_GRID_LETTERS = FTX_CHAR_TABLE_LETTERS[:18] FTX_GRID_CHAR_MAP = [    FTX_CHAR_TABLE_GRID_LETTERS,    FTX_CHAR_TABLE_GRID_LETTERS,    FTX_CHAR_TABLE_NUMERIC,    FTX_CHAR_TABLE_NUMERIC ]   def pack_grid(grid4: str) -> int:    n_chars = list(map(nchar, grid4, FTX_GRID_CHAR_MAP))    n = reduce(lambda a, it: a * len(it[0]) + it[1], zip(FTX_GRID_CHAR_MAP, n_chars), 0)    return n 

Для кодирования квадрата используется усеченный алфавит FTX_CHAR_TABLE_GRID_LETTERS и карта соответствия FTX_GRID_CHAR_MAP.

Аналогично функции pack_basecall, функция pack_grid переводит четырехсимвольный квадрат в число; например квадрат KO85 (Москва) будет закодирован в 19485 (0b100110000011101).

Кодирование рапорта

Так как третьей частью сообщения может быть как геолокационный квадрат или рапорт, так же может быть и roger-ответ; при кодировании необходимо отличать тип передаваемых данных.

FTX_MAX_GRID_4 = 32400 FTX_RESPONSE_EXTRAS_CODE = {    "": FTX_MAX_GRID_4 + 1,    "RRR": FTX_MAX_GRID_4 + 2,    "RR73": FTX_MAX_GRID_4 + 3,    "73": FTX_MAX_GRID_4 + 4, }   def pack_extra(extra: str) -> int:    if id_resp := FTX_RESPONSE_EXTRAS_CODE.get(extra):        return id_resp    if re.match(r"^(([A-R]{2})([0-9]{2}))$", extra):        return pack_grid(extra)    if not (report := re.match(r"^(R){0,1}([\+\-]{0,1}[0-9]+)$", extra)):        raise FTXInvalidRST     r_sign, r_val = report.groups()    i_report = int(r_val) + 35    return (FTX_MAX_GRID_4 + i_report) | (0x8000 if r_sign is not None else 0) 

Функция pack_extra сначала проверяет признак, не являются ли переданные данные roger-ответом, далее, если данные соответствуют регулярному выражению с геолокационным квадратом, осуществляется вызов ранее рассмотренной функции pack_grid; в противном случае переданные данные интерпретируются как SNR-рапорт.

Декодирование рапорта и локации

Операция декодирования рапорта/локации, как и вышеописанная операции декодирования позывного сигнала, также в обратном порядке выполняет действия, осуществляемые при кодировании.

FTX_RESPONSE_EXTRAS_STR = {    v: k for k, v in FTX_RESPONSE_EXTRAS_CODE.items() }   def unpack_extra(ex_16: int, is_report: bool) -> typing.Optional[str]:    if ex_16 <= FTX_MAX_GRID_4:        n = ex_16        dst = ""        for ct in reversed(FTX_GRID_CHAR_MAP):            ct_l = len(ct)            dst = charn(n % ct_l, ct) + dst            n //= ct_l         return f"{'R ' if is_report else ''}{dst}"    else:        if irpt := FTX_RESPONSE_EXTRAS_STR.get(ex_16):            return irpt         return f"{'R' if is_report else ''}{int(ex_16 - FTX_MAX_GRID_4 - 35):+03}" 

Функция unpack_extra проверяет, являются ли переданные данные локатором или SNR-рапортом. Декодирование локации осуществляется с конца к началу.

При передаче рапорта, дополнительно, осуществляется проверка на шаблонный ответ из FTX_RESPONSE_EXTRAS_STR.

Результатом работы функции является строка, содержащая либо идентификатор локации, либо рапорт (или шаблонный ответ, например RR73).

Кодирование стандартного сообщения

Стандартное сообщение состоит из трех частей:

  • общий вызов или позывной вызываемого;

  • позывной вызываемого;

  • квадрат локации или рапорт/roger-сигнал.

NTOKENS = 2063592 MAX22 = 4194304 FTX_TOKEN_CODE = {    "DE": 0,    "QRZ": 1,    "CQ": 2 }   def byte(i: int) -> int:    return i & 0xff   def endswith_any(s: str, *tails: str) -> bool:     return any(s.endswith(tail) for tail in tails)   def pack_callsign(callsign: str) -> typing.Tuple[int, int]:    shift = 0    if token := FTX_TOKEN_CODE.get(callsign):        return token, shift     length = len(callsign)    if callsign.startswith("CQ_") and length < 8:        rest = callsign[3:]        rest_len = len(rest)         if rest_len == 3 and rest.isdigit():            return int(rest) + 3, shift         if 1 <= rest_len <= 4:            nlet = 0            correct = True            for c in rest:                if (n := nchar(c, FTX_CHAR_TABLE_LETTERS_SPACE)) == -1:                    correct = False                    break                nlet = nlet * 27 + n             if correct:                return nlet + 1003, shift     length_base = length    if endswith_any(callsign, "/P", "/R"):        shift = 1        length_base = length - 2     if (n28 := pack_basecall(callsign[:length_base])) >= 0:        return dword(NTOKENS + MAX22 + n28), shift     raise FTXPackCallsignError   def ftx_message_encode_std(call_to: str, call_de: str, extra: str) -> typing.ByteString:    b28_to, sh_to = pack_callsign(call_to)    if b28_to < 0:        raise FTXErrorCallSignTo     b28_de, sh_de = pack_callsign(call_de)    if b28_de < 0:        raise FTXErrorCallSignDe     suffix = 1    if any(cs.endswith("/P") for cs in (call_to, call_de)):        suffix = 2        if any(cs.endswith("/R") for cs in (call_to, call_de)):            raise FTXErrorSuffix     if call_to == "CQ" and "/" in call_de and not endswith_any(call_de, "/P", "/R"):        raise FTXErrorCallSignDe     b16_extra = pack_extra(extra)     b29_to = dword(b28_to << 1 | sh_to)    b29_de = dword(b28_de << 1 | sh_de)     if endswith_any(call_to, "/P", "/R"):        b29_to |= 1        if call_to.endswith("/P"):            suffix = 2     bytes = [        byte(b29_to >> 21),        byte(b29_to >> 13),        byte(b29_to >> 5),        byte(b29_to << 3) | byte(b29_de >> 26),        byte(b29_de >> 18),        byte(b29_de >> 10),        byte(b29_de >> 2),        byte(b29_de << 6) | byte(b16_extra >> 10),        byte(b16_extra >> 2),        byte(b16_extra << 6) | byte(suffix << 3)    ]    return bytearray(b for b in bytes) 

Функция ftx_message_encode_std принимает на вход три аргумента с вышеописанными данными. При кодировании позывных сигналов, функция pack_callsign проверяет, является ли позывной специальным токеном (например CQ при общем вызове), для которых, для повышения плотности, присвоены специальные коды в FTX_TOKEN_CODE; далее, используя вышеописанные функции битового кодирования, позывной кодируется в набор бит в виде целого числа, после чего данные конкатенируются в массив байт.

Суффиксы позывных сигналов отмечаются в третьем блоке данных сообщения. Код byte(suffix << 3) записывает в сообщение признак того, что это стандартное сообщение. Значение «1» суффикса соответствует обычному сообщению; значение «2» — проведение связей в формате DX-педиции.

Перечень типов сообщений:

  • FTX_MESSAGE_TYPE_FREE_TEXT — 1;

  • FTX_MESSAGE_TYPE_DXPEDITION — 2;

  • FTX_MESSAGE_TYPE_EU_VHF — 3;

  • FTX_MESSAGE_TYPE_ARRL_FD — 4;

  • FTX_MESSAGE_TYPE_TELEMETRY — 6;

  • FTX_MESSAGE_TYPE_CONTESTING — 7;

  • FTX_MESSAGE_TYPE_STANDARD — 8;

  • FTX_MESSAGE_TYPE_ARRL_RTTY — 10;

  • FTX_MESSAGE_TYPE_NONSTD_CALL — 11;

  • FTX_MESSAGE_TYPE_WWROF — 12.

Примеры кодирования сообщений:

Сообщение

Закодированные данные (hex)

CQ R1ABC KO85

00000020587223930748

R2CBA R1ABC R+01

0b136da0587223bfad08

R1ABC R2CBA -20

0b0e4470589b6d1fa7c8

R2CBA R1ABC RR73

0b136da05872239fa4c8

Декодирование стандартного сообщения

Декодирование сообщения осуществляется выполнением в обратной последовательности тех же действий, что и при кодировании.

def ftx_message_decode_std(payload: typing.ByteString) -> typing.Tuple[str, str, str]:    b29_to = payload[0] << 21    b29_to |= payload[1] << 13    b29_to |= payload[2] << 5    b29_to |= payload[3] >> 3     b29_de = (payload[3] & 0x07) << 26    b29_de |= payload[4] << 18    b29_de |= payload[5] << 10    b29_de |= payload[6] << 2    b29_de |= payload[7] >> 6     r_flag = (payload[7] & 0x20) >> 5     b16_extra = (payload[7] & 0x1F) << 10    b16_extra |= payload[8] << 2    b16_extra |= payload[9] >> 6     cs_flags = (payload[9] >> 3) & 0x07     if (call_to := unpack_callsign(b29_to >> 1, bool(b29_to & 1), cs_flags)) is None:        raise FTXErrorCallSignTo    if (call_de := unpack_callsign(b29_de >> 1, bool(b29_de & 1), cs_flags)) is None:        raise FTXErrorCallSignDe    if (extra := unpack_extra(b16_extra, bool(r_flag & 1))) is None:        raise FTXErrorGrid    return call_to, call_de, extra 

Функция ftx_message_decode_std принимает на вход строку байт, битовыми операциями извлекает участки данных с закодированными позывными и рапортом/локацией. Далее функция передает извлеченные данные в функцию unpack_callsign и unpack_extra, которые декодируют значения и возвращают данные в строковом виде.

Результатом работы функции является тройка строковых данных, состоящих из вызываемого и вызывающего позывных, а также доп данных в виде локации/лапорта.

Телеметрия и произвольный текст

Протоколы семейства FTX предусматривают возможность отправки данных, не относящихся к стандартному радиолюбительскому обмену, например передавать данные телеметрии. Так как телеметрия представляет собой последовательность байт и максимальный размер передаваемых данных составляет 77 бит, то используя механизм упаковки бит, можно закодировать текстовое сообщение длиной до 12 символов.

FTX_MESSAGE_FREE_TEXT_LEN = 12 FTX_MESSAGE_TELEMETRY_LEN = 9   def ftx_message_encode_free(text: str) -> typing.ByteString:    if len(text) > FTX_MESSAGE_FREE_TEXT_LEN:        raise FTXErrorTooLong     payload = bytearray(b"\x00" * FTX_MESSAGE_TELEMETRY_LEN)    text = (" " * (FTX_MESSAGE_FREE_TEXT_LEN - len(text))) + text    for c in text:        if (cid := nchar(c, FTX_CHAR_TABLE_FULL)) == -1:            raise FTXErrorInvalidChar         rem = cid        for i in reversed(range(FTX_MESSAGE_TELEMETRY_LEN)):            rem += payload[i] * len(FTX_CHAR_TABLE_FULL)            payload[i] = byte(rem)            rem >>= 8     return ftx_message_encode_telemetry(payload)   def ftx_message_encode_telemetry(payload: typing.ByteString) -> typing.ByteString:    if len(payload) > FTX_MESSAGE_TELEMETRY_LEN:        raise FTXErrorTooLong     carry = 0    data = bytearray(b"\x00" * len(payload))    for i, t_byte in enumerate(reversed(payload)):        data[-i - 1] = byte((carry >> 7) | (t_byte << 1))        carry = byte(t_byte & 0x80)     return data 

Функция ftx_message_encode_free осуществляет упаковку бит символов из text, используя таблицу FTX_CHAR_TABLE_FULL, в 9-и битный массив байт и передает его для дальнейшего кодирования в ftx_message_encode_telemetry. Перед кодированием исходный текст нормализуется до 12-и символьной строки, путем добавления необходимого количества пробелов в начало текста.

Таким образом, строка «0123456789AB» будет преобразована в массив байт (payload) «000a7271499bcb384a».

Функция ftx_message_encode_telemetry выполняет битовый сдвиг данных для выравнивания по правому краю.

Результатом является массив байт, в каждой ячейке которого значения сдвинуты влево на 1 бит, относительно исходных данных.

Декодирования телеметрии и произвольного текста осуществляется в обратном порядке: принятые данные телеметрии сдвигаются на 1 бит вправо, а текст сообщения раскодируется по таблице FTX_CHAR_TABLE_FULL обратно в строку.

def ftx_message_decode_telemetry(data: typing.ByteString) -> typing.Generator[int, None, None]:    carry = 0    for p_byte in data:        yield byte((carry << 7) | (p_byte >> 1))        carry = byte(p_byte & 0x01)   def ftx_message_decode_free(data: typing.ByteString) -> str:    payload = bytearray(ftx_message_decode_telemetry(data))    text = " "    for _ in range(FTX_MESSAGE_FREE_TEXT_LEN):        rem = 0        for i in range(FTX_MESSAGE_TELEMETRY_LEN):            rem = (rem << 8) | payload[i]            payload[i] = byte(rem // 42)            rem = rem % 42         text = charn(rem, FTX_CHAR_TABLE_FULL) + text     return text.strip() 

CRC-14

Расчет CRC-14

В качестве проверки целостности сообщения в протоколах семейства FTX предусмотрен расчет 14-и битного циклического избыточного кода (контрольной суммы) — CRC-14. Данный код подтверждает факт целостности данных в принятом сигнале; сигналы, в которых расчетная контрольная сумма сообщения не совпадает с фактической, игнорируются на принимающей стороне.

При 14-и битной контрольной сумме, алгоритм CRC-14 обеспечивает высокую вероятность обнаружения ошибок на коротких сообщениях, и минимально расходует объем передаваемых данных. По сравнению. например, с CRC-8, рассматриваемый алгоритм позволяет находить до 4 бит ошибок, а также имеет более низкую вероятность коллизии (1/16384 против 1/256 для CRC-8). Таким образом, алгоритм CRC-14 используется как компромисс между компактностью и надежностью; алгоритм CRC-8 и близкие к нему, более применимы к совсем коротким сообщениям.

FTX_LDPC_K = 91 FTX_LDPC_M = 83 FTX_LDPC_N = FTX_LDPC_K + FTX_LDPC_M FTX_LDPC_N_BYTES = ((FTX_LDPC_N + 7) // 8) FTX_LDPC_K_BYTES = ((FTX_LDPC_K + 7) // 8)   FTX_CRC_POLYNOMIAL = 0x2757 FTX_CRC_WIDTH = 14 FTX_PAYLOAD_BITS = 96 FTX_MESSAGE_BITS = FTX_PAYLOAD_BITS - FTX_CRC_WIDTH TOPBIT = 1 << (FTX_CRC_WIDTH - 1)   def ftx_compute_crc(message: typing.ByteString, num_bits: int) -> int:    remainder = 0    idx_byte = 0     for idx_bit in range(num_bits):        if idx_bit % 8 == 0:            remainder ^= message[idx_byte] << (FTX_CRC_WIDTH - 8)            idx_byte += 1         if remainder & TOPBIT != 0:            remainder = (remainder << 1) ^ FTX_CRC_POLYNOMIAL        else:            remainder = remainder << 1     return remainder & ((TOPBIT << 1) - 1)   def ftx_add_crc(payload: typing.ByteString) -> typing.ByteString:    message = payload + (b"\x00" * (FTX_LDPC_K_BYTES - len(payload)))    message[-3] &= 0xf8    message[-2] = 0     checksum = ftx_compute_crc(message, FTX_MESSAGE_BITS)     message[-3] |= byte(checksum >> 11)    message[-2] = byte(checksum >> 3)    message[-1] = byte(checksum << 5)     return message 

Функция ftx_compute_crc реализует алгоритм вычисления CRC-14. Полином, для вычисления остатка от деления данных, определен в FTX_CRC_POLYNOMIAL со значением 0x2757 (0b10011101010111).

Задача функции ftx_add_crc рассчитать и добавить к исходным данных значение функции ftx_compute_crc; предварительно данные подгоняются под длину сообщения в 12 байт (98 бит), контрольная сумма записывается в три последних байта сообщения.

Пример контрольной суммы для данных 00000020587223930748 (закодированное сообщение CQ R1ABC KO85): 11173 (0x2ba5); результат добавления контрольной суммы к данным: 0000002058722393074d74a0.

Проверка CRC-14

Так как принимаемое сообщение представляет собой массив байт, в котором содержатся сами данные сообщения и контрольный проверочный код CRC-14, при проверке достаточно отделить контрольную сумму от данных, выполнить пересчет CRC-14 и сравнить полученный результат с принятой контрольной суммой.

def ftx_crc(msg1: typing.ByteString, msglen: int) -> typing.ByteString:    div = [1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1]     msg = bytearray(b"\x00" * (FTX_LDPC_M + FTX_CRC_WIDTH))    for i in range(msglen + FTX_CRC_WIDTH):        if i < 77:            msg[i] = msg1[i]     for i in range(msglen):        if msg[i] != 0:            for j, d in enumerate(div):                msg[i + j] = msg[i + j] ^ d     return msg[msglen:msglen + FTX_CRC_WIDTH]   def ftx_check_crc(a91: typing.ByteString) -> bool:    out1 = ftx_crc(a91, 82)    for i, b in enumerate(out1):        if b != a91[FTX_LDPC_K - FTX_CRC_WIDTH + i]:            return False    return True 

Функция ftx_check_crc осуществляет проверку CRC-14 в блоке данных a91, извлекая 82 бита данных из проверяемого 91 битного блока данных и сверяет совпадение CRC-14 данных с контрольным кодом, находящимся в хвосте a91.

Результатом функции является булево значение.

LDPC (низкоплотностный код)

LDPC (Low-Density Parity-Check code) — код с малой плотностью проверок на чётность. Алгоритм используется для исправления ошибок, возникающих при передаче данных, повышая общую помехоустойчивость.

Более подробное описание теории и принципов работы LDPC достаточно объемное и выходит за рамки статьи.

С подробностями устройства и работы LDPC можно ознакомиться здесь:

  • https://en.wikipedia.org/wiki/Low-density_parity-check_code

  • https://habr.com/ru/articles/453086/

  • https://habr.com/ru/articles/830150/

Кодирование LDPC

LDPC-код относится к категории линейных блочных кодов, определяемых матрицей проверки на четность с небольшим количеством единиц; таким образом, в матрице, используемой при кодировании, количество единиц относительно невелико по сравнению с общим количеством элементов в матрице.

Рисунок 2: Граф Таннера и матрица проверок LDPC.

На рисунке 2 приведена схема представления работы LDPC-кодов, в частности LDPC-код (12, 8); в кругах, пронумерованных от 1 до 8 обозначены данные исходного сообщения, в квадратах с номерами от 1 до 4 — биты четности с такими значениями, чтобы результат сложения бит данных с битом четности, по модулю 2, был равен нулю.

Проверочная матрица H в столбцах определяет то, с какими битами исходного сообщения связан бит четности (так, на рисунке линиями первый бит четности соединен с 1, 4,5, 7 и 8 битами сообщения, что соответствует первой строке матрице в виде записи [1 0 0 1 1 0 1 1]); количество строк матрицы определяет то, сколько битов четности будет рассчитано. Также проверочная матрица H должна обладать свойствами разреженности и нерегулярности.

Таким образом, зная дополнительные биты четности и структуру проверочной матрицы, повторно выполнив суммирование бит по модулю 2, можно проверить корректность данных и, в случае искажения данных, локализовать и устранить ошибку, так как на один бит данных может ссылаться несколько битов проверки четности. Таким же образом осуществляется определение синдрома ошибки в самих проверочных битах.

Протоколы семейства FTX используют LDPC(174, 87), то есть к исходным данным добавляется 87 бит с проверкой на четность.

Таким образом, на принимающей стороне, при потере или искажении части сообщения, сигнал можно будет восстановить используя избыточные биты рассчитанные при LDPC-кодировании.

FTX_LDPC_GENERATOR = [    [0x83, 0x29, 0xce, 0x11, 0xbf, 0x31, 0xea, 0xf5, 0x09, 0xf2, 0x7f, 0xc0],    [0x76, 0x1c, 0x26, 0x4e, 0x25, 0xc2, 0x59, 0x33, 0x54, 0x93, 0x13, 0x20],    [0xdc, 0x26, 0x59, 0x02, 0xfb, 0x27, 0x7c, 0x64, 0x10, 0xa1, 0xbd, 0xc0],    ...    [0x60, 0x8c, 0xc8, 0x57, 0x59, 0x4b, 0xfb, 0xb5, 0x5d, 0x69, 0x60, 0x00] ]   def parity8(x: int) -> int:    for i in [4, 2, 1]:        x ^= x >> i    return byte(x % 2)   def ftx_encode(message: typing.ByteString) -> typing.ByteString:    codeword = bytearray(message[i] if i < FTX_LDPC_K_BYTES else 0 for i in range(FTX_LDPC_N_BYTES))     col_mask = 0x80 >> (FTX_LDPC_K % 8)    col_idx = FTX_LDPC_K_BYTES - 1     for i in range(FTX_LDPC_M):        nsum = 0        for j in range(FTX_LDPC_K_BYTES):            bits = message[j] & FTX_LDPC_GENERATOR[i][j]            nsum ^= parity8(bits)         if nsum % 2:            codeword[col_idx] |= col_mask         col_mask >>= 1        if col_mask == 0:            col_mask = 0x80            col_idx += 1     return codeword 

Проверочная матрица определена в FTX_LDPC_GENERATOR, так как она достаточно большая, чтобы целиком приводить ее в тексте, приведены только начало и конец матрицы. Саму матрицу целиком можно посмотреть здесь.

Функция ftx_encode осуществляет LDPC кодирование данных, переданных в параметре message; результатом функции является 174 битный код, содержащий в себе исходные данные из message, включая 87 LDPC-бит.

Функция parity8 осуществляет побитовую операцию XOR для расчета бита четности.

Пример:

message: 0000002058722393074d74a0

ftx_encode: 0000002058722393074d74a67d749e15d81ecea9e3a0

Декодирование LDPC

Один из популярных алгоритмов декодирования LDPC-кодов — алгоритм Belief Propagation (алгоритм распространения доверия).

Каждая итерация алгоритма состоит из двух шагов:

  1. узлы данных передают сообщения узлам проверок;

  2. проверочные узлы передают сообщения узлам данных.

Итерации повторяются до достижения приемлемого количества ошибок, либо до достижения максимального количества итераций.

FTX_LDPC_NM = [    [4, 31, 59, 91, 92, 96, 153],    ...    [17, 42, 75, 129, 170, 172, 0], ]   FTX_LDPC_MN = [    [16, 45, 73],    ...    [42, 49, 57], ]   FTX_LDPC_NUM_ROWS = [    7, 6, 6, 6, 7, 6, 7, 6, 6, 7, 6, 6, 7, 7, 6, 6,    ...    6, 6, 6 ]   def fast_tanh(x: float) -> float:    if x < -4.97:        return -1.0    if x > 4.97:        return 1.0    x2 = x ** 2    a = x * (945.0 + x2 * (105.0 + x2))    b = 945.0 + x2 * (420.0 + x2 * 15.0)    return a / b   def fast_atanh(x: float) -> float:    x2 = x ** 2    a = x * (945.0 + x2 * (-735.0 + x2 * 64.0))    b = (945.0 + x2 * (-1050.0 + x2 * 225.0))    return a / b   def ldpc_check(codeword: bytes) -> int:    errors = 0    for m in range(FTX_LDPC_M):        x = 0        for i in range(FTX_LDPC_NUM_ROWS[m]):            x ^= codeword[FTX_LDPC_NM[m][i] - 1]        if x:            errors += 1    return errors   def bp_decode(codeword: typing.List[float], max_iters: int) -> typing.Tuple[int, typing.ByteString]:    min_errors = FTX_LDPC_M     tov = [[0.0] * 3 for _ in range(FTX_LDPC_N)]    toc = [[0.0] * 7 for _ in range(FTX_LDPC_M)]     plain = bytearray(b"\x00" * FTX_LDPC_N)     for _ in range(max_iters):        plain_sum = 0        for n in range(FTX_LDPC_N):            plain[n] = int((codeword[n] + tov[n][0] + tov[n][1] + tov[n][2]) > 0)            plain_sum += plain[n]         if plain_sum == 0:            min_errors = FTX_LDPC_M            break         if (errors := ldpc_check(plain)) < min_errors:            min_errors = errors            if errors == 0:                break         for m in range(FTX_LDPC_M):            for n_idx in range(FTX_LDPC_NUM_ROWS[m]):                n = FTX_LDPC_NM[m][n_idx] - 1                Tnm = codeword[n]                for m_idx in range(3):                    if (FTX_LDPC_MN[n][m_idx] - 1) != m:                        Tnm += tov[n][m_idx]                toc[m][n_idx] = fast_tanh(-Tnm / 2)         for n in range(FTX_LDPC_N):            for m_idx in range(3):                m = FTX_LDPC_MN[n][m_idx] - 1                Tmn = 1.0                for n_idx in range(FTX_LDPC_NUM_ROWS[m]):                    if (FTX_LDPC_NM[m][n_idx] - 1) != n:                        Tmn *= toc[m][n_idx]                tov[n][m_idx] = -2 * fast_atanh(Tmn)     return min_errors, plain   def ftx_normalize_logl(log174: typing.List[float]) -> typing.Generator[float, None, None]:    sum = 0    sum2 = 0    for it in log174:        sum += it        sum2 += it ** 2     inv_n = 1.0 / FTX_LDPC_N    variance = (sum2 - (sum * sum * inv_n)) * inv_n     norm_factor = math.sqrt(24.0 / variance)     for it in log174:        yield it * norm_factor 

Функция bp_decode реализует алгоритм Belief Propagation, в параметр codeword передается нормализованные функцией ftx_normalize_logl данные для декодирования; параметр max_iters определяет максимальное количество итераций на попытку раскодировать данные.

Результатом работы функции является кортеж, содержащий в себе счетчик ошибок и набор байт декодированных данных.

Матрица FTX_LDPC_NM содержит строки проверки четности, каждое значение — индекс в кодовом слове.

Матрица FTX_LDPC_MN описывает биты кодового слова, числа определяют какие три проверки четности из FTX_LDPC_NM относятся к кодовому слову.

Размеры матриц слишком большие чтобы полностью рассмотреть их в рамках статьи и доступны по ссылкам:

Примечание: рассматриваемый пример слабо оптимизирован и имеет высокую алгоритмическую сложность, на Python работает достаточно медленно на сильно зашумленных данных.

Транспортный уровень

На транспортном уровне в протоколах семейства FTX происходит перекодирование бит данных в код Грея и добавление данных матриц Костаса в качестве маркеров сигнала.

Код Грея

Код Грея (Gray code) — двоичный, зеркальный код, в котором две соседние комбинации отличаются одной цифрой (расстояние Хемминга между соседними комбинациями кода равно единице).

Пример записи кода Грея для размерностей 1, 2 и 3 приведен на рисунке 3, также можно заметить симметричность кода, из-за чего код и получил название «зеркальный код».

Рисунок 3: Коды Грея размерностей 1, 2 и 3, и его свойство зеркальности.

Рисунок 4: Круговой энкодер с кодом Грея и его развернутое представление.

На рисунке 4 приведен пример использования кода Грея в энкодере.

Более подробно про код Грея можно прочитать здесь.

Исходя из этих свойств, код Грея позволяет снизить вероятность ошибки при изменении значений без сигналов тактирования.

В протоколах семейства FTX код Грея также применяется для повышения помехоустойчивости, снижая вероятность ошибки на принимающей стороне при декодировании, таким образом, приемник сможет исправить ошибку, так как соседние значения будут отличаться она один бит.

Для FT8 используется трех битный код Грея, для FT4 — двух.

Алгоритм расчета кода Грея:

def bin_to_gray(b: int) -> int:    return b ^ (b >> 1)   for i in range(10):    print(f"{i}, {i:5b}, {bin_to_gray(i):5b}") 

Результат расчета:

dec, bin,   gray  0, 00000, 00000  1, 00001, 00001  2, 00010, 00011  3, 00011, 00010  4, 00100, 00110  5, 00101, 00111  6, 00110, 00101  7, 00111, 00100  8, 01000, 01100  9, 01001, 01101 

Перевод в код Грея

Исходя из таблицы, коды Грея можно представить в виде списков, где значения индексов списка будут совпадать с представлением числа в формате Грея:

FT8_GRAY_MAP = [0, 1, 3, 2, 5, 6, 4, 7] FT4_GRAY_MAP = [0, 1, 3, 2] 

Таким образом, задача перевода данных из двоичного кода в код Грея реализуется кодом:

mask = 0x80 i_byte = 0 bits3 = 0   for bit_or in [4, 2, 1]:    if codeword[i_byte] & mask:        bits3 |= bit_or     mask >>= 1    if mask == 0:        mask = 0x80        i_byte += 1  yield FT8_GRAY_MAP[bits3] 
mask = 0x80 i_byte = 0 bits2 = 0   for bit_or in [2, 1]:    if codeword[i_byte] & mask:        bits2 |= bit_or     mask >>= 1    if mask == 0:        mask = 0x80        i_byte += 1  yield FT4_GRAY_MAP[bits2] 

Где bits3 и bits2 — биты исходного сообщения codeword.

Перевод из кода Грея

Декодирование в протоколах семейства FTX осуществляется перебором, путем выделения из сигнала тех бит, амплитуды которых в сигнале максимальны, они, в свою очередь, дадут представление о битах в двоичной форме.

def ft4_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float]:    s2 = [self.wf.mag[mag_idx + gc] for gc in FT4_GRAY_MAP]     logl_0 = max(s2[2], s2[3]) - max(s2[0], s2[1])    logl_1 = max(s2[1], s2[3]) - max(s2[0], s2[2])    return logl_0, logl_1   def ft8_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float, float]:    s2 = [self.wf.mag[mag_idx + gc] for gc in FT8_GRAY_MAP]     logl_0 = max(s2[4], s2[5], s2[6], s2[7]) - max(s2[0], s2[1], s2[2], s2[3])    logl_1 = max(s2[2], s2[3], s2[6], s2[7]) - max(s2[0], s2[1], s2[4], s2[5])    logl_2 = max(s2[1], s2[3], s2[5], s2[7]) - max(s2[0], s2[2], s2[4], s2[6])    return logl_0, logl_1, logl_2 

Функции ft4_extract_symbol и ft8_extract_symbol принадлежат классу Monitor, инкапсулирующего в себе логику анализа и декодирования данных (данный тезис будет более понятен при чтении статьи от конца к началу). Параметр mag_idx — номер корзины в спектре FFT.

Результат работы функции — кортеж из амплитудных значений бит данных.

Массив Костаса

Массив Костаса (Costas array) — тип решетки точек n*n, в которой ни одна пара точек, не имеет одинакового вектора смещения; иными словами в массиве существует только одна точка в строке и одна точка в столбце и расстояние между любыми двумя парами точек уникально, тем самым, два массива Костаса с одинаковыми значениями после операций параллельного переноса и транспонирования могут быть взаимно симметричны только тогда, когда полностью совпадают расположения точек в обоих массивах.

Исходя из этого, автокорреляционная функция (АКФ) по точкам из массива Костаса стремится к нулю, тем самым минимизируя ложные совпадения и взаимные помехи.

Рисунок 5: Примеры сигналов и графики их АКФ.

На рисунке 5 приведены примеры функций и графики их АКФ; при высоком уровне автокорреляций, огибающая графика имеет плавный рост и убывание на всей длине сигнала (рисунок 5, график A), а при низкой автокорреляции заметного роста не происходит, но, когда сигнал совпадает сам с собой, возникает резкий всплеск, например, в случае с белым шумом (рисунок 5, график D), другими словами при высокой автокорреляции функции градиент плавный, при низкой — резкий.

В протоколах семейства FTX массивы костаса используются в качестве маркеров синхронизации, решая задачу выделения полезных сигналов при наличии интерференций и шума.

Принцип применения массива Костаса можно рассмотреть на примере.

Допустим, имеется диагональная матрица из светодиодов и непрозрачная маска с отверстиями, которые расположены на тех же местах, что и светодиоды (рисунок 6), таким образом, перемещая маску по вертикали и по горизонтали можно добиться полного совпадения отверстий в маске со светодиодами (рисунок 7.2); однако, если один из светодиодов будет погашен (тем самым имитируя частичную потерю или искажение данных), то при такой конфигурации маски и светодиодов возникает неопределенность, когда невозможно точно определить находится ли найденный массив в начале или конце маски (рисунок 7.1).

Рисунок 6: Маска с отверстиями и диодная матрица.

Рисунок 7: 1: Состояние неопределенности; 2: Поиск диодов по маске.

Конфигурация маски и светодиодной матрицы с расположением отверстий и светодиодов согласно массиву Костаса 3*3 решает проблему неопределенности (рисунок 8), таким образом, если один из светодиодов будет выключен, то оставшихся двух будет достаточно для выравнивания маски, избегая состояния неопределенности.

Рисунок 8: маска и светодиодная матрица сконфигурированные по массиву Костаса.

Использование массива Костаса также обеспечивает точность позиционирования, если данные скомпрометированы, например один из светодиодов будет расположен значительно выше или ниже ожидаемого положения.

При использовании массива Костаса 3*3 состояние неопределенности наступит вновь, когда будут отключены два или более светодиодов.

Исходя из описанных свойств и обеспечения точности синхронизации сигналов во времени и частоте, массивы Костаса нашли применение в гидро и радиолокации.

Внедрение массива Костаса в сигнал

В протоколе FT8 используется массив Костаса 7*7, со значениями [3, 1, 4, 0, 6, 5, 2] и записывается в начало, середину и конец сообщения.

В отличие от FT8, протокол FT4 использует уже 4 массива Костаса 4*4 со значениями: [[0, 1, 3, 2], [1, 0, 2, 3], [2, 3, 1, 0], [3, 2, 0, 1]].

Примечание: причина выбора массива Костаса со значениями [3, 1, 4, 0, 6, 5, 2] до конца не понятна, но возможно это отсылка, по причине схожести первых трех чисел из числа пи.

FT8_NN = 79 FT4_NN = 105   FT8_COSTAS_PATTERN = [3, 1, 4, 0, 6, 5, 2] FT4_COSTAS_PATTERN = [    [0, 1, 3, 2],    [1, 0, 2, 3],    [2, 3, 1, 0],    [3, 2, 0, 1] ]   FT4_XOR_SEQUENCE = [    0x4A,  # 01001010    0x5E,  # 01011110    0x89,  # 10001001    0xB4,  # 10110100    0xB0,  # 10110000    0x8A,  # 10001010    0x79,  # 01111001    0x55,  # 01010101    0xBE,  # 10111110    0x28,  # 00101 [000] ]   def ft8_encode(payload: typing.ByteString) -> typing.Generator[int, None, None]:    a91 = ftx_add_crc(payload)    codeword = ftx_encode(a91)     mask = 0x80    i_byte = 0    for i_tone in range(FT8_NN):        if 7 > i_tone >= 0:            yield FT8_COSTAS_PATTERN[i_tone]        elif 43 > i_tone >= 36:            yield FT8_COSTAS_PATTERN[i_tone - 36]        elif 79 > i_tone >= 72:            yield FT8_COSTAS_PATTERN[i_tone - 72]        else:            bits3 = 0            for bit_or in [4, 2, 1]:                if codeword[i_byte] & mask:                    bits3 |= bit_or                 mask >>= 1                if mask == 0:                    mask = 0x80                    i_byte += 1             yield FT8_GRAY_MAP[bits3]   def ft4_encode(payload: typing.ByteString) -> typing.Generator[int, None, None]:    payload_xor = bytearray(b"\x00" * 10)    for i in range(10):        payload_xor[i] = payload[i] ^ FT4_XOR_SEQUENCE[i]     a91 = ftx_add_crc(payload_xor)    codeword = ftx_encode(a91)     mask = 0x80    i_byte = 0    for i_tone in range(FT4_NN):        if i_tone == 0 or i_tone == 104:            yield 0        elif 5 > i_tone >= 1:            yield FT4_COSTAS_PATTERN[0][i_tone - 1]        elif 38 > i_tone >= 34:            yield FT4_COSTAS_PATTERN[1][i_tone - 34]        elif 71 > i_tone >= 67:            yield FT4_COSTAS_PATTERN[2][i_tone - 67]        elif 104 > i_tone >= 100:            yield FT4_COSTAS_PATTERN[3][i_tone - 100]        else:            bits2 = 0            for bit_or in [2, 1]:                if codeword[i_byte] & mask:                    bits2 |= bit_or                 mask >>= 1                if mask == 0:                    mask = 0x80                    i_byte += 1             yield FT4_GRAY_MAP[bits2] 

Генераторная функция ft8_encode вызывает ранее рассмотренные функции вычисления контрольных сумм и LDPC кодирования, после чего формирует последовательность по схеме: S7 D29 S7 D29 S7, где S — данные массива Костаса, D — данные сообщения после ранее преобразованные в код Грея. Выходными данными функции являются номера GFSK-тонов.

Пример данных, сформированные функцией ft8_encode: 3140652 00000000100651431071150732373 3140652 35427373324062650244263575260 3140652 (блоки данных и массивы Костаса разделены пробелами).

Работа генераторной функции ft4_encode аналогична работе функции ft8_encode, с той лишь разницей, что кодируемые данные предварительно подвергаются операции XOR в соответствии со схемой FT4_XOR_SEQUENCE, такое повышение энтропии необходимо для того, чтобы свести к минимуму ситуацию, когда в сообщении идет друг за другом большое количество нулей, что может негативно повлиять на помехоустойчивость.

Данные FT4 до и после операции XOR: 00000020587223930748 / 4a5e8994e8f85ac6b960.

Формирование результата осуществляется по схеме: R S4_1 D29 S4_2 D29 S4_3 D29 S4_4 R, где R — ramping symbol, равный 0.

Пример данных, сформированные функцией ft4_encode: 0 0132 10331123303131102330223011332 1023 01332311302112123233233113233 2310 30302303033330213121320010313 3201 0 (блоки данных и массивы Костаса разделены пробелами)

FT8_NN, FT4_NN — общее количество тонов в последовательности сигнала FT8/FT4.

Поиск сигнала по массиву Костаса

В протоколах семейства FTX при детекции сигналов в общей полосе производится обход спектра временного окна и скоринг сигнала по амплитудам частот, расположенных согласно значениям из массива Костаса. При достижении порогового значения скоринга, полоса частот во временном промежутке принимается в расчет для последующего декодирования сигнала.

@dataclass class Candidate:    time_offset: int    freq_offset: int    time_sub: int    freq_sub: int    score: int = 0 

Класс Candidate для хранения информации о положении сигнала и его скор-балла в общем сигнале.

time_offset — индекс блока времени;

freq_offset — индекс блока частот;

time_sub — подиндекс блока времени при оверсемплинге;

freq_sub — подиндекс блока частот при оверсемплинге.

def get_cand_mag_idx(self, candidate: Candidate) -> int:    wf = self.wf     offset = candidate.time_offset    offset = offset * wf.time_osr + candidate.time_sub    offset = offset * wf.freq_osr + candidate.freq_sub    offset = offset * wf.num_bins + candidate.freq_offset     return offset   def ft8_sync_score(self, candidate: Candidate) -> int:    wf = self.wf     score = 0    num_average = 0     mag_cand = self.get_cand_mag_idx(candidate)     for m in range(FT8_NUM_SYNC):        for k in range(FT8_LENGTH_SYNC):            block = FT8_SYNC_OFFSET * m + k            block_abs = candidate.time_offset + block            if block_abs < 0:                continue            if block_abs >= wf.num_blocks:                break             p8 = mag_cand + block * wf.block_stride            sm = FT8_COSTAS_PATTERN[k]            p8sm = p8 + sm            if sm > 0:                score += wf.mag[p8sm] - wf.mag[p8sm - 1]                num_average += 1            if sm < 7:                score += wf.mag[p8sm] - wf.mag[p8sm + 1]                num_average += 1            if k > 0 and block_abs > 0:                score += wf.mag[p8sm] - wf.mag[p8sm - wf.block_stride]                num_average += 1            if k + 1 < FT8_LENGTH_SYNC and block_abs + 1 < wf.num_blocks:                score += wf.mag[p8sm] - wf.mag[p8sm + wf.block_stride]                num_average += 1     if num_average > 0:        score = int(score / num_average)    return score   def ft4_sync_score(self, candidate: Candidate) -> int:    wf = self.wf    score = 0    num_average = 0     mag_cand = self.get_cand_mag_idx(candidate)     for m in range(FT4_NUM_SYNC):        for k in range(FT4_LENGTH_SYNC):            block = 1 + (FT4_SYNC_OFFSET * m) + k            block_abs = candidate.time_offset + block             if block_abs < 0:                continue            if block_abs >= wf.num_blocks:                break             p4 = mag_cand + (block * wf.block_stride)            sm = FT4_COSTAS_PATTERN[m][k]            p4sm = p4 + sm             if sm > 0:                score += wf.mag[p4sm] - wf.mag[p4sm - 1]                num_average += 1            if sm < 3:                score += wf.mag[p4sm] - wf.mag[p4sm + 1]                num_average += 1            if k > 0 and block_abs > 0:                score += wf.mag[p4sm] - wf.mag[p4sm - wf.block_stride]                num_average += 1            if k + 1 < FT4_LENGTH_SYNC and block_abs + 1 < wf.num_blocks:                score += wf.mag[p4sm] - wf.mag[p4sm + wf.block_stride]                num_average += 1     if num_average > 0:        score = int(score / num_average)    return score   def ftx_sync_score(self, candidate: Candidate) -> int:    wf = self.wf     if wf.protocol == FTX_PROTOCOL_FT4:        sync_fun = self.ft4_sync_score    elif wf.protocol == FTX_PROTOCOL_FT8:        sync_fun = self.ft8_sync_score    else:        raise ValueError("Invalid protocol")     return sync_fun(candidate)   def ftx_find_candidates(self, num_candidates: int, min_score: int) -> typing.List[Candidate]:    wf = self.wf     num_tones = FTX_TONES_COUNT[wf.protocol]    time_offset_range = range(-FTX_LENGTH_SYNC[wf.protocol], int(FTX_TIME_RANGE[wf.protocol]))     heap = []    can = Candidate(0, 0, 0, 0)    for time_sub in range(wf.time_osr):        for freq_sub in range(wf.freq_osr):            for time_offset in time_offset_range:                for freq_offset in range(wf.num_bins - num_tones):                    can.time_sub = time_sub                    can.freq_sub = freq_sub                    can.time_offset = time_offset                    can.freq_offset = freq_offset                     if (score := self.ftx_sync_score(can)) < min_score:                        continue                     candidate = copy(can)                    candidate.score = score                     heap.insert(0, candidate)     heap.sort(key=lambda x: x.score, reverse=True)    return heap[:num_candidates] 

Вспомогательная функция get_cand_mag_idx переводит параметры частоты и времени кандидата в позицию в матрице амплитуд сигнала.

Функция ftx_find_candidates осуществляет подбор и скоринг кандидатов, перебирая частоты и временные временные отметки, в которых потенциально может быть обнаружен сигнал. Кандидат передается в функцию ftx_sync_score для анализа, если удается обнаружить признаки сигналов по массиву Костаса, то значение скор-балла увеличивается.

Результатом работы функции является список кандидатов (heap), отсортированный по признаку score, исключая кандидатов, чей скор-балл ниже значения в min_score; размер списка ограничен размером num_candidates.

Функция ftx_sync_score, в зависимости от протокола, FT8 или FT4, передает управление функциям ft4_sync_score и ft8_sync_score, которые в свою очередь осуществляют анализ сигнала на наличие маркеров-массивов Костаса.

Функции ft8_sync_score и ft4_sync_score реализуют обход участка частот кандидата, согласно расположениям массивов Костаса (позиции 0-7, 36-43, 72-79 для FT8 и 1-4, 34-37, 67-70, 100-103 для FT4). В значении score суммируется разность амплитуд между искомыми и соседними частотами, иными словами, чем резче градиент амплитуд в полосе, тем выше скор-балл кандидата. Переменная wf.mag (ссылка на self.wf) — матрица амплитуд (WFFT) принятого сигнала.

Канальный уровень

На канальном уровне происходит синтезирование сигнала, на основе данных, сформированным на предыдущем уровне. В результате формируется дискретный сигнал, готовый для передачи по аналоговому (звуковому) каналу связи.

При приеме сигнала, на этом уровне осуществляется оконное преобразование Фурье и вычисление амплитуд.

FSK и GFSK формирование сигнала

FSK (Frequency Shift Keying) — вид модуляции (манипуляции), при котором информация кодируется изменением несущей частоты сигнала. Для кодирования информации может использоваться две или более частот, где каждому состоянию соответствует та или иная частота.

На рисунке 9 изображено представление сигнала с использованием FSK для бинарных данных, в частности единицам соответствует высокая частота, а нулям меньшая.

Рисунок 9: FSK для двоичных данных и вид сигнала.

Стоит обратить внимание на то, что в рассматриваемом примере фаза непрерывна, то есть не имеет скачков.

GFSK (Gaussian Frequency-Shift Keying) — подвид FSK манипуляции при которой используется фильтр Гаусса (рисунок 10) для сглаживания частотных перестроек.

Рисунок 10: Импульсная переходная функция фильтра Гаусса.

Принцип модуляции GFSK идентичен обычному FSK, с той разницей, что формируемые импульсы тонов предварительно проходят через фильтр Гаусса для сглаживания (плавно перестраивает фазу сигнала в начале и конце тона), что позволяет уменьшить ширину спектра сигнала, так как резкая перестройка частоты на спектрограмме выглядит как широкополосный выброс (рисунок 13).

Рисунок 11: Сравнение FSK и GFSK.

На рисунках 11 и 12 показано сравнение сигналов и спектров обычного FSK и того же сигнала с использовании фильтра Гаусса (GFSK).

Фильтр Гаусса описывается уравнением 1.

(1)

Где

, а BT — коэффициент сглаживания,чем он больше, тем меньше сглаживание исходного импульса (рисунок 12).

Рисунок 12: Сравнение сглаживаний при изменении параметра BT.

Рисунок 13: Сравнение спектров FSK и GFSK сигналов.

В протокол FT8 используется ширина сглаживания BT равная 2.0, в протоколе FT4 — 1.0.

import math import typing   FT8_SYMBOL_PERIOD = 0.160 FT4_SYMBOL_PERIOD = 0.048   FT8_SYMBOL_BT = 2.0 FT4_SYMBOL_BT = 1.0   GFSK_K = math.pi * math.sqrt(2 / math.log(2))   def gfsk_pulse(n_spsym: int, symbol_bt: float) -> typing.Generator[float, None, None]:    for i in range(3 * n_spsym):        t = i / n_spsym - 1.5        arg1 = GFSK_K * symbol_bt * (t + 0.5)        arg2 = GFSK_K * symbol_bt * (t - 0.5)        val = (math.erf(arg1) - math.erf(arg2)) / 2         yield val   def synth_gfsk(symbols: typing.List[int], n_sym: int,               f0: float,               symbol_bt: float, symbol_period: float,               signal_rate: int) -> typing.Generator[float, None, None]:    n_spsym = int(0.5 + signal_rate * symbol_period)    n_wave = n_sym * n_spsym    hmod = 1.0     dphi_peak = 2 * math.pi * hmod / n_spsym    dphi = [2 * math.pi * f0 / signal_rate] * (n_wave + 2 * n_spsym)     pulse = list(gfsk_pulse(n_spsym, symbol_bt))     for i in range(n_sym):        ib = i * n_spsym        for j in range(3 * n_spsym):            dphi[j + ib] += dphi_peak * symbols[i] * pulse[j]     for j in range(2 * n_spsym):        dphi[j] += dphi_peak * pulse[j + n_spsym] * symbols[0]        dphi[j + n_sym * n_spsym] += dphi_peak * pulse[j] * symbols[n_sym - 1]     phi = 0.0    n_ramp = n_spsym // 8    for k in range(n_wave):        val = math.sin(phi)        phi = math.fmod(phi + dphi[k + n_spsym], 2 * math.pi)         if k < n_ramp or k >= n_wave - n_ramp:            i_ramp = (k if k < n_ramp else n_wave - k - 1)            env = (1 - math.cos(2 * math.pi * i_ramp / (2 * n_ramp))) / 2            val *= env         yield val 

Генераторная функция gfsk_pulse рассчитывает значения сглаживающего импульса.

Параметр n_spsym — количество семплов на символ; параметр symbol_bt — крутизна сглаживания (параметр BT); результат — генератор со значениями типа float.

Генераторная функция synth_gfsk формирует дискретный сигнал, с частотой дискретизации задаваемой параметром signal_rate, на вход передается список тонов в параметре symbols; параметр n_sym определяет размер множества символов (значение FT4_NN/FT8_NN); параметр f0 определяет базовую частоту, относительно которой будет будут рассчитываться частоты тонов; symbol_bt — крутизна фильтра Гаусса (FT4_SYMBOL_BT/FT8_SYMBOL_BT); параметр symbol_period определяет длительность звучания каждого тона в секундах (FT4_SYMBOL_PERIOD/FT8_SYMBOL_PERIOD); функция генерирует последовательность дискретных значений от -1 до 1 используя функцию синус (sin), в качестве аргумента которого передается рассчитанная фаза сигнала.

pulse = list(gfsk_pulse(n_spsym, symbol_bt)) 

В pulse формируется список дискретных значений сглаживающей функции Гаусса.

При формировании сигнала дополнительно происходит сглаживание амплитуд в начале и конце сигнала оконной функцией Ханна:

if k < n_ramp or k >= n_wave - n_ramp:    i_ramp = (k if k < n_ramp else n_wave - k - 1)    env = (1 - math.cos(2 * math.pi * i_ramp / (2 * n_ramp))) / 2    val *= env 

Таким образом, результатом работы функций канального уровня, является дискретный звуковой сигнал, готовый к отправке в ЦАП передатчика.

Общий вид кодера и генератора сигнала FT8/FT4:

import numpy as np from scipy.io.wavfile import write   FT4_SLOT_TIME = 7.5 FT8_SLOT_TIME = 15.0   def main():    try:        call = "CQ R1ABC KO85"        payload = ftx_message_encode_std(*call.split())    except Exception as e:        print(f"Cannot parse message: {type(e)}")        return     print("Payload", ''.join('{:02x}'.format(x) for x in payload))     is_ft4 = False    if is_ft4:        tones = ft4_encode(payload)    else:        tones = ft8_encode(payload)     tones = list(tones)     print("FSK tones:", "".join(str(i) for i in tones))     frequency = 1000     symbol_period = FT4_SYMBOL_PERIOD if is_ft4 else FT8_SYMBOL_PERIOD    symbol_bt = FT4_SYMBOL_BT if is_ft4 else FT8_SYMBOL_BT     sample_rate = 12000    num_tones = FT4_NN if is_ft4 else FT8_NN    slot_time = FT4_SLOT_TIME if is_ft4 else FT8_SLOT_TIME     num_samples = int(0.5 + num_tones * symbol_period * sample_rate)    num_silence = int((slot_time * sample_rate - num_samples) / 2)     signal = np.fromiter(synth_gfsk(tones, num_tones, frequency, symbol_bt, symbol_period, sample_rate), dtype=float)    silence = np.zeros(num_silence)    amplitude = np.iinfo(np.int16).max    data = np.concat([silence, amplitude * signal, silence])    write("examples/signal.wav", sample_rate, data.astype(np.int16))   if __name__ == '__main__':    main() 

Приведенный код формирует из строки «CQ R1ABC KO85» осуществляет генерацию сигнала протокола FT8 и сохраняет сигнал в файл с именем signal.wav.

Изменение значения переменной is_ft4 на True изменит логику генератора на формирование сигнала протокола FT4.

Результат вывода данных для FT8:

Payload 00000020587223930748 FSK tones: 3140652000000001006514310711507323733140652354273733240626502442635752603140652 

Результат вывода данных для FT4:

Payload 00000020587223930748 FSK tones: 001321033112330313110233022301133210230133231130211212323323311323323103030230303333021312132001031332010 

Декодирование сигнала

При декодировании входящий сигнал разделяется на блоки, размером в период символа протокола (для FT8 0.160 и FT4 0.048 сек), суммарной продолжительностью во временной слот протокола (15.0 и 7.5 секунд для FT8 и FT4 соответственно); далее каждый блок передается в оконное преобразование Фурье (WFFT, рисунок 14) с использованием оконной функции (например Ханна, рисунок 15 и формула 2), магнитуды частот преобразуются в амплитуды в децибелах.

Рисунок 14: Представление преобразования Фурье.

Рисунок 15: График оконной функции Ханна.

(2)

Дальнейшая обработка сигнала заключается в поиске сигналов-кандидатов (поиск по массиву Костаса), нормализация амплитуд сигнала кандидата и передача в LDPC-декодер (алгоритм Belief Propagation), то есть вверх по иерархии уровней абстрагирования.

Логику обработки сигнала можно инкапсулировать в класс Monitor:

kMin_score = 5 kMax_candidates = 140 kLDPC_iterations = 25 kMaxLDPCErrors = 32   @dataclass class Waterfall:    num_bins: int    time_osr: int    freq_osr: int    protocol: int    mag = typing.List[int]    max_blocks: int     num_blocks: int = 0    block_stride: int = 0     def __post_init__(self):        self.block_stride = (self.time_osr * self.freq_osr * self.num_bins)        self.mag = [0] * (self.max_blocks * self.time_osr * self.freq_osr * self.num_bins)   class Monitor:    @staticmethod    def hann_i(i: int, N: int) -> float:        x = math.sin(math.pi * i / N)        return x ** 2      @staticmethod    def ftx_normalize_logl(log174: typing.List[float]) -> typing.Generator[float, None, None]:        ...      def __init__(self, f_min: int, f_max: int, sample_rate: int, time_osr: int, freq_osr: int, protocol):        slot_time = FTX_SLOT_TIMES[protocol]        symbol_period = FTX_SYMBOL_PERIODS[protocol]         self.block_size = int(sample_rate * symbol_period)        self.subblock_size = int(self.block_size / time_osr)        self.nfft = self.block_size * freq_osr        self.fft_norm = 2.0 / self.nfft         self.window = [self.fft_norm * self.hann_i(i, self.nfft) for i in range(self.nfft)]        self.last_frame = [0.0] * self.nfft         max_blocks = int(slot_time / symbol_period)         self.min_bin = int(f_min * symbol_period)        self.max_bin = int(f_max * symbol_period + 1)        num_bins = self.max_bin - self.min_bin         self.wf = Waterfall(max_blocks=max_blocks, num_bins=num_bins, time_osr=time_osr, freq_osr=freq_osr,                            protocol=protocol)         self.symbol_period = symbol_period         self.max_mag = -120.0      def monitor_process(self, frame: typing.List[float]):        if self.wf.num_blocks >= self.wf.max_blocks:            return False         offset = self.wf.num_blocks * self.wf.block_stride        frame_pos = 0         for time_sub in range(self.wf.time_osr):            for pos in range(self.nfft - self.subblock_size):                self.last_frame[pos] = self.last_frame[pos + self.subblock_size]             for pos in range(self.nfft - self.subblock_size, self.nfft):                self.last_frame[pos] = frame[frame_pos]                frame_pos += 1             timedata = [self.window[pos] * self.last_frame[pos] for pos in range(self.nfft)]            freqdata = np.fft.fft(timedata)[:self.nfft // 2 + 1]             for freq_sub in range(self.wf.freq_osr):                for bin in range(self.min_bin, self.max_bin):                    src_bin = (bin * self.wf.freq_osr) + freq_sub                    mag2 = freqdata[src_bin].imag ** 2 + freqdata[src_bin].real ** 2                    db = 10.0 * math.log10(1E-12 + mag2)                     scaled = int(2 * db + 240)                    self.wf.mag[offset] = max(min(scaled, 255), 0)                    offset += 1                     self.max_mag = max(self.max_mag, db)         self.wf.num_blocks += 1        return True      def ft8_sync_score(self, candidate: Candidate) -> int:        ...      def ft4_sync_score(self, candidate: Candidate) -> int:        ...      def ftx_sync_score(self, candidate: Candidate) -> int:        ...      def ftx_find_candidates(self, num_candidates: int, min_score: int) -> typing.List[Candidate]:        ...      def get_cand_mag_idx(self, candidate: Candidate) -> int:        ...      def ft4_extract_likelihood(self, cand: Candidate) -> typing.List[float]:        log174 = [0.0] * FTX_LDPC_N         mag = self.get_cand_mag_idx(cand)        for k in range(FT4_ND):            sym_idx = k + (5 if k < 29 else 9 if k < 58 else 13)            bit_idx = 2 * k             block = cand.time_offset + sym_idx            if block < 0 or block >= self.wf.num_blocks:                log174[bit_idx + 0] = 0                log174[bit_idx + 1] = 0            else:                logl_0, logl_1 = self.ft4_extract_symbol(mag + sym_idx * self.wf.block_stride)                log174[bit_idx + 0] = logl_0                log174[bit_idx + 1] = logl_1         return log174      def ft8_extract_likelihood(self, cand: Candidate) -> typing.List[float]:        log174 = [0.0] * FTX_LDPC_N         mag = self.get_cand_mag_idx(cand)        for k in range(FT8_ND):            sym_idx = k + (7 if k < 29 else 14)            bit_idx = 3 * k             block = cand.time_offset + sym_idx            if block < 0 or block >= self.wf.num_blocks:                log174[bit_idx + 0] = 0                log174[bit_idx + 1] = 0                log174[bit_idx + 2] = 0            else:                logl_0, logl_1, logl_2 = self.ft8_extract_symbol(mag + sym_idx * self.wf.block_stride)                log174[bit_idx + 0] = logl_0                log174[bit_idx + 1] = logl_1                log174[bit_idx + 2] = logl_2         return log174      def ft4_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float]:     ...      def ft8_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float, float]:     ...      def ftx_decode_candidate(            self, cand: Candidate,            max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.Optional[bytes], float]]:        wf = self.wf         if wf.protocol == FTX_PROTOCOL_FT4:            log174 = self.ft4_extract_likelihood(cand)        else:            log174 = self.ft8_extract_likelihood(cand)         log174 = list(self.ftx_normalize_logl(log174))        ldpc_errors, plain174 = bp_decode(log174, max_iterations)         if ldpc_errors > kMaxLDPCErrors:            return None        if not ftx_check_crc(plain174):            return None         a91 = self.pack_bits(plain174, FTX_LDPC_K)        crc_extracted = ftx_extract_crc(a91)         if wf.protocol == FTX_PROTOCOL_FT4:            payload = bytearray(a91[i] ^ xor for i, xor in enumerate(FT4_XOR_SEQUENCE))            tones = ft4_encode(payload)        else:            payload = a91            tones = ft8_encode(payload)         snr = self.ftx_subtract(cand, tones)        return DecodeStatus(ldpc_errors, crc_extracted), payload, snr      def decode(self) -> typing.Generator[typing.Tuple[float, float, float, str], None, None]:        hashes = set()        wf = self.wf         candidate_list = self.ftx_find_candidates(kMax_candidates, kMin_score)        for cand in candidate_list:            freq_hz = (self.min_bin + cand.freq_offset + cand.freq_sub / wf.freq_osr) / self.symbol_period            time_sec = (cand.time_offset + cand.time_sub / wf.time_osr) * self.symbol_period - 0.65             if not (x := self.ftx_decode_candidate(cand, kLDPC_iterations)):                continue             status, message, snr = x             if (crc := status.crc_extracted) in hashes:                continue             hashes.add(crc)             call_to_rx, call_de_rx, extra_rx = ftx_message_decode(message)             yield snr, time_sec, freq_hz, " ".join([call_to_rx, call_de_rx or "", extra_rx or ""]) 

Генераторная функция decode инкапсулирует логику подбора кандидатов и их декодирование ранее описанными функциями.

Функция выдает кортеж с данными о SNR, отклонении времени и декодированные сообщения в текстовом виде.

Функция ftx_decode_candidate принимает на вход сигнал-кандидат, вызывает методы ft4_extract_likelihood/ft8_extract_likelihood; данные извлеченные из сигнала, нормализованные функцией ftx_normalize_logl, передаются в LPDC-декодер (bp_decode), после чего проверяется CRC-14 (ftx_check_crc) и на основе декодированных данных декодируется сообщение.

Функции ft8_extract_likelihood и ft4_extract_likelihood извлекают биты данны из общего скопа сигналов, формируя список «аналоговых» бит.

Функция monitor_process реализует аккумулятор сигнала, принимая на вход блоки принятого дискретного сигнала, производя над ними преобразование Фурье и сохраняет результаты в «водопад» (структура Waterfall).

Параметрами конструктора класса Monitor являются ширина полосы частот принимаемого сигнала (f_min, f_max), частота дискретизации (sample_rate), параметры оверсемплинга (time_osr, freq_osr) и идентификатор протокола FT8/FT4.

Полный пример декодера с использованием класса Monitor:

import time import numpy as np from scipy.io.wavfile import read from consts import FTX_PROTOCOL_FT8, FTX_PROTOCOL_FT4 from decode import Monitor   kFreq_osr = 2 kTime_osr = 4   def main():    is_ft4 = False     sample_rate, data = read("examples/signal.wav")     amplitude = np.iinfo(data.dtype).max    signal = data / amplitude     protocol = FTX_PROTOCOL_FT4 if is_ft4 else FTX_PROTOCOL_FT8     mon = Monitor(        f_min=200,        f_max=3000,        sample_rate=sample_rate,        time_osr=kTime_osr,        freq_osr=kFreq_osr,        protocol=protocol    )     frame_pos = 0    while True:        eof = frame_pos >= len(signal) - mon.block_size        if eof or not mon.monitor_process(signal[frame_pos:frame_pos + mon.block_size]):            print(f"Waterfall accumulated {mon.wf.num_blocks} symbols")            print(f"Max magnitude: {mon.max_mag:+.2f} dB")             tm_slot_start = 0            ts1 = time.monotonic()            for i, (snr, time_sec, freq_hz, text) in enumerate(mon.decode(tm_slot_start)):                print(                    f"{i + 1:03}\t"                    f"{snr:+06.2f}dB\t"                    f"{time_sec:-.2f}sec\t"                    f"{freq_hz:.2f}Hz\t"                    f"{text}"                )             mon.wf.num_blocks = 0            mon.max_mag = -120.0             ts2 = time.monotonic()            print("-" * 20, "decoded @", ts2 - ts1, "sec")         if eof:            break         frame_pos += mon.block_size   if __name__ == '__main__':    main() 

На рисунках 16 и 17 приведены спектрограмма формируемых сигналов для FT8 и FT4.

Рисунок 16: Спектрограмма сигнала FT8.

Рисунок 17: Спектрограмма сигнала FT4.

Расчет SNR

Один из способов вычисления SNR — определить амплитуды сигнала, исключить из полосы сигнала сам сигнал и просуммировать амплитуды в его окрестностях.

from copy import copy from itertools import cycle   def ftx_subtract(self, candidate: Candidate, tones: typing.Iterable[int]) -> float:    num_tones = FTX_TONES_COUNT[self.wf.protocol]     can = copy(candidate)    snr_all = 0.0     tones = cycle(tones)    for freq_sub in range(self.wf.freq_osr):        can.freq_sub = freq_sub         mag_cand = self.get_cand_mag_idx(can)        noise = 0.0        signal = 0.0        num_average = 0         for i, tone in enumerate(tones):            block_abs = candidate.time_offset + i            if block_abs < 0:                continue             if block_abs >= self.wf.num_blocks:                break             wf_el = mag_cand + i * self.wf.block_stride             noise_val = 100000.0            for s in filter(lambda x: x != tone, range(num_tones)):                noise_val = min(noise_val, self.wf.mag[wf_el + s] * 0.5 - 120.0)             noise += noise_val            signal += self.wf.mag[wf_el + tone] * 0.5 - 120.0            num_average += 1         noise /= num_average        signal /= num_average        snr = signal - noise         for i, tone in enumerate(tones):            block_abs = candidate.time_offset + i            if block_abs < 0:                continue            if block_abs >= self.wf.num_blocks:                break            wf_el = mag_cand + i * self.wf.block_stride            self.wf.mag[wf_el + tone] -= snr * 2 + 240         snr_all += snr     return snr_all / self.wf.freq_osr / 2 - 22 

Функция ftx_subtract (метод класса Monitor) перебирает тона исходного сигнала, вычисляет их амплитуды и амплитуды полосы сигнала, исключая амплитуды самих тонов. Функция возвращает результат деления уровня сигнала на уровень шумов. Побочным эффектом функции является удаление сигнала из водопада, что влияет на чувствительность декодирования соседних слабых сигналов.

Заключение

В статье были рассмотрены протоколы FT8 и FT4, их внутреннее устройство и принципы работы. Так, используя относительно простые и давно известные методы кодирования, реализуется достаточно помехоустойчивый протокол, позволяющий устанавливать дальние радиолюбительские связи. Также протоколы представляют интерес тем, что воплощают в себе принципы из таких областей как линейное кодирование и коррекция ошибок, радиолокация и цифровая обработка сигналов, частотная манипуляция, что также может являться предметом углубленного изучения.

Один из недостатков протоколов, подобных FT8/FT4, является тот факт, что радиообмен осуществляется в автоматическом режиме (в режиме модема), а полезными данными является только обмен метаданными в виде позывных, геолокации и уровнем SNR, хотя при этом протокол поддерживает обмен произвольными сообщениями очень малой длины и учитывая скорость обмена, работа в режиме чата достаточно затруднительная.

Так как архитектура протоколов представляет собой «матрешку» или «луковицу», технология может быть использована как базис для радиолюбительских экспериментов с передачей информации на большие расстояния.

Ссылки

  1. Исходный код реализации FT8/FT4 на Python

  2. Исходный код примера генератора сигнала FTX

  3. Исходный код примера декодера FTX

  4. Исходный код ft8_lib на C

  5. Статья с описанием протокола из журнала QEX

  6. Описание механизма синхронизации сигналов в FT8

  7. Программа WSJT, реализующая протоколы FTX


ссылка на оригинал статьи https://habr.com/ru/articles/928024/