- PVSM.RU - https://www.pvsm.ru -

Многопоточность в SCADA системах

Добрый день, коллеги!

Я инженер по энергетике и автоматизации. Имею приличный опыт работы с инженерными системами жизнеобеспечения. Не так давно появилась идея сделать свою SCADA-систему для диспетчеризации автономных инженерных систем.

Потихоньку начал изучать материал и в итоге дошёл до идеи написания вычислительного ядра. Оно довольно простое, но при этом, легко масштабируемое и предсказуемое. Системы, которыми предполагается управлять, относятся к объектам критической инфраструктуры, поэтому вопрос архитектуры, отказоустойчивости и предсказуемости поведения для меня был принципиален с самого начала.

В какой-то момент руководство сменилось, цели сменились, планы изменились, и в целом от этой идеи ушли. Но писать архитектуру ядра я продолжил, потому что сам её и разрабатывал, и мне в целом интересно довести её до внятного состояния.

Вы можете спросить, а почему не MasterSCADA или другие подобные системы, с которыми я тоже работал? Потому что, когда ты работаешь с готовым продуктом, ты не знаешь, что у него внутри. Ты можешь только догадываться. А здесь мне интересно именно понимать, как всё устроено на уровне ядра, потоков, обмена данными и логики вызовов.

Исходники ядра лежат здесь [1]

У меня там ещё масса нерешённых вопросов, один из основных - многопоточность, отказоустойчивость и прерывние.

Поток опроса ПЛК

Ниже привожу код реализации отдельного потока опроса ПЛК (на стенде это TCP-client) PlcWorker, который циклически опрашивает Modbus-регистры и складывает актуальные значения в кэш вместе с временной меткой.

struct DataPoint
{
    double value = 0.0;
    std::chrono::system_clock::time_point timestamp;
};

struct ModbusReadPoint
{
    std::string key;
    int slaveId;
    ModbusRegisterType regType;
    int startAddress;
    int count = 1;
};

class PlcWorker
{
public:
    PlcWorker(ModbusClient& client, 
        std::vector<ModbusReadPoint> readPoints);

    ~PlcWorker() noexcept;

    void start();
    void stop();
    std::optional<DataPoint> getDataPoint(const std::string& key) const;

private:
    void process();
    void readCycle();

    ModbusClient& client_;
    std::thread worker_;
    mutable std::mutex mtx_;
    std::atomic_bool running_ {false};
    std::condition_variable cv_;
    std::unordered_map<std::string, DataPoint> data_;
    std::vector<ModbusReadPoint> readPoints_;
    std::chrono::milliseconds pollInterval_ {200};
};
PlcWorker::PlcWorker(ModbusClient &client, 
    std::vector<ModbusReadPoint> readPoints)
    : client_(client), 
    readPoints_(std::move(readPoints))
{}

PlcWorker::~PlcWorker() noexcept
{
    stop();
}

void PlcWorker::start()
{
    if(running_) {
        return;
    }
    running_ = true;
    worker_ = std::thread(&PlcWorker::process, this);
}

void PlcWorker::stop()
{
    if(!running_){
        return;
    }
    running_ = false;
    cv_.notify_all();

    if(worker_.joinable()){
        worker_.join();
    }
}

std::optional<DataPoint> PlcWorker::getDataPoint(const std::string &key) const
{
    std::lock_guard<std::mutex> lock(mtx_);

    auto it = data_.find(key);

    if(it == data_.end()){
        return std::nullopt; 
    }
        
    return it->second;    
}


void PlcWorker::process()
{
    while(running_){

        readCycle();

        std::unique_lock<std::mutex> lock(mtx_);

        cv_.wait_for(lock, pollInterval_, [this](){
            return !running_;
        });

    }
}

void PlcWorker::readCycle()
{
    for(const auto& it : readPoints_){

        if(it.count != 1){
            continue;
        }

        auto reg = it.regType;
        int address = it.startAddress;
        double value {0.0};
        bool success = false;

        if(reg == ModbusRegisterType::Coil){
            value = client_.readCoil(address);
            success = true;
        }else if(reg == ModbusRegisterType::DiscreteInput){
            value = client_.readDiscrete(address);
            success = true;
        }else if(reg == ModbusRegisterType::InputRegister){
            value = client_.readInput(address);
            success = true;
        }else if(reg == ModbusRegisterType::HoldingRegister){
            value = client_.readHolding(address);
            success = true;
        }
        
        if(!success){
            continue;
        }

        std::lock_guard<std::mutex> lock(mtx_);

        DataPoint data;
        data.timestamp = std::chrono::system_clock::now();
        data.value = value;
        data_[it.key] = data;
    }
}

На текущем этапе это отдельный поток для последовательного опроса устройств по протоколу Modbus
Логика пока простая:

  • есть перечень регистров, которые реально задействованы;

  • они собираются заранее;

  • далее идёт циклический опрос;

  • после каждого опроса значение кладётся в кэш вместе с меткой времени.

То есть по факту имеем последовательный опрос только нужных регистров, без чтения всех регистров подряд. Кстати ранее имел опыт поиска в оперативной памяти прибора ТЭМ-104 показаний real-time, так как в новой прошивке они просто перехали в другое место, и в итоге нашел новые адреса регистров.

Отдельная логика для команд управления

Для передачи команд на управление механизмами у меня реализована другая логика, и это будет другой поток, который должен иметь приоритет над потоком чтения данных.

То есть схема примерно такая:

  • один поток занимается опросом и обновлением кэша значений;

  • другой поток занимается командами управления исполнительными механизмами;

  • при этом запись команды не должна ломать опрос, но должна выполняться предсказуемо и без конфликтов.

Сейчас для Modbus я пришёл к тому, что доступ к ПЛК/клиенту нужно сериализовать через mutex, чтобы ПЛК выполнял для меня одну задачу. То есть фактически чтение и запись не выполняются по-настоящему параллельно через один ModbusClient, а просто попадают в очередь доступа через mutex. Для одного канала связи это логично (RS-485).

По поводу std::condition_variable

До конца, честно говоря, ещё не разобрался в глубинном смысле работы std::condition_variable.

То есть базовый эффект я понимаю: поток не крутится в холостую, и не тратит ресурсы, но если смотреть глубже, с точки зрения именно SCADA/диспетчерских систем, где важны:

  • предсказуемость,

  • скорость реакции,

  • точность цикла опроса,

  • корректная работа под нагрузкой,

то хотелось бы лучше понять, насколько такой подход действительно правильный...

Мой принцип в этом плане простой: пишу только так, чтобы сам полностью понимал, что происходит. И только после этого можно усложнять и оптимизировать.

Вопрос по масштабированию

Сейчас ядро растёт, и уже очевидно, что последовательный опрос начинает упираться в скорость, особенно если говорить про 1000+ датчиков. А такое количество сигналов вполне может быть уже на старте. И тут речь не только о ПЛК. В дальнейшем большая часть точек будет идти не только через ПЛК, а через: приборы учёта, преобразователи интерфейсов, прямой вызов через различные мосты, отдельные устройства технического учёта энергоносителей. Сейчас это только первый слой, а дальше архитектура будет усложняться.

Что хотелось бы обсудить с коллегами

Интересно услышать мнение тех, кто реально сталкивался с похожими системами реального времени, где важны: скорость, точность цикла, предсказуемость, отказоустойчивость.

Особенно интересны такие вопросы:

  • Как вы делите потоки в подобных системах?

  • Делаете ли отдельные потоки по устройствам / по шинам / по группам сигналов?

  • Как организуете приоритет команд управления над потоком чтения?

  • Насколько оправдано использование std::condition_variable ?

  • Как масштабируете опрос при 1000+ точек?

  • Правильно ли сериализовать доступ к одному ModbusClient (ПЛК) через mutex, если чтение и запись идут из разных потоков?

Буду рад конструктивной критике и реальным инженерным кейсам. Спасибо!

Автор: maksys2011

Источник [2]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/avtomatizatsiya/450645

Ссылки в тексте:

[1] Исходники ядра лежат здесь: https://github.com/maksys2011/home-scada-core

[2] Источник: https://habr.com/ru/articles/1029582/?utm_source=habrahabr&utm_medium=rss&utm_campaign=1029582