Добрый день, коллеги!
Я инженер по энергетике и автоматизации. Имею приличный опыт работы с инженерными системами жизнеобеспечения. Не так давно появилась идея сделать свою SCADA-систему для диспетчеризации автономных инженерных систем.
Потихоньку начал изучать материал и в итоге дошёл до идеи написания вычислительного ядра. Оно довольно простое, но при этом, легко масштабируемое и предсказуемое. Системы, которыми предполагается управлять, относятся к объектам критической инфраструктуры, поэтому вопрос архитектуры, отказоустойчивости и предсказуемости поведения для меня был принципиален с самого начала.
В какой-то момент руководство сменилось, цели сменились, планы изменились, и в целом от этой идеи ушли. Но писать архитектуру ядра я продолжил, потому что сам её и разрабатывал, и мне в целом интересно довести её до внятного состояния.
Вы можете спросить, а почему не MasterSCADA или другие подобные системы, с которыми я тоже работал? Потому что, когда ты работаешь с готовым продуктом, ты не знаешь, что у него внутри. Ты можешь только догадываться. А здесь мне интересно именно понимать, как всё устроено на уровне ядра, потоков, обмена данными и логики вызовов.
У меня там ещё масса нерешённых вопросов, один из основных — многопоточность, отказоустойчивость и прерывние.
Поток опроса ПЛК
Ниже привожу код реализации отдельного потока опроса ПЛК (на стенде это TCP-client) PlcWorker, который циклически опрашивает Modbus-регистры и складывает актуальные значения в кэш вместе с временной меткой.
struct DataPoint{ double value = 0.0; std::chrono::system_clock::time_point timestamp;};struct ModbusReadPoint{ std::string key; int slaveId; ModbusRegisterType regType; int startAddress; int count = 1;};class PlcWorker{public: PlcWorker(ModbusClient& client, std::vector<ModbusReadPoint> readPoints); ~PlcWorker() noexcept; void start(); void stop(); std::optional<DataPoint> getDataPoint(const std::string& key) const;private: void process(); void readCycle(); ModbusClient& client_; std::thread worker_; mutable std::mutex mtx_; std::atomic_bool running_ {false}; std::condition_variable cv_; std::unordered_map<std::string, DataPoint> data_; std::vector<ModbusReadPoint> readPoints_; std::chrono::milliseconds pollInterval_ {200};};
PlcWorker::PlcWorker(ModbusClient &client, std::vector<ModbusReadPoint> readPoints) : client_(client), readPoints_(std::move(readPoints)){}PlcWorker::~PlcWorker() noexcept{ stop();}void PlcWorker::start(){ if(running_) { return; } running_ = true; worker_ = std::thread(&PlcWorker::process, this);}void PlcWorker::stop(){ if(!running_){ return; } running_ = false; cv_.notify_all(); if(worker_.joinable()){ worker_.join(); }}std::optional<DataPoint> PlcWorker::getDataPoint(const std::string &key) const{ std::lock_guard<std::mutex> lock(mtx_); auto it = data_.find(key); if(it == data_.end()){ return std::nullopt; } return it->second; }void PlcWorker::process(){ while(running_){ readCycle(); std::unique_lock<std::mutex> lock(mtx_); cv_.wait_for(lock, pollInterval_, [this](){ return !running_; }); }}void PlcWorker::readCycle(){ for(const auto& it : readPoints_){ if(it.count != 1){ continue; } auto reg = it.regType; int address = it.startAddress; double value {0.0}; bool success = false; if(reg == ModbusRegisterType::Coil){ value = client_.readCoil(address); success = true; }else if(reg == ModbusRegisterType::DiscreteInput){ value = client_.readDiscrete(address); success = true; }else if(reg == ModbusRegisterType::InputRegister){ value = client_.readInput(address); success = true; }else if(reg == ModbusRegisterType::HoldingRegister){ value = client_.readHolding(address); success = true; } if(!success){ continue; } std::lock_guard<std::mutex> lock(mtx_); DataPoint data; data.timestamp = std::chrono::system_clock::now(); data.value = value; data_[it.key] = data; }}
На текущем этапе это отдельный поток для последовательного опроса устройств по протоколу Modbus
Логика пока простая:
-
есть перечень регистров, которые реально задействованы;
-
они собираются заранее;
-
далее идёт циклический опрос;
-
после каждого опроса значение кладётся в кэш вместе с меткой времени.
То есть по факту имеем последовательный опрос только нужных регистров, без чтения всех регистров подряд. Кстати ранее имел опыт поиска в оперативной памяти прибора ТЭМ-104 показаний real-time, так как в новой прошивке они просто перехали в другое место, и в итоге нашел новые адреса регистров.
Отдельная логика для команд управления
Для передачи команд на управление механизмами у меня реализована другая логика, и это будет другой поток, который должен иметь приоритет над потоком чтения данных.
То есть схема примерно такая:
-
один поток занимается опросом и обновлением кэша значений;
-
другой поток занимается командами управления исполнительными механизмами;
-
при этом запись команды не должна ломать опрос, но должна выполняться предсказуемо и без конфликтов.
Сейчас для Modbus я пришёл к тому, что доступ к ПЛК/клиенту нужно сериализовать через mutex, чтобы ПЛК выполнял для меня одну задачу. То есть фактически чтение и запись не выполняются по-настоящему параллельно через один ModbusClient, а просто попадают в очередь доступа через mutex. Для одного канала связи это логично (RS-485).
По поводу std::condition_variable
До конца, честно говоря, ещё не разобрался в глубинном смысле работы std::condition_variable.
То есть базовый эффект я понимаю: поток не крутится в холостую, и не тратит ресурсы, но если смотреть глубже, с точки зрения именно SCADA/диспетчерских систем, где важны:
-
предсказуемость,
-
скорость реакции,
-
точность цикла опроса,
-
корректная работа под нагрузкой,
то хотелось бы лучше понять, насколько такой подход действительно правильный…
Мой принцип в этом плане простой: пишу только так, чтобы сам полностью понимал, что происходит. И только после этого можно усложнять и оптимизировать.
Вопрос по масштабированию
Сейчас ядро растёт, и уже очевидно, что последовательный опрос начинает упираться в скорость, особенно если говорить про 1000+ датчиков. А такое количество сигналов вполне может быть уже на старте. И тут речь не только о ПЛК. В дальнейшем большая часть точек будет идти не только через ПЛК, а через: приборы учёта, преобразователи интерфейсов, прямой вызов через различные мосты, отдельные устройства технического учёта энергоносителей. Сейчас это только первый слой, а дальше архитектура будет усложняться.
Что хотелось бы обсудить с коллегами
Интересно услышать мнение тех, кто реально сталкивался с похожими системами реального времени, где важны: скорость, точность цикла, предсказуемость, отказоустойчивость.
Особенно интересны такие вопросы:
-
Как вы делите потоки в подобных системах?
-
Делаете ли отдельные потоки по устройствам / по шинам / по группам сигналов?
-
Как организуете приоритет команд управления над потоком чтения?
-
Насколько оправдано использование
std::condition_variable? -
Как масштабируете опрос при 1000+ точек?
-
Правильно ли сериализовать доступ к одному
ModbusClient (ПЛК)черезmutex, если чтение и запись идут из разных потоков?
Буду рад конструктивной критике и реальным инженерным кейсам. Спасибо!
ссылка на оригинал статьи https://habr.com/ru/articles/1029582/