Многопоточность в SCADA системах

от автора

Добрый день, коллеги!

Я инженер по энергетике и автоматизации. Имею приличный опыт работы с инженерными системами жизнеобеспечения. Не так давно появилась идея сделать свою SCADA-систему для диспетчеризации автономных инженерных систем.

Потихоньку начал изучать материал и в итоге дошёл до идеи написания вычислительного ядра. Оно довольно простое, но при этом, легко масштабируемое и предсказуемое. Системы, которыми предполагается управлять, относятся к объектам критической инфраструктуры, поэтому вопрос архитектуры, отказоустойчивости и предсказуемости поведения для меня был принципиален с самого начала.

В какой-то момент руководство сменилось, цели сменились, планы изменились, и в целом от этой идеи ушли. Но писать архитектуру ядра я продолжил, потому что сам её и разрабатывал, и мне в целом интересно довести её до внятного состояния.

Вы можете спросить, а почему не MasterSCADA или другие подобные системы, с которыми я тоже работал? Потому что, когда ты работаешь с готовым продуктом, ты не знаешь, что у него внутри. Ты можешь только догадываться. А здесь мне интересно именно понимать, как всё устроено на уровне ядра, потоков, обмена данными и логики вызовов.

Исходники ядра лежат здесь

У меня там ещё масса нерешённых вопросов, один из основных — многопоточность, отказоустойчивость и прерывние.

Поток опроса ПЛК

Ниже привожу код реализации отдельного потока опроса ПЛК (на стенде это TCP-client) PlcWorker, который циклически опрашивает Modbus-регистры и складывает актуальные значения в кэш вместе с временной меткой.

struct DataPoint{    double value = 0.0;    std::chrono::system_clock::time_point timestamp;};struct ModbusReadPoint{    std::string key;    int slaveId;    ModbusRegisterType regType;    int startAddress;    int count = 1;};class PlcWorker{public:    PlcWorker(ModbusClient& client,         std::vector<ModbusReadPoint> readPoints);    ~PlcWorker() noexcept;    void start();    void stop();    std::optional<DataPoint> getDataPoint(const std::string& key) const;private:    void process();    void readCycle();    ModbusClient& client_;    std::thread worker_;    mutable std::mutex mtx_;    std::atomic_bool running_ {false};    std::condition_variable cv_;    std::unordered_map<std::string, DataPoint> data_;    std::vector<ModbusReadPoint> readPoints_;    std::chrono::milliseconds pollInterval_ {200};};
PlcWorker::PlcWorker(ModbusClient &client,     std::vector<ModbusReadPoint> readPoints)    : client_(client),     readPoints_(std::move(readPoints)){}PlcWorker::~PlcWorker() noexcept{    stop();}void PlcWorker::start(){    if(running_) {        return;    }    running_ = true;    worker_ = std::thread(&PlcWorker::process, this);}void PlcWorker::stop(){    if(!running_){        return;    }    running_ = false;    cv_.notify_all();    if(worker_.joinable()){        worker_.join();    }}std::optional<DataPoint> PlcWorker::getDataPoint(const std::string &key) const{    std::lock_guard<std::mutex> lock(mtx_);    auto it = data_.find(key);    if(it == data_.end()){        return std::nullopt;     }            return it->second;    }void PlcWorker::process(){    while(running_){        readCycle();        std::unique_lock<std::mutex> lock(mtx_);        cv_.wait_for(lock, pollInterval_, [this](){            return !running_;        });    }}void PlcWorker::readCycle(){    for(const auto& it : readPoints_){        if(it.count != 1){            continue;        }        auto reg = it.regType;        int address = it.startAddress;        double value {0.0};        bool success = false;        if(reg == ModbusRegisterType::Coil){            value = client_.readCoil(address);            success = true;        }else if(reg == ModbusRegisterType::DiscreteInput){            value = client_.readDiscrete(address);            success = true;        }else if(reg == ModbusRegisterType::InputRegister){            value = client_.readInput(address);            success = true;        }else if(reg == ModbusRegisterType::HoldingRegister){            value = client_.readHolding(address);            success = true;        }                if(!success){            continue;        }        std::lock_guard<std::mutex> lock(mtx_);        DataPoint data;        data.timestamp = std::chrono::system_clock::now();        data.value = value;        data_[it.key] = data;    }}

На текущем этапе это отдельный поток для последовательного опроса устройств по протоколу Modbus
Логика пока простая:

  • есть перечень регистров, которые реально задействованы;

  • они собираются заранее;

  • далее идёт циклический опрос;

  • после каждого опроса значение кладётся в кэш вместе с меткой времени.

То есть по факту имеем последовательный опрос только нужных регистров, без чтения всех регистров подряд. Кстати ранее имел опыт поиска в оперативной памяти прибора ТЭМ-104 показаний real-time, так как в новой прошивке они просто перехали в другое место, и в итоге нашел новые адреса регистров.

Отдельная логика для команд управления

Для передачи команд на управление механизмами у меня реализована другая логика, и это будет другой поток, который должен иметь приоритет над потоком чтения данных.

То есть схема примерно такая:

  • один поток занимается опросом и обновлением кэша значений;

  • другой поток занимается командами управления исполнительными механизмами;

  • при этом запись команды не должна ломать опрос, но должна выполняться предсказуемо и без конфликтов.

Сейчас для Modbus я пришёл к тому, что доступ к ПЛК/клиенту нужно сериализовать через mutex, чтобы ПЛК выполнял для меня одну задачу. То есть фактически чтение и запись не выполняются по-настоящему параллельно через один ModbusClient, а просто попадают в очередь доступа через mutex. Для одного канала связи это логично (RS-485).

По поводу std::condition_variable

До конца, честно говоря, ещё не разобрался в глубинном смысле работы std::condition_variable.

То есть базовый эффект я понимаю: поток не крутится в холостую, и не тратит ресурсы, но если смотреть глубже, с точки зрения именно SCADA/диспетчерских систем, где важны:

  • предсказуемость,

  • скорость реакции,

  • точность цикла опроса,

  • корректная работа под нагрузкой,

то хотелось бы лучше понять, насколько такой подход действительно правильный…

Мой принцип в этом плане простой: пишу только так, чтобы сам полностью понимал, что происходит. И только после этого можно усложнять и оптимизировать.

Вопрос по масштабированию

Сейчас ядро растёт, и уже очевидно, что последовательный опрос начинает упираться в скорость, особенно если говорить про 1000+ датчиков. А такое количество сигналов вполне может быть уже на старте. И тут речь не только о ПЛК. В дальнейшем большая часть точек будет идти не только через ПЛК, а через: приборы учёта, преобразователи интерфейсов, прямой вызов через различные мосты, отдельные устройства технического учёта энергоносителей. Сейчас это только первый слой, а дальше архитектура будет усложняться.

Что хотелось бы обсудить с коллегами

Интересно услышать мнение тех, кто реально сталкивался с похожими системами реального времени, где важны: скорость, точность цикла, предсказуемость, отказоустойчивость.

Особенно интересны такие вопросы:

  • Как вы делите потоки в подобных системах?

  • Делаете ли отдельные потоки по устройствам / по шинам / по группам сигналов?

  • Как организуете приоритет команд управления над потоком чтения?

  • Насколько оправдано использование std::condition_variable ?

  • Как масштабируете опрос при 1000+ точек?

  • Правильно ли сериализовать доступ к одному ModbusClient (ПЛК) через mutex, если чтение и запись идут из разных потоков?

Буду рад конструктивной критике и реальным инженерным кейсам. Спасибо!

ссылка на оригинал статьи https://habr.com/ru/articles/1029582/