Все об устройстве WSPR с примерами на Python (часть 2)

от автора

WSPR — цифровой протокол, разработанный Джо Тейлором (K1JT) в 2008-2009 годах, с целью исследования распространения радиосигналов от коротковолновых передатчиков малой и сверхмалой мощности. В предыдущей части были рассмотрены механизмы помехозащищенного кодирования данных и формирования сигнала для отправки его в эфир. В этой части статьи рассматриваются механизмы детектирования и декодирования принимаемого из сигнала.

Статья может быть интересна радиолюбителям, как знакомым, так и не знакомым с WSPR, а также тем, кто интересуется темой цифровой обработки сигналов и хочет понять устройство этого протокола.

Общее представление

В первой части были рассмотрены принципы и алгоритмы, задействованные в процессе формировании сигнала.

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).

Процесс детекции и декодирования по своей сути диаметрально противоположен процессу кодирования. Обрабатываемый сигнал предварительно подвергается фильтрации даунсемплингу и фильтрации в частотной области, после чего осуществляется процесс синхронизации сигнала во временной области (для чего используются согласованные фильтры), что позволяет извлечь символы данных; извлеченные символы затем подвергаются декодированию с коррекцией ошибок, что уже позволяет получить на выходе передаваемое WSPR-маяком сообщение.

Перенос спектра и фильтрация

Входящий звуковой сигнал для обработки в WSPR поступает с частотой дискретизации 12.0 кГц, при этом центральная частота сигнала располагается в зоне 1.5 кГц; так как сигнал накапливается в течении 110 секунд, формируется достаточно большой объем звуковых данных, из которых только нужна полоса шириной self.sample_rate / WSPR_DECIMATION = 12000 / 32 = 375 Гц, всё что за пределами этой полосы — бесполезный шум.

Даунсемплинг и децимация сигнала в WSPR осуществляются в частотной области через прямое и обратное преобразование Фурье.

WSPR_DECIMATION = 32WSPR_CENTER_FREQ = 1500class WSPRMonitor(AbstractMonitor):    __slots__ = [        "sample_rate",        "signal",        ...    ]    def __init__(self, sample_rate: int):        self.signal = np.zeros(0, dtype=np.float64)        self.sample_rate = sample_rate        ...    def downsample(self) -> npt.NDArray[np.complex128]:        fft_target = 46080        fft_src = fft_target * WSPR_DECIMATION        frame = np.pad(self.signal[:fft_src], (0, max(0, fft_src - len(self.signal))))        spec = np.fft.rfft(frame)        df = self.sample_rate / fft_src        i0 = int(WSPR_CENTER_FREQ / df + 0.5)        sub_spec = spec[i0: i0 + fft_target]        signal = np.fft.ifft(sub_spec) * (fft_target / fft_src) * 2        return signal.astype(np.complex128)    ...    def monitor_process(self, frame: npt.NDArray[np.int16]) -> None:        frame_float = frame.astype(np.float32) / np.iinfo(np.int16).max        self.signal = np.concat([self.signal, frame_float])    ...

Поле sample_rate хранит информацию о частоте дискретизации исходного сигнала, в signal накапливаются данные самого сигнала.

Функция downsample осуществляет перенос спектра из области высоких частот в область низких, выполняя децимацию сигнала.

В переменную spec записывается результат преобразования Фурье для вещественной части сигнала.

Примечание: то есть в спектре содержатся только положительные частоты, область с отрицательными частотами зеркально симметрична и не используется (рисунок 2).

Рисунок 2: отличие FFT и RFFT

Рисунок 2: отличие FFT и RFFT

Переменная df определяет целевой частотный размер Фурье-бинов после преобразования.

Переменная i0 задает параметры фильтра в частотной области, — индекс частотного бина в исходном спектре, относительно которого формируется полоса пропускания; полоса пропускания ограничивается значением fft_target.

Массив данных в sub_spec — перенесенный и отфильтрованный по частоте спектр сигна. Переход из частотной во временную область осуществляется обратным преобразованием Фурье; сигнал дополнительно подвергается нормализации амплитуд (AGC).

При использовании обратного преобразования Фурье, полученный сигнал формируется в виде массива комплексных чисел, — то есть представляет собой сигнал в I/Q форме z(t) = I(t) + jQ(t) (что эквивалентно записи Эйлера в формате комплексной экспоненты s(t) = A(t) e^{j\phi(t)}).

Таким образом, перенос спектра и даунсемплинг позволяет осуществить децимацию без потерь, использовать меньшую частоту дискретизации, что значительно уменьшает объем данных, обработка которых потребует гораздо меньшего количества ресурсов и упрощает процесс анализа сигнала; также в частотной области происходит исключение высокочастотных и внеполосных шумов, что положительно влияет на SNR.

Детекция сигнала

Процесс декодирования сигнала начинается с его детекции. Функция decode получает  I/Q сигнал после даунсемплинга.

class WSPRMonitor(AbstractMonitor):    __slots__ = [        ...        "symbol_len",        "df",        "df2",        "dt",        "decode_passes",        ...    ]    ...    def __init__(self, sample_rate: int):        ...        down_sample_rate = self.sample_rate / WSPR_DECIMATION        self.symbol_len = int(down_sample_rate * WSPR_SYMBOL_PERIOD)        self.df = down_sample_rate / self.symbol_len        self.df2 = self.df / 2        self.dt = 1 / down_sample_rate        self.decode_passes = 3        ...    def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]:        iq_signal = self.downsample()        symbols = np.zeros(WSPR_NUM_BITS * 2, dtype=np.uint8)        fft_step = 128        fft_size = fft_step * 4        fft_count = 4 * (len(iq_signal) // fft_size) - 1        smooth = np.sin((np.pi / fft_size) * np.arange(fft_size))        pwr_spec = np.zeros((fft_count, fft_size))        block_size = 1        max_drift = 4        min_sync_2 = WSPR_MIN_SYNC_2        decodes_pass = 0        ...

Определяются параметры построения спектра I/Q сигнала — fft_step, fft_size, fft_count. Подготавливается сглаживающая функция для оконного преобразования Фурье.

Примечание: вместо окна Ханна Стив Франки (K9AN) применил положительный полупериод функции синус. В отличие от функции Ханна, функция синус чуть хуже подавляет боковые лепестки в спектре.

WSPR_MIN_SYNC_1 = 0.10WSPR_MIN_SYNC_2 = 0.12...

Константы WSPR_MIN_SYNC_1, WSPR_MIN_SYNC_2 определяют параметры минимального отклонения частоты при синхронизации.

Основной цикл обработки сигнала:

        ...        for iteration in range(self.decode_passes):            if iteration == 1 and decodes_pass == 0 and self.decode_passes > 2:                iteration = 2            if iteration == 2:                block_size = 4                max_drift = 0                min_sync_2 = WSPR_MIN_SYNC_1            decodes_pass = 0        ...

Усреднение спектра

В WSPR принимаемые сигналы могут иметь очень низкий показатель SNR, что делает их практически невидимыми. В WSPR для выделения слабых сигналов применяется алгоритм усреднения спектра накопленных данных. Так как шум в большинстве случаев является случайным процессом, с нулевым мат. ожиданием и не коррелирует друг с другом, а полезный сигнал находится на неизменной частоте и имеет неразрывную фазу, то при сложении спектров мощность сигнала будет расти линейно, в такое число раз, кратное количеству усреднений, в то время как спектр шума будет расти по закону случайных блужданий, — пропорционально квадратному корню из количества усреднений (\sqrt n).

На рисунке 3 приведен пример усреднения спектра сигнала в канале с аддитивным Гауссовским шумом при соотношении сигнал/шум SNR=-30 дБ. На первом графике длительность сигнала частотой 1.0 кГц (положение в спектре выделено красным пунктиром) составляет 1с, на второ — 10с и 30с на третьем соответственно. Как видно из графиков, при десятикратном усреднении спектра, полезный сигнал начинает проявляться относительно шума, то есть SNR начинает расти. При еще более долгом накоплении спектра сигнал начинает еще сильнее преобладать над шумом.

Рисунок 3: Выделение слабого сигнала при усреднении спектра.

Рисунок 3: Выделение слабого сигнала при усреднении спектра.
            ...            for i in range(fft_count):                start = i * fft_step                segment = iq_signal[start: start + fft_size]                if len(segment) < fft_size:                    segment = np.pad(segment, (0, fft_size - len(segment)), mode="constant")                cpx_spec = np.fft.fft(segment * smooth)                cpx_spec = np.fft.fftshift(cpx_spec)                pwr_spec[i, :] = np.abs(cpx_spec) ** 2            pwr_spec_avg = np.mean(pwr_spec, axis=0) * fft_count            center = fft_size // 2            span = 205            relevant_part = pwr_spec_avg[center - span: center + span + 1]            smooth_spec = np.convolve(relevant_part, WSPR_SMOOTH_KERNEL, mode="same")            noise_level = np.percentile(smooth_spec, WSPR_NOISE_PERCENTILE)            min_snr = np.pow(10.0, -8.0 / 10.0)            smooth_spec = smooth_spec / noise_level - 1.0            smooth_spec[smooth_spec < min_snr] = 0.1 * min_snr            ...

Исходя из того условия, что длительность одного символа WSPR составляет 0.6827 с, демодулятору, используя усреднение спектра, удается извлекать сигналы с низким значением SNR.

Поиск кандидатов

Следующий этап после усреднения спектра — анализ амплитуд в полученном спектре, заключающийся в нахождении локальных максимумов спектра. Каждый найденный локальный максимум является признаком потенциального искомого сигнала.

WSPR_SNR_SCALING_FACTOR = 26.3...def dB(x: float) -> float:    val = -99.0    if x > 1.259e-10:        x = max(x, 0.000001)        val = 10.0 * np.log10(x)    return val...@dataclassclass Candidate:    freq: float    snr: float    shift: int = 0    drift: float = 0    sync: float = 0...class WSPRMonitor(AbstractMonitor):    ...    MAX_CANDIDATES = 410    CANDIDATE_FREQ_MIN = -110    CANDIDATE_FREQ_MAX = 110    ...    def find_candidates(self, smooth_spec: npt.NDArray[np.float64]) -> typing.List[Candidate]:        spec_slice = smooth_spec[:self.MAX_CANDIDATES]        is_max = (spec_slice[1:-1] > spec_slice[:-2]) & (spec_slice[1:-1] > spec_slice[2:])        indices = np.where(is_max)[0] + 1        freqs = (indices - 205) * self.df2        mask = (freqs >= self.CANDIDATE_FREQ_MIN) & (freqs <= self.CANDIDATE_FREQ_MAX)        cand_indices = indices[mask]        cand_freqs = freqs[mask]        snrs = [dB(ss) - WSPR_SNR_SCALING_FACTOR for ss in smooth_spec[cand_indices]]        heap = [Candidate(freq=freq, snr=snr) for freq, snr in zip(cand_freqs, snrs)]        heap.sort(key=lambda it: it.snr, reverse=True)        return heap

Функция find_candidates анализирует данные в спектре smooth_spec, находит в нем частотные бины, удовлетворяющие правилу, что соседние слева и справа от анализируемого бины имеют меньшее значение амплитуды. Массив is_max содержит логические значения, определяющие какой из бинов удовлетворяет условию. В массивы indices и freqs накапливаются индексы и значения частот бинов-кандидатов. 

Для cand_indices и cand_freqs осуществляется фильтрация, исключающая кандидатов, частоты которых сильно отклоняются от центральной; на основе данных из cand_indices cand_freqs формируется список объектов для описания кандидата для последующего декодирования. Сформированный список отсортирован по возрастанию SNR.

Так, для тестового WSPR сигнала, сформированного в рамках предыдущей части, функция find_candidates выдаст результат:

[Candidate(freq=np.float64(2.197265625), snr=np.float64(43.2654156005124), shift=0, drift=0, sync=0)].

Так как анализируемый сигнал не имеет шумов, значение SNR принимает достаточно высокое значение.

Синхронизация

После нахождения частот кандидатов осуществляется синхронизация. Для каждого кандидата выполняется грубый поиск по частоте и времени кросс-корреляции с вектором синхронизации WSPR_PR3_SIG.

Процесс синхронизации разделен на 2 стадии: грубая и точная синхронизация.

Грубая синхронизация

class WSPRMonitor(AbstractMonitor):    ...    def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]:        ...        candidates = self.find_candidates(smooth_spec)        for cand in candidates:            smax = -1e30            if0 = int(cand.freq / self.df2 + self.symbol_len)            for ifr in range(if0 - 2, if0 + 2 + 1):                for k0 in range(-10, 22):                    for drift in range(-max_drift, max_drift + 1):                        ss = 0.0                        pow = 0.0                        for k in range(WSPR_ND):                            ifd = int(ifr + (k - WSPR_NUM_BITS) / WSPR_NUM_BITS * drift / (2.0 * self.df2))                            k_idx = k0 + 2 * k                            if k_idx >= 0 and k_idx < fft_count:                                pwrs = pwr_spec[k_idx, ifd - 3: ifd + 4: 2]                                pwrs_sqrt = np.sqrt(pwrs)                                ss += WSPR_PR3_SIG[k] * pwrs_sqrt.dot([-1, 1, -1, 1])                                pow += np.sum(pwrs_sqrt)                        sync1 = ss / pow                        if sync1 > smax:                            smax = sync1                            cand.shift = 128 * (k0 + 1)                            cand.drift = drift                            cand.freq = (ifr - self.symbol_len) * self.df2                            cand.sync = sync1        ...

Получив список кандидатов, осуществляется грубый поиск сигнала по частоте (цикл ifr), времени (k0) и дрейфу (drift) относительно корреляции с вектором синхронизации WSPR_PR3_SIG. Перебор частот осуществляется в диапазоне от -2 до +2 Гц относительно центральной; диапазон перебора времени ограничен в пределах 8 символов, что составляет примерно 5.4 сек; дрейф (плавное смещение частоты тонов на всем протяжении сигнала, возникновение которого может происходить вследствие изменения свойств генератора опорной частоты передатчика во время работы, при нагреве) в пределах от -4 до +4 Гц.

Вычисление корреляции осуществляется скалярным произведением вектора синхронизации и спектра pwr_spec (ss += WSPR_PR3_SIG[k] * pwrs_sqrt.dot([-1, 1, -1, 1])) по схеме (b+d)-(a+c). Для нормализации значения корреляции значение делится на общую мощность pwrs_sqrt.

Наибольшее значение корреляции считается самым лучшим совпадением, при достижении которого происходит запись параметров shift, drift, freq и sync.

Кандидаты, у которых одинаковые значения частот и лагов, считаются одинаковыми, дубликаты удаляются.

        ...          cands = 0          for j in range(len(candidates)):              dupe = False              for k in range(cands):                  if abs(candidates[j].freq - candidates[k].freq) < 0.05 and abs(               candidates[j].shift - candidates[k].shift) < 16:                      dupe = True                      break              if dupe:                  if candidates[j].sync > candidates[k].sync:           candidates[k] = candidates[j]              elif candidates[j].sync > min_sync_2:                  candidates[cands] = candidates[j]                  cands += 1        ...

Точная синхронизация

После грубой оценки точности сигнала по корреляции с вектором синхронизации осуществляется процесс точной синхронизации с сигналом.

Для каждого из кандидатов поэтапно происходит уточнение времени, после чего уточняется частота.

...WSPR_MIN_SYNC_1 = 0.10WSPR_MIN_SYNC_2 = 0.12WSPR_ND = 162...
        ...          for cand in candidates:              f1 = cand.freq              drift1 = cand.drift              shift1 = cand.shift              f_step = 0.0              f_min = 0              f_max = 0              lag_min = shift1 - 128              lag_max = shift1 + 128              lag_step = 64              sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step,                                            drift1, 0)              f_step = 0.25              f_min = -2              f_max = 2              sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step,                                            drift1, 1)              if iteration < 2:                  f_step = 0.0                  f_min = 0                  f_max = 0                  driftp = drift1 + 0.5                  syncp, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max,                                                lag_step, driftp, 1)                  driftm = drift1 - 0.5                  syncm, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max,                                                lag_step, driftm, 1)                  if syncp > sync1:                      drift1 = driftp                      sync1 = syncp                  elif syncm > sync1:                      drift1 = driftm                      sync1 = syncm              if sync1 > WSPR_MIN_SYNC_1:                  lag_min = shift1 - 32                  lag_max = shift1 + 32                  lag_step = 16                  sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max,                                                lag_step, drift1, 0)                  f_step = 0.05                  f_min = -2                  f_max = 2                  sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max,                                                lag_step, drift1, 1)                  cand.freq = f1                  cand.shift = shift1                  cand.drift = drift1                  cand.sync = sync1        ...

Относительно начального смещения cand.shift происходит уточнение (lag_step) в диапазоне от -128 до 128 семплов с шагами в 64.

После нахождения максимального совпадения по времени осуществляется уточнение частоты сигнала (f_step) с шагами 0.25 в диапазоне -2 +2 Гц относительно ранее зафиксированной частоты cand.freq.

Уточнение дрейфа производится в диапазоне от -0.5 до +0.5 Гц.

После уточнения значения sync1 сравнивается с пороговым значением WSPR_MIN_SYNC_1, при превышении которого производится еще один этап тонкой подстройки синхронизации, в пределах от -32 до 32 семплов с шагом 16 семплов для подстройки по времени и от -2 до +2 Гц с шагом 0.05 Гц для частоты.

Точная синхронизация сигнала осуществляется с помощью функции sync, реализующего частный случай согласованного фильтра.

WSPR_TONES_COUNT = 4...class WSPRMonitor(AbstractMonitor):    ...    def sync(            self,            iq_signal: npt.NDArray[np.complex128],            f1: float,            f_min: int, f_max: int, f_step: float,            shift1: int,            lag_min: int, lag_max: int, lag_step: int,            drift1: float,            mode: int     ) -> typing.Tuple[float, int, float]:        df_offsets = np.array([-1.5, -0.5, 0.5, 1.5]) * self.df        two_pi_dt = 2 * np.pi * self.dt        sync_max = -1e30        best_shift = shift1        f_best = f1        if mode == 0:            f_min, f_max, f_step = 0, 0, 0.0        if mode == 1:            lag_min, lag_max = shift1, shift1        tone_pwr = np.zeros(WSPR_TONES_COUNT, dtype=np.float64)        for freq in range(f_min, f_max + 1):            f0 = f1 + freq * f_step            for lag in range(lag_min, lag_max + lag_step, lag_step):                ss = 0.0                total_pwr = 0.0                for sym in range(WSPR_ND):                    fp = f0 + (drift1 / 2.0) * (sym - WSPR_NUM_BITS) / WSPR_NUM_BITS                    tone_pwr.fill(0)                    freqs = fp + df_offsets                    t_indices = np.arange(self.symbol_len)                    phasors = np.exp(-1j * two_pi_dt * np.outer(freqs, t_indices))                    for tone in range(WSPR_TONES_COUNT):                        start = lag + sym * self.symbol_len                        end = start + self.symbol_len                        segment = iq_signal[max(0, start): min(len(iq_signal), end)]                        current_phasors = phasors[tone, :len(segment)]                        acc = np.dot(current_phasors, segment)                        tone_pwr[tone] = np.abs(acc)                    total_pwr += np.sum(tone_pwr)                    channel_metric = np.sum(tone_pwr[1::2]) - np.sum(tone_pwr[0::2])                    ss += channel_metric * WSPR_PR3_SIG[sym]                if total_pwr > 0:                    ss /= total_pwr                if ss > sync_max:                    sync_max = ss                    best_shift = lag                    f_best = f0        return (sync_max, best_shift, f_best)

Функция sync осуществляет перебор частот и временных лагов; для каждого из 162 символов (WSPR_ND), ожидаемых в передаче, формируется массив комплексных амплитуд phasors, являющимися эталонными (опорными) значениями FSK-тонов. Значение комплексных амплитуд (рисунок 4) — фазоров — формируются для вычисления кросс-корреляции с анализируемым сигналом в I/Q-представлении.

Далее, для каждого из 4 (WSPR_TONES_COUNT) тонов происходит вычисление корреляции принятого сигнала из segment с эталонным из фазора. Значения частот рассчитываются относительно смещения центроальной частоты на величину из df_offsets. В массив total_pwr накапливается суммарная мощность сигнала, которая далее нормируется в ss. Значение ss определяет суммарный отклик согласованного фильтра.

Рисунок 4: Представление комплексных амплитуд.

Рисунок 4: Представление комплексных амплитуд.

Результат функции — тройка значений: значение максимального отклика (sync_max) и параметры частоты (f_best) и времени (best_shift), на которых этот отклик был достигнут.

Аргумент mode определяет тип перебора параметров, 0 — только по частоте, 1 — только по времени.

Некогерентный детектор

После точной синхронизации с сигналом в полосе, в WSPR осуществляется извлечение символов. Детектор назван некогерентным, так как фаза сигнала заранее неизвестна.

Функция noncoherent_sequence_detection реализует детектор сигнала. На вход поступает I/Q-сигнал (iq_signal), значение центральной частоты сигнала (f1), параметры лага (shift1) и частотного дрейфа (drift1).

class WSPRMonitor(AbstractMonitor):    ...    def noncoherent_sequence_detection(            self,            iq_signal: npt.NDArray[np.complex128],            symbols: npt.NDArray[np.uint8],            f1: float,            shift1: int,            drift1: float,            sym_fac: int,            block_size: int,            bit_metric: bool    ):        df15 = self.df * 1.5        df05 = self.df * 0.5        ref_tones = np.zeros((WSPR_TONES_COUNT, WSPR_ND), dtype=np.complex128)        phasor_ends = np.zeros((WSPR_TONES_COUNT, WSPR_ND), dtype=np.complex128)        symb = np.zeros(WSPR_ND, dtype=np.float64)        two_pi_dt = 2 * np.pi * self.dt        f0 = f1        fp = f0 + (drift1 / 2.0) * (np.arange(WSPR_ND) - WSPR_NUM_BITS) / WSPR_NUM_BITS        d_phi = two_pi_dt * np.array([fp - df15, fp - df05, fp + df05, fp + df15])        phases = d_phi[:, :, np.newaxis] * np.arange(self.symbol_len + 1)        phasors = np.exp(1j * phases)        ...

Выделение тонов из общего спектра осуществляется через построение согласованного фильтра, который, в данном случае, описывается 3-х мерной матрицей phases, где для каждого их тонов формируются эталонные (опорные) сигналы на всей продолжительности цикла передачи; т.е. (4, 162, 257), где 257 — количество сэмплов на 1 тон.

        ...        lag = int(shift1)        indices = lag + np.arange(WSPR_ND)[:, np.newaxis] * self.symbol_len + np.arange(self.symbol_len)        indices = np.clip(indices, 0, len(iq_signal) - 1)        segments = iq_signal[indices]        segments[indices >= len(iq_signal)].fill(0)        ref_tones[:, :WSPR_ND] += np.einsum('tfj,fj->tf', np.conj(phasors[:, :, :self.symbol_len]), segments)        ...

Отклик фильтра вычисляется через суммирование матриц эталонов с сигналом. В данном случае используется многомерное суммирование через нотацию Эйнштейна — einsum, (запись tfj,fj->tf определяет умножение многомерных матриц и по каким осям оставить сумму результата. в данном случае t — ось тонов, f — ось частот, j — комплексные амплитуды сигналов) это позволяет получить результат для всех тонов сразу, в одно действие. Так как при этом осуществляется умножения на комплексно сопряженные значения (e^{j\omega t}->e^{-j\omega t}), то это можно назвать процессом гетеродинирования сигнала.

Примечание: использование многомерных матриц и их суммирование в python позволяет оптимизировать вычисления, т.к. numpy осуществляет эти операции через нативный код.

После этапа гетеродинирования тонов происходит формирование вероятностей гипотез о том какие символы находятся в сигнале.

        ...        phasor_ends[:, :WSPR_ND] = phasors[:, :, -1]        seq = 1 << block_size        seq_weights = np.zeros(seq, dtype=np.float64)        bit_mask = np.arange(seq)        bit_exp = np.arange(block_size - 1, -1, -1)        phases = np.ones(block_size, dtype=np.complex128)        for i in range(0, WSPR_ND, block_size):            time_block = np.arange(i, i + block_size)            pr3_block = WSPR_PR3[i: i + block_size]        ...

Исходный сигнал разделяется на блоки, продолжительность которых соответствует продолжительности одного символа WSPR, после чего применяет вектор синхронизации WSPR_PR3 к каждому из символов, чтобы локализовать частоты тонов, где должен располагаться символ.

Далее осуществляется перебор гипотез о том какие данные были переданы в символе. Алгоритм реализует метод максимального правдоподобия, перебирая значения бит, соответствующие возможным 4-FSK-символам.

        ...            for j in range(seq):                bits = (j >> bit_exp) & 1                tones = pr3_block + 2 * bits                phasor_proj = ref_tones[tones, time_block]                phasor_shifts = phasor_ends[tones, time_block]                phases.fill(1 + 0j)                phases[1:] = np.cumprod(phasor_shifts[:-1])        ...

Для каждой гипотезы осуществляется фазовая коррекция (phasor_ends) для когерентного суммирования общего отклика; то есть, если для гипотезы выбран подходящий символ, значение отклика усилится многократно. Коррекция фазы позволяет значительно повысить чувствительность и помехоустойчивость детектора.

        ...                res = np.sum(phasor_proj * phases)                seq_weights[j] = np.abs(res)        ...

В качестве веса гипотезы используется модуль полученной суммы.

        ...            for ib in range(block_size):                mask = (bit_mask & (1 << (block_size - 1 - ib))) == 0                xm0 = seq_weights[:seq][mask].max()                xm1 = seq_weights[:seq][~mask].max()                metric = xm1 - xm0                if bit_metric:                    metric /= np.max(xm0, xm1)                symb[i + ib] = metric        ...

Полученные гипотезы разделяются на группы нулей (xm0) и единиц (xm1), определяется разностная метрика (metric), позволяющая получить численное значение вероятности значения бит.

        ...        std = np.std(symb)        symb *= sym_fac / std        symbols[:] = np.clip(symb, -128.0, 127.0) + 128

Значения полученных вероятностей через стандартное отклонение нормализуются в диапазон -128..127 (чтобы уместить в однобайтовое целое значение), для последующего декодирования кодом коррекции ошибок.

Аргумент bit_metric функции задает нормализацию метрики, делая ее относительной, позволяя компенсировать эффект изменения амплитуды сигнала (АРУ/AGC).

Для тестовой записи из предыдущей части статьи, детектор символов возвращает результат в такой форме:

184, 71, 86, 182, 176, 167, 87, 181, 70, 85, 90, 88, 184, 70, 183, 88, 184, 87, 70, 177, 86, 70, 86, 184, 70, 71, 185, 175, 167, 87, 89, 185, 87, 177, 70, 173, 86, 69, 87, 184, 89, 90, 85, 169, 182, 86, 70, 174, 185, 71, 178, 178, 70, 70, 85, 70, 86, 174, 179, 185, 71, 86, 70, 86, 173, 178, 179, 185, 71, 87, 184, 87, 70, 179, 70, 86, 70, 179, 178, 183, 87, 88, 69, 85, 70, 185, 177, 87, 175, 185, 71, 87, 184, 87, 70, 179, 176, 88, 184, 88, 174, 181, 87, 172, 185, 179, 86, 70, 175, 177, 185, 185, 185, 179, 185, 71, 173, 88, 185, 71, 178, 185, 167, 87, 87, 70, 70, 70, 172, 87, 181, 89, 88, 184, 179, 70, 87, 176, 185, 183, 88, 90, 85, 166, 174, 177, 178, 70, 70, 85, 70, 178, 70, 70, 87, 86, 170, 71, 185, 179, 178, 91.

Декодирование кандидатов

После синхронизации и получения вероятностей символов начинается этап декодирование данных.

WSPR_SOFT_SYM_FAC = 50WSPR_MIN_RMS = 52 * (WSPR_SOFT_SYM_FAC / 64)WSPR_IIFAC = 8...class WSPRMonitor(AbstractMonitor):    ...    def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]:    ...        for cand in candidates:            f1 = cand.freq            shift1 = cand.shift            drift1 = cand.drift            decoded = None            ib = 1            while ib <= block_size and decoded is None:                if ib < 4:                    block_size = ib                    bit_metric = False                else:                    block_size = 1                    bit_metric = True                  idt = 0                  while decoded is None and idt <= (128 / WSPR_IIFAC):                      ii = (idt + 1) / 2                      if idt % 2 == 1:                          ii = -ii                      ii = WSPR_IIFAC * ii                      jittered_shift = shift1 + ii                      self.noncoherent_sequence_detection(                          iq_signal, symbols, f1, jittered_shift, drift1, WSPR_SOFT_SYM_FAC, block_size, bit_metric                     )    ...

Каждый из обнаруженных кандидатов на декодирование, проходит цикл некогерентного детектирования сигнала, при этом дополнительно происходит подбор параметров джиттера для более точного извлечения символов. Перебор параметров осуществляется итеративно, основываясь на результатах декодирования, речь о котором пойдет дальше.

Функция noncoherent_sequence_detection извлекает вероятности символов.

    ...                      rms = np.sqrt(np.mean(np.square(symbols.astype(np.float32) - 128)))                      if rms > WSPR_MIN_RMS:    ...

Для полученных символов вычисляется среднеквадратичное отклонение, если значение не превышает пороговое, то итерация подбора параметров и извлечения символов повторяется; в противном случае данные символов подвергаются дальнейшей обработке — деинтерливингу и декодированию.

Деинтерливинг

На этапе передачи, для повышения помехоустойчивости и распределения ошибок символов выполнялся процесс интерливинга, при котором происходила перестановка символов; после извлечения вероятностей символов, на принимающей стороне выполняется обратный процесс — деинтерливинг, задача которого восстановить исходную последовательность символов для извлечения данных.

class WSPRMonitor(AbstractMonitor):    ...    @staticmethod    def deinterleave(sym: npt.NDArray[np.uint8]):        tmp = np.zeros(WSPR_ND, dtype=np.uint8)        p = 0        i = 0        while p < WSPR_ND:            j = (((i * 0x80200802) & 0x0884422110) * 0x0101010101 >> 32) & 0xff            if j < WSPR_ND:                tmp[p] = sym[j]                p = p + 1            i += 1        sym[:] = tmp[:]    def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]:    ...                          self.deinterleave(symbols)    ...

Функция deinterleave реализует деинтерливинг WSPR. В WSPR в угоду скорости применяется разворот бит с использованием двоичных значений и операций над ними (Bit Reverse in 3 operations); константы 0x80200802, 0x0884422110 и 0x0101010101 подобраны так, чтобы выполнять размножение исходных бит на 64 битное слово, применять битовую маску для исключения не интересующих бит и суммирования с получением бит в обратном порядке.

В переменную j вычисляется значение индекса для перестановки, после чего происходит заполнение результирующего массива tmp согласно индексам. Полученный восстановленный массив данных копируется обратно в массив sym.

Восстановленный массив символов подвергается дальнейшему декодированию кодом коррекции ошибок.

Ранее рассмотренные символы данных после деинтерливинга принимают вид:

184, 172, 173, 87, 178, 176, 184, 174, 87, 185, 185, 70, 87, 70, 89, 185, 70, 70, 175, 86, 178, 176, 88, 71, 86, 174, 86, 70, 70, 70, 173, 184, 88, 70, 182, 175, 167, 170, 184, 71, 87, 86, 181, 179, 70, 184, 70, 178, 69, 178, 185, 90, 185, 70, 85, 86, 185, 87, 71, 179, 167, 87, 179, 184, 87, 87, 86, 70, 177, 85, 185, 183, 85, 178, 70, 185, 89, 185, 70, 70, 70, 71, 87, 178, 177, 91, 88, 87, 177, 88, 71, 179, 85, 176, 179, 90, 179, 71, 70, 185, 174, 185, 167, 184, 87, 69, 181, 70, 85, 185, 70, 88, 70, 90, 179, 86, 177, 87, 71, 87, 86, 70, 182, 89, 185, 173, 88, 177, 70, 85, 178, 71, 88, 183, 86, 169, 70, 175, 86, 87, 185, 87, 181, 70, 87, 184, 172, 184, 178, 87, 70, 71, 88, 166, 183, 174, 185, 185, 179, 179, 86, 70.

Декодер Фано

После детекции, извлечения символов и восстановления исходной последовательности с помощью деинтерливинга, данные символов подвергаются процессу декодирования кодами коррекции ошибок.

Класс кодов коррекции ошибок на основе сверточного кода (более подробно рассмотренный в предыдущей части статьи) декодируется такими алгоритмами как: алгоритм Витерби, алгоритм Елинека, алгоритм Фано и т.д..

Каждый из алгоритмов декодирования имеет свои сильные и слабые стороны, так, например, алгоритм Витерби (используемый как стандарт для решения задач декодирования сверточных кодов) перебирает все возможные пути в решетке кода, позволяет получить оптимальный результат за фиксированное время, но имеет экспоненциальный рост алгоритмической сложности при увеличении длины кодового слова. Алгоритм Фано реализует дерево решений (рисунок 5), обладает низким потреблением памяти, может работать с очень большими кодовыми словами, минимизируя вероятности ошибок, но при этом имеет переменное время работы и на зашумленных данных может уйти в бесконечный цикл. Алгоритм Елинека использует для хранения пройденных путей стек с сортировкой по метрике, работает быстрее алгоритма Фано, но требует большого количества памяти и скорость работы алгоритма также сильно зависит от зашумленности данных.

В WSPR используется алгоритм Фано, так как требует минимум ресурсов, хотя при этом, в реализации K9AN присутствует опциональная реализация алгоритма Елинека.

Рисунок 5: Движение по дереву решений алгоритма Фано.

Рисунок 5: Движение по дереву решений алгоритма Фано.
LL_POLY1 = 0xf2d05351LL_POLY2 = 0xe4613c47

Для декодирования используются те же значениям полиномов, что и при кодировании — Лейланда-Лашбо (Layland-Lushbaugh).

Функция fano реализует алгоритм декодера Фано. Фходные параметры: вероятности символов symbols, количество информационных бит в декодируемом сообщении bits, таблица метрик правдоподобия символов metric_table, шаг изменения порогового значения delta (малое значение обеспечивает высокую точность, но требует большего количества итераций), порог ограничения количества итераций max_iter.

@njitdef fano(        symbols: npt.NDArray[np.uint8],        bits: int,        metric_table: npt.NDArray[np.int64],        delta: int,        max_iter: int,        poly1: int = LL_POLY1,        poly2: int = LL_POLY2,) -> typing.Optional[typing.Tuple[int, int, npt.NDArray[np.uint8]]]:    s0 = symbols[0::2]    s1 = symbols[1::2]    metrics_all = np.empty((bits, 4), dtype=np.int64)    metrics_all[:, 0] = metric_table[0, s0] + metric_table[0, s1]    metrics_all[:, 1] = metric_table[0, s0] + metric_table[1, s1]    metrics_all[:, 2] = metric_table[1, s0] + metric_table[0, s1]    metrics_all[:, 3] = metric_table[1, s0] + metric_table[1, s1]    ...

Для всех пар символов (00, 01, 10, 11, так как скорость алгоритма ½, то есть одному входному биту данных соответствуют два выходных) рассчитываются метрики на основе логарифмических вероятностей из metric_table.

    ...    encstates = np.zeros(bits + 1, dtype=np.uint64)    gammas = np.zeros(bits + 1, dtype=np.int64)    ...

Массив encstates содержит историю состояний сдвигового регистра, которые, по сути, являются декодированными данными. Массив gammas накапливает метрику пути до текущего узла.

    ...    tms = np.zeros((bits, 2), dtype=np.int64)    current_i = np.zeros(bits, dtype=np.uint8)    node_id = 0    node_id_max = bits - 1    node_id_tail = bits - 31    lsym = 0    m0 = metrics_all[0, lsym]    m1 = metrics_all[0, 3 ^ lsym]    if m0 >= m1:        tms[0, 0] = m0        tms[0, 1] = m1    else:        tms[0, 0] = m1        tms[0, 1] = m0        encstates[0] = 1    threshold = 0    total_iters = max_iter * bits    for cycle in range(1, total_iters + 1):        current_gamma = gammas[node_id] + (tms[node_id, 0] if current_i[node_id] == 0 else tms[node_id, 1])    ...

Цикл по total_iters реализует движение по дереву декодирования. В каждом узле node_id выбирается наиболее вероятное значение бита 0 или 1 на основе состояния энкодера encstates и значений полиномов poly1, poly2.

    ...        if current_gamma >= threshold:            if gammas[node_id] < threshold + delta:                if current_gamma >= threshold + delta:                    threshold += ((current_gamma - threshold) // delta) * delta            node_id += 1            gammas[node_id] = current_gamma    ...

Пороговое значение threshold зависит от накопления метрики current_gamma, если метрика превышает порог, то процесс декодирования продолжается, но если значение метрики во много раз превышает пороговое значение, то последнее увеличивается на delta.

    ...            if node_id > node_id_max:                data = np.zeros(bits // 8, dtype=np.uint8)                for j in range(bits // 8):                    data[j] = encstates[(j + 1) * 8 - 1] & 0xff                return current_gamma, cycle, data    ...

Функция завершается успешно, если значение node_id достигло значения bits, возвращая при этом значения current_gamma, cycle и значения бит данных data на основе состояния encstates.

    ...            next_state = (encstates[node_id - 1] << 1)            encstates[node_id] = next_state            tmp = next_state & poly1            tmp ^= tmp >> 16            lsym = PARITY_TAB[(tmp ^ (tmp >> 8)) & 0xff] << 1            tmp = next_state & poly2            tmp ^= tmp >> 16            lsym |= PARITY_TAB[(tmp ^ (tmp >> 8)) & 0xff]    ...

Побитовое умножение данных с poly1 и poly2 выделяет биты, участвующие в формировании первого выходного символа. Для получения одного итогового бита выполняется сложение по модулю 2 (xor) между всеми битами tmp.

Для определения четности бит используется xor сдвиг бит.

Таблица четности PARITY_TAB позволяет быстро получить значение четности бита.

Таким образом происходит кодирование данных для проверки гипотез алгоритма при декодировании.

    ...            if node_id >= node_id_tail:                tms[node_id, 0] = metrics_all[node_id, lsym]                tms[node_id, 1] = -1000000            else:                m0 = metrics_all[node_id, lsym]                m1 = metrics_all[node_id, 3 ^ lsym]                if m0 >= m1:                    tms[node_id, 0] = m0                    tms[node_id, 1] = m1                else:                    tms[node_id, 0] = m1                    tms[node_id, 1] = m0                    encstates[node_id] |= 1            current_i[node_id] = 0            continue    ...

Хвостовые биты node_id_tail содержат нули для сброса состояния регистра, для них происходит ограничение выбора путем принудительной установки метрики на очень маленькое значение -1000000.

    ...        else:            while True:                if node_id == 0 or gammas[node_id - 1] < threshold:                    threshold -= delta                    if current_i[node_id] != 0:                        current_i[node_id] = 0                        encstates[node_id] ^= 1                    break                node_id -= 1                if node_id < node_id_tail and current_i[node_id] != 1:                    current_i[node_id] = 1                    encstates[node_id] ^= 1                    break

Если при движении вперед значение метрики current_gamma оказалось меньше порогового значения threshold, алгоритм осуществляет обратное движение по дереву, при этом происходит переход на другую ветку в текущем узле, вероятность которой была ниже предыдущей — current_i[node_id] = 1.

Если на всех узлах ветки значение метрики не достигло порогового, то происходит перемещение на один узел назад — node_id -= 1.

Если пороговое значение всё ещё слишком большое, или алгоритм вернулся в самое начало дерева, то значение порога снижается на delta и цикл повторяется с самого начала.

Таблица четности бит:

PARITY_TAB = np.array([    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,    0, 1, 1, 0, 1, 0, 0, 1,    0, 1, 1, 0, 1, 0, 0, 1,    1, 0, 0, 1, 0, 1, 1, 0,])

Таблицы метрик, рассчитанных с помощью моделирования для 2-FSK тонов, при соотношениях Es/No=0,3,6,9 дБ и среднеквадратичном уровне шума в 50:

WSPR_METRIC_TABLES = np.array([    [0.9782, 0.9695, 0.9689, 0.9669, 0.9666, 0.9653, 0.9638, 0.9618, 0.9599, 0.9601,     0.9592, 0.9570, 0.9556, 0.9540, 0.9525, 0.9527, 0.9486, 0.9477, 0.9450, 0.9436,     0.9424, 0.9400, 0.9381, 0.9360, 0.9340, 0.9316, 0.9301, 0.9272, 0.9254, 0.9224,     0.9196, 0.9171, 0.9154, 0.9123, 0.9076, 0.9061, 0.9030, 0.9000, 0.8965, 0.8934,     0.8903, 0.8874, 0.8834, 0.8792, 0.8760, 0.8726, 0.8685, 0.8639, 0.8599, 0.8550,     0.8504, 0.8459, 0.8422, 0.8364, 0.8320, 0.8262, 0.8215, 0.8159, 0.8111, 0.8052,     0.7996, 0.7932, 0.7878, 0.7812, 0.7745, 0.7685, 0.7616, 0.7550, 0.7479, 0.7405,     0.7336, 0.7255, 0.7184, 0.7102, 0.7016, 0.6946, 0.6860, 0.6769, 0.6687, 0.6598,     0.6503, 0.6416, 0.6325, 0.6219, 0.6122, 0.6016, 0.5920, 0.5818, 0.5711, 0.5606,     0.5487, 0.5374, 0.5266, 0.5142, 0.5020, 0.4908, 0.4784, 0.4663, 0.4532, 0.4405,     0.4271, 0.4144, 0.4006, 0.3865, 0.3731, 0.3594, 0.3455, 0.3304, 0.3158, 0.3009,     0.2858, 0.2708, 0.2560, 0.2399, 0.2233, 0.2074, 0.1919, 0.1756, 0.1590, 0.1427,     0.1251, 0.1074, 0.0905, 0.0722, 0.0550, 0.0381, 0.0183, 0.0000, -0.0185, -0.0391,     -0.0571, -0.0760, -0.0966, -0.1160, -0.1370, -0.1584, -0.1787, -0.1999, -0.2214, -0.2423,    ...     -14.1653, -14.4348, -14.7983, -14.7807, -15.2349, -15.3536, -15.3026, -15.2739, -15.7170, -16.2161,     -15.9185, -15.9490, -16.6258, -16.5568, -16.4318, -16.7999, -16.4101, -17.6393, -17.7643, -17.2644,     -17.5973, -17.0403, -17.7039, -18.0073, -18.1840, -18.3848, -18.6286, -20.7063, 1.43370769e-019,     2.64031087e-006, 6.6908396e+031, 1.77537994e+028, 2.79322819e+020, 1.94326e-019,     0.00019371575, 2.80722121e-041]])WSPR_METRIC_TABLE = np.zeros((2, 256), dtype=np.int64)WSPR_METRIC_TABLE[0, :] = 10 * (WSPR_METRIC_TABLES[2] - WSPR_DECODER_BIAS)WSPR_METRIC_TABLE[1, :] = WSPR_METRIC_TABLE[0, ::-1]

Полные таблицы можно посмотреть по этой ссылке.

Функция decode передает в декодер Фано значения символов, получая при этом на выходе байты данных принятого сообщения в decoded.

WSPR_NUM_BITS = 81WSPR_FANO_THRESHOLD = 60WSPR_DECODER_LIM = 10000class WSPRMonitor(AbstractMonitor):    ...    def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]:    ...                          decoded = fano(                              symbols, WSPR_NUM_BITS, WSPR_METRIC_TABLE, WSPR_FANO_THRESHOLD, WSPR_DECODER_LIM                          )                          idt += 1                          if self.quick_mode:                              break                      ib += 1                  if decoded is not None:                      metric, cycles, dec_data = decoded                      decodes_pass += 1                      payload = dec_data.tobytes()    ...

Переменная payload содержит в себе байты WSPR сообщения.

Удаление обработанного сигнала

При многопроходном декодировании, после успешного детектирования и декодирования сообщения из сигнала, в WSPR происходит фильтрация спектра, последовательность принятых тонов подавляется в частотной области, — эта операция позволяет выделять слабые сигналы в спектре, которые изначально перекрывались сильными.

Фильтр реализуется функцией subtract_signal:

class WSPRMonitor(AbstractMonitor):    ...    def subtract_signal(self, signal: npt.NDArray[np.complex128], f0, shift0: int, drift0,                       channel_symbols: npt.NDArray[np.uint8]):        filtr = 360        samples = WSPR_ND * self.symbol_len        cs_rep = np.repeat(channel_symbols - 1.5, self.symbol_len)        ...

Массив cs_rep содержит массив тонов сдвинутых относительно центральной частоты, для соответствия тонам 4-FSK.

        ...        idx = np.repeat(np.arange(WSPR_ND), self.symbol_len)        drift_eff = (drift0 / 2.0) * (idx - WSPR_ND / 2.0) / (WSPR_ND / 2.0)        freq = f0 + drift_eff + cs_rep * self.df        phase = 2.0 * np.pi * np.cumsum(freq) * self.dt        ref = np.exp(1j * phase)        ...

В phase производится реконструкция фазы на основе частот тонов freq (относительно центральной частоты f0), массив ref содержит эталонный комплексный сигнал.

        ...        start = max(shift0, 0)        end = min(start + samples, len(signal))        segment = signal[start:end]        env = segment * np.conj(ref)        ...

Детектирование огибающей сигнала (массив env).

        ...        weights = np.sin(np.pi * np.arange(filtr) / (filtr - 1))        weights /= weights.sum()        ...

Формирование сглаживающей функции на основе синуса, для подавления высокочастотных флуктуаций, возникающих при детектировании огибающей.

        ...        env_sm = np.convolve(env, weights, mode="same")        partial_sum = np.cumsum(weights)        filtr_half = filtr // 2        corr_f = np.ones(samples)        corr_f[:filtr_half] = partial_sum[filtr_half: filtr]        corr_f[-filtr_half:] = np.flip(partial_sum[filtr_half: filtr])        ...

Свертка сигнала со сглаживающей функцией реализует фильтр нижних частот (env_sm). Для коррекции краевых эффектов после применения окна, рассчитывается корректирующий фактор corr_f на основе кумулятивной суммы весов фильтра (partial_sum). Коррекция краевых эффектов заключается в компенсации потери энергии сигнала на краях фильтра.

        ...        reconstruction = (env_sm / corr_f) * ref        signal[start:end] -= reconstruction[start:end]

В массиве reconstruction содержится эталонный сигнал, совпадающий по фазе и амплитуде с сигналами в исходном спектре. Исключение реконструированного сигнала из общего осуществляется операцией разности между signal и reconstruction.

Особенность фильтра subtract_signal заключается в применении адаптивной огибающей, осуществляющей достаточно точную реконструкцию исходного сигнала, что позволяет добиться максимально точной фильтрации сигнала.

Подсчет количества ошибок в принятом сигнале

Для оценки качества принятого сигнала в WSPR осуществляется подсчет ошибок между сырыми принятыми символами и восстановленными кодами коррекции ошибок символами.

class WSPRMonitor(AbstractMonitor):    ...    @staticmethod    def count_sym_err(symbols: npt.NDArray[np.uint8], channel_symbols: npt.NDArray[np.uint8]) -> int:        cw = (channel_symbols >= 2).astype(np.uint8)        WSPRMonitor.deinterleave(cw)        sym_bin = (symbols > 127).astype(np.uint8)        err_count = int(np.sum(sym_bin != cw))        return err_count

Функция count_sym_err принимает на вход эталонные символы symbols и сырые данные channel_symbols. Принятые из эфира символы подвергаются деинтерливингу и производится подсчет количества символов, значения которых не совпадают.

Декодирование сообщений

Финальный этап обработки сигнала в WSPR. После извлечения данных сообщения из dec_data, производится декодирование самого сообщения, вычисление метрик о качестве сигнала, фильтрация принятого сигнала и вывод результата.

class WSPRMonitor(AbstractMonitor):    ...    def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]:        ...        payload = dec_data.tobytes()        msg = WSPRMessage.decode(payload)        ...

В msg содержится объект с текстовым сообщением WSPR. Декодер сообщений был рассмотрел в первой части статьи.

        ...        dt_print = shift1 * self.dt - 1.0        freq_print = (WSPR_CENTER_FREQ + f1) / 1e6        payload = msg.encode()        tones = wspr_encode(payload)        chan_sym = np.fromiter(tones, dtype=np.uint8)        self.subtract_signal(iq_signal, f1, shift1, drift1, chan_sym)        sym_err = self.count_sym_err(symbols, chan_sym)        yield WSPRLogItem(            snr=cand.snr,            dT=dt_print,            dF=freq_print,            payload=payload,            crc=0,            BER=sym_err,            drift=drift1,            decode_pass=decodes_pass        )

Декодированное сообщение подвергается повторному внутреннему кодированию данных в WSPR сигнал, для фильтрации сигнала и подсчета количества ошибок в принятых символах.

WSPRLogItem содержит данные сообщения и оценки качества сигнала:

@dataclassclass LogItem:   snr: float   dT: float   dF: float   payload: typing.ByteString   crc: int@dataclassclass WSPRLogItem(LogItem):   BER: int   drift: float   decode_pass: int

Полный пример декодера с использованием класса WSPRMonitor:

import timefrom scipy.io.wavfile import readfrom decoders.wspr import WSPRMonitorfrom msg.message import WSPRMsgServerdef main():    sample_rate, data = read("examples/signal.wav")    msg_svr = WSPRMsgServer()    mon = WSPRMonitor(        sample_rate=sample_rate    )    mon.monitor_process(data)    for it in mon.decode():        msg = msg_svr.decode(it.payload)        print(            f"dB: {it.snr:.3f}\t"            f"T: {it.dT:.3f}\t"            f"DF: {it.dF:.3f}\t"            f"BER: {it.BER}\t"            f"drift: {it.drift}\t"            f"pass: {it.decode_pass}\t"            f"Message text: {msg}"        )if __name__ == '__main__':    main()

Пример декодированного сообщения WSPR:

dB: 43.265T: -1.000DF: 0.002BER: 0drift: 0pass: 1Message text: R9FEU LO87 50

Заключение

В завершающей цикл статье, посвященной протоколу WSPR были рассмотрены механизмы и принципы цифровой обработки сигналов, перенос спектра, детектирование и выделение слабых сигналов согласованными фильтрами при очень низких значениях SNR, механизмы коррекции ошибок на основе сверточных кодов.

Протокол WSPR может представлять интерес с точки зрения реализации протоколов связи, способных обеспечивать передачу данных при малых мощностях передатчиков, при очень низких значениях SNR, при этом обладая устойчивостью к помехам. Отдельного внимания заслуживают алгоритмы цифровой обработки сигналов и математические принципы, лежащие в их основе.

От автора статьи:

Реализация WSPR декодера на Python позволила во многом ощутить возможности библиотеки numpy, пересмотреть восприятие сигналов, так как вся обработка сигналов осуществляется в I/Q-форме, с использованием комплексных экспонент, что потребовало преодоления некоего порога вхождения.

К сожалению, реализация на Python проигрывает по скорости декодирования с реализацией на Си, хоть и были использованы возможности библиотеки numpy и jit-компилятора, позволяющие получить значительное ускорение вычислений.

По традиции, начавшейся с Q65, автор выражает благодарность всем читателям данного цикла статей и лицам, заинтересованных в устройстве протокола WSPR.

Ссылки

ссылка на оригинал статьи https://habr.com/ru/articles/1027722/