- PVSM.RU - https://www.pvsm.ru -

Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

ИМХО (Имею Мнение Хрен Оспоришь)

С моей точки зрения самое полезное, что может сделать программист для повышения своего профессионального уровня — это написание велосипедов. Велосипедостроение — очень увлекательный процесс. Иногда он увлекает больше, чем задача, ради которой сам велосипед и затевался. При написании велосипеда (под велосипедом я понимаю реализацию уже существующего) происходит более глубокое понимание уже существующих решений и техник.
Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

Мотивация

Вот уже больше трех лет моим основным рабочим языком является objective-c, и когда я только начал писать на нем меня приятно удивил продуманный высокоуровневый API для работы с многопоточностью NSOperationQueue [1], а позже — GCD [2], который по моему мнению является квинтэссенцией лаконичности и понятности для Thread concurrency. И вот недавние статьи на Хабре: Техника написания аналога await/async из C# для C++ [3] и Thread concurrency C++11 [4]. Они и заставили посмотреть на те новые плюшки, которые предоставляет C++ для работы с многопоточностью. И большинство из них (тот-же std::future) для меня выглядят примерно так:
Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

Домыслы и хотелки

Вот типичный сценарий, в котором я использую многопоточность в своих приложениях:

  • асинхронно получить какие-то данные (файлы/сеть);
  • распарсить/подотовить полученные данные;
  • вернуть данные в вызываемый поток (например, в главный поток и обновить UI).

Удобно, чтобы для каждой из этих операций была своя очередь.
А еще более удобно, когда все это собрано в одном месте, а не разбросано по пяти исходным файлам. Что-то на подобии:

file_io_queue.async([=]{
    file_data = get_data_from_file( file_name );
    parser_queue.async([=]{
        parsed_data = parse( file_data );
        main_queue.async([=]{
            update_ui_with_new_data( parsed_data ) ;
        });
    });
});

Этот код читается как абсолютно линейный, синхронный код. Он описывает логику того, как будут происходить изменения в данных. Для меня, по большому счету, без разницы, в каком потоке будет выполняться вычитка файла, в каком — его парсинг. Главное — последовательность этих операций. Я могу вызвать предыдущий код 100500 раз для 100500 файлов.

Очевидное решение — реализация шаблона пулл потоков. Но почти все реализации, виденные мной на просторах интернетиков, предлагают использовать один std::thread для одной очереди. С моей точки зрения это не есть хорошо. Например, нужно хранить инстанс самой очереди все время, пока выполняются асинхронные операции. Создание исттанса std::thread на порядки более затратная операция, чем захват/освобождение мютекса. Когда нам стоит уничтожать очередь? Да и простаивающее большое количество потоков в то время, когда очередь не используется — не айс.
Мы сделаем по другому. У нас будет N-ное количество потоков (std::thread) и список легковесных очередей с приоритетами. Когда мы добавляем задачу в очередь, то оповещаем поток о том, что появилась новая задача. Поток берет самую высокоприоритетную задачу и выполняет ее. Если задача с таким приоритетом уже выполняется, то берет более низкоприоритетную задачу. Если таких нет — ждет.
Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

Код

Приступим:

Очередь

namespace dispatch{
    typedef std::function<void ()> function;
    struct queue {
        typedef long priority; // Наш приоритет. Пусть это будет целое число
        const queue::priority queue_priority; 

        static std::shared_ptr<queue> main_queue() ; // Об этом ниже

        virtual void async(function) const; // Собственно метод для добавления задачи в очередь

        queue(queue::priority priority) : queue_priority(priority) {};
    };
}

Реализация метода async

просто перенаправляет вызов в thread pool:

    void queue::async(dispatch::function task) const {
        thread_pool::shared_pool()->push_task_with_priority(task, this->queue_priority);
    };

Вся работа будет происходить в нашем

Пуле потоков:

    struct queue_impl{
        const queue::priority priority;
        std::queue<function> tasks;
        bool is_running;
        queue_impl(queue::priority priority): priority(priority){};
    };
    
    struct thread_pool{
        thread_pool();
        static std::shared_ptr<thread_pool>& shared_pool(); // thread_pool
        virtual ~thread_pool();
        
        bool stop;
        
        typedef std::shared_ptr<queue_impl> queue_ptr; 
        
        void push_task_with_priority(const function&, queue::priority);// Добавляем задачу в очередь с приоритетом
        bool get_free_queue(queue_ptr*) const;          // Ищем очередь, задачи из которой не исполняются прямо сейчас
        void start_task_in_queue(const queue_ptr&);  // Отмечаем очередь как исполняющуюся
        void stop_task_in_queue(const queue_ptr&);  // Снимаем отметку
        
        std::mutex mutex; // Мютекс для синхронизации пула
        std::map<queue::priority, queue_ptr> queues; // Здесь хранятся очереди по приоритетам
        
        std::mutex main_thread_mutex;
        std::queue<dispatch::function> main_queue;
        
        std::condition_variable condition;
        std::vector<std::thread> threads; // Массив реальных потоков, которые и будут выполнять задачи
        
        dispatch::function main_loop_need_update;
        void add_worker(); // Добавляем еще один поток
    };

Рассмотрим методы по порядку. Нам необходимо найти свободную очередь с максимальным приоритетом:

найти свободную очередь с максимальным приоритетом:

   bool thread_pool::get_free_queue(queue_ptr* out_queue) const {
        //  проходим по всем очередям с приоритетом от максимального до минимального
       auto finded = std::find_if(queues.rbegin(), queues.rend(), [](const std::pair<queue::priority, queue_ptr>& iter){
                                         return ! iter.second->is_running; // и находим первую свободную очередь
                                     });
        
        bool is_free_queue_exist = (finded != queues.rend());
        if (is_free_queue_exist)
            *out_queue = finded->second;
        
        return  is_free_queue_exist;
    }

Добавляем задачу в очередь

    void thread_pool::push_task_with_priority(const function& task, queue::priority priority){
        {
            std::unique_lock<std::mutex> lock(mutex); // Захватваем мютекс

            // Добавляем задачу в очередь. Если очереди нет - создаем ее
            auto queue = queues[priority];
            if (!queue){
                queue = std::make_shared<dispatch::queue_impl>(priority);
                queues[priority] = queue;
            }
            queue->tasks.push(task);

            // Если необходимо, то добавляем потоки
            unsigned max_number_of_threads = std::max<unsigned>(std::thread::hardware_concurrency(), 2);
            unsigned number_of_threads_required = round(log(queues.size()) + 1);
            while (threads.size() < std::min<unsigned>(max_number_of_threads, number_of_threads_required)){
                add_worker();
            }
        }
        condition.notify_one(); // Оповещаем поток, что мы добавили задачу в очередь
    }

Отмечаем задачу как выполненную

    void thread_pool::stop_task_in_queue(const queue_ptr& queue){
        {
            std::unique_lock<std::mutex> lock(mutex);
           // Отмечаем задачу как выполненную. Если очередь пуста - убираем ее из списка очередей
            queue->is_running = false;
            if ( queue->tasks.size() ==0 ){
                queues.erase(queues.find(queue->queue_priority));
            }
        }
        condition.notify_one(); // Оповещаем поток, что одна из задач выполненна
    }

И, собственно, сам поток:

    void thread_pool::add_worker(){
        threads.push_back(std::thread([=]{
                              dispatch::function task;
                              thread_pool::queue_ptr queue;
                              while(true){
                                  {
                                      std::unique_lock<std::mutex> lock(mutex); // Пытаемся захватить мютекс
                                      
                                      while(!stop && !get_free_queue(&queue)) // Если нет свободных очередей
                                          condition.wait(lock);                   // То ждем оповещения
                                   
                                      if(stop) // Если пулл потоков был остановлен, то завершаемся
                                          return;

                                      task = queue->tasks.front(); // Забираем задачу из очереди
                                      queue->tasks.pop(); 

                                      start_task_in_queue(queue); // Отмечаем очередь как занятую
                                  }
                                  task(); // Исполняем задачу
                                  stop_task_in_queue(queue); // Отмечаем очередь как свободную
                              }
                          }));
    }

Main Thread и Run Loop

Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

В С++ нет такого понятия как главный поток. Но на этой концепции построены практически все UI приложения. UI мы можем изменять только из главного потока. Значит, нам нужно либо самим организовать Run Loop, либо вклиниться в уже существующий.

Для начала создадим отдельную очередь для «главного потока»:

Main Queue

    struct main_queue : queue{
        virtual void async(dispatch::function task) const override;
        main_queue(): queue(0) {};
    };

    std::shared_ptr<queue> queue::main_queue(){
        return std::static_pointer_cast<dispatch::queue>(std::make_shared<dispatch::main_queue>());
    }

А в методе async будем добавлять задачи в

отдельную очередь

    void main_queue::async(dispatch::function task) const {
        auto pool = thread_pool::shared_pool();
        std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
        pool->main_queue.push(task);
        if (pool->main_loop_need_update != nullptr)
            pool->main_loop_need_update();
    }

Ну и нам необходима функция, которая будет вызываться из главного потока:

Код

    void process_main_loop() {
        auto pool = thread_pool::shared_pool();
        std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
        while (!pool->main_queue.empty()) {
            auto task = pool->main_queue.front();
            pool->main_queue.pop();
            task();
        }
    }

Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

Теперь только два вопроса: «Как?» и «Зачем?»

Сначала «Зачем?»: C++ довольно часто используется для написания кросс-платформенного ПО. В угоду переносимости от многих удобных вещей необходимо отказаться. GCD очень удобная библиотека, предоставляющая простой, наглядный и удобный способ управления асинхронными очередями.
На вопрос «Как?» нет однозначного ответа. Вклиниться ранлуп можно по разному. Многие системы предастовляют API для этого. Например, в iOS есть «performSelectorOnMainThread:». Нам нужно лишь задать коллбэк через dispatch::set_main_loop_process_callback:

-(void)dispatchMainThread{
    dispatch::process_main_loop();
}
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions{
    dispatch::set_main_loop_process_callback([=]{
        [self performSelectorOnMainThread:@selector(dispatchMainThread) withObject:nil waitUntilDone:NO];
    });
    return YES;
}

Если-же мы сами организуем наш собственный ранлуп, то можно сделать что-то такое:

    void main_loop(dispatch::function main_loop_function);

    void main_loop(dispatch::function main_loop_function){
        auto main_queue = queue::main_queue();
        while (!thread_pool::shared_pool()->stop) {
            main_queue->async(main_loop_function);
            process_main_loop();
        }
    }

Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

А теперь собственно то, ради чего это и затевалось:

Создадим 6 очередей и запихнем в каждую по 6 заданий:

   auto main_thread_id = std::this_thread::get_id();
    for (unsigned task = 0; task < 6; ++task)
    for (unsigned priority = 0; priority < 6; ++priority){
        dispatch::queue(priority).async([=]{
            assert(std::this_thread::get_id() != main_thread_id);
            std::string task_string = std::to_string(task);
            std::string palceholder(1+priority*5, ' ');
            dispatch::queue::main_queue()->async([=]{
                assert(std::this_thread::get_id() == main_thread_id);
                std::cout << palceholder << task_string << std::endl;
            });
        });
    }

Получим примерно такую картинку


                          0
                          1
                     0
                0
                          2
                     1
                1
                          3
                     2
                2
                          4
                     3
                3
                          5
                     4
                4
           0
                     5
                5
           1
      0
 0
           2
      1
 1
           3
      2
 2
           4
      3
 3
           5
      4
 4
      5
 5

«Столбик» представляет собой очередь. Чем правее, тем более высокий приоритет у очереди. Линия — это коллбэки на «главный поток».

Ну и код для iOS:

    for (int i = 0; i < 20; ++i){
        dispatch::queue(dispatch::QUEUE_PRIORITY::DEFAULT).async([=]{
            NSAssert(![NSThread isMainThread], nil);
            std::string first_string = std::to_string(i);
            dispatch::queue::main_queue()->async([=]{
                NSAssert([NSThread isMainThread], nil);
                std::string second_string = std::to_string(i+1);
                std::cout << first_string << " -> " << second_string << std::endl;
                [self.tableView reloadData]; // Делаем что-то с UI. То, что можно делать только из главного потока
            });
        });
    }

Заключение

Заключения не будет. Этот велосипед писался исключительно с целью пощупать многопоточность в C++11. Код представляет собой чуть более 200 строк не особо хорошего С++ кода, представлен на гитхабе [5]. Проверялся на clang++ 5.0, g++-4.8 и компилятором 2012 Visual Studio. То есть основные компиляторы уже в достаточной мере поддерживают C++11.

Пы.Сы. Призывая писать свои велосипеды я отнюдь не призываю их использовать на боевых проектах. Хотя, с другой стороны, как еще велосипед может превратиться во что-то серьезное?

Ну и еще пара велосипедов, которые я не придумал куда впихнуть в статье

Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11
Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11
Не бойтесь велосипедов. Или еще один Grand Central Dispatch (GCD) на C++11

Автор: code_monkey

Источник [6]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/programmirovanie/38438

Ссылки в тексте:

[1] NSOperationQueue: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/NSOperationQueue_class/Reference/Reference.html

[2] GCD: http://ru.wikipedia.org/wiki/Grand_Central_Dispatch

[3] Техника написания аналога await/async из C# для C++: http://habrahabr.ru/post/185706/

[4] Thread concurrency C++11: http://habrahabr.ru/post/184562/

[5] представлен на гитхабе: https://github.com/nut-code-monkey/dispatch

[6] Источник: http://habrahabr.ru/post/186200/