- PVSM.RU - https://www.pvsm.ru -
На картинке изображен MapReduce в том виде, в каком он реализован в Qt:
QFuture<T> QtConcurrent::mappedReduced(const Sequence &sequence,
MapFunction mapFunction, ReduceFunction reduceFunction /*...*/)
T QtConcurrent::blockingMappedReduced(const Sequence &sequence,
MapFunction mapFunction, ReduceFunction reduceFunction /*...*/)
Столкнулся с тем, что коллеги на работе не знают про MapReduce в Qt Concurrent. Как говорил Гёте: "Чего мы не понимаем, тем не владеем". Под катом будет немножко про Map, про Reduce, про Fork–join model и пример решения простой задачки при помощи MapReduce.
Задачка была взята с просторов интернета как есть:
Написать консольную программу, которая выполняет поиск максимального элемента в массиве с 1000000000 элементов.
MapReduce состоит из функций высшего порядка map [1] и reduce [2]. Функция высшего порядка- это функция которая в качестве аргументов принимает другие функции.
Map [1] применяет функцию к каждому элементу списка, возвращая список результатов. В C++ это можно описать через std::transform [3]:
std::list<int> list{
1, 2, 3, 4, 5, 6
};
std::list<int> newList(list.size(), 0);
std::transform(list.begin(), list.end(),newList.begin(),
[](int v){
return v*2;
});
for(auto i: newList){
std::cout<<i<<" ";
}
Википедия дает определение: функция высшего порядка, которая производит преобразование структуры данных к единственному атомарному значению при помощи заданной функции. Если по простому, то reduce аккумулирует множество элементов(список, вектор и т.д.).
На C++ это можно описать через std::for_each
и функциональный объект [4]
struct Max{
Max():value(std::numeric_limits<int>::min()){
}
void operator()(int val){
value = std::max(value, val);
}
int value;
};
struct Sum{
Sum(): value(0){
}
void operator()(int val){
value += val;
}
int value;
};
//...
std::list<int> list{
1, 2, 3, 4, 5, 6
};
const auto max = std::for_each(list.begin(), list.end(), Max());
const auto sum = std::for_each(list.begin(), list.end(), Sum());
std::cout<<"Max:"<<max.value<<std::endl;
std::cout<<"Sum:"<<sum.value<<std::endl;
Как решить задачку через MapReduce может быть непонятно. Тут следует посмотреть, а может есть какая-нибудь теория? Существует модель параллельных вычислений fork-join [5]. В основе её:
Картинка демонстрирующая модель(взята из wikipedia [5]). Чем-то похоже на изображение в самом начале. MapReduce в Qt это реализация fork-join model.
Для такой задачи стандартное решением является взять вектор, поделить его на несколько непересекающихся отрезков, найти локальный максимум в отрезках, и в конце объединить результат. На std::thread это будет как-то так:
using DataSet = std::vector<int>;
const size_t DATASET_SIZE = 1000000000;
struct Task
{
size_t first;
size_t last;
DataSet& data;
int localMaximum;
};
using Tasks = std::vector<Task>;
void max(Task& task)
{
int localMax = task.data[task.first];
for(size_t item = task.first; item < task.last; ++item)
{
localMax = std::max(localMax, task.data[item]);
}
task.localMaximum = localMax;
}
DataSet data(DATASET_SIZE);
//...
const auto threadCount = std::thread::hardware_concurrency();
const auto taskSize = data.size()/threadCount;
Tasks tasks;
size_t first = 0;
size_t last = taskSize;
// Разбиваем задачу на подзадачи
for(size_t i = 0; i < threadCount; ++i)
{
tasks.push_back(Task{first, last, data, 0});
first+=taskSize;
last = std::min(last+taskSize, data.size());
}
// Запускаем подзадачи
std::vector<std::thread> threads;
for(auto& task: tasks)
{
threads.push_back(std::thread(max, std::ref(task)));
}
// Выполняем объединение
for(auto& thread: threads)
{
thread.join();
}
int Max = tasks[0].localMaximum;
for(const auto& task: tasks)
{
Max = std::max(Max, task.localMaximum);
}
Разбив задачу на подзадачи, можно компактно записать через QtConcurrent::blockingMappedReduced [6]
using DataSet = std::vector<int>;
const size_t DATASET_SIZE = 1000000000;
struct Task
{
size_t first;
size_t last;
DataSet& data;
};
int mapMax(const Task& task)
{
int localMax = task.data[task.first];
for(size_t item = task.first; item < task.last; ++item)
{
localMax = std::max(localMax, task.data[item]);
}
return localMax;
}
void reduceMax(int& a, const int& b)
{
a = std::max(a, b);
}
using Tasks = std::vector<Task>;
//...
const auto threadCount = std::thread::hardware_concurrency();
const auto taskSize = data.size()/threadCount;
Tasks tasks;
size_t first = 0;
size_t last = taskSize;
for(size_t i = 0; i < threadCount; ++i)
{
tasks.push_back(Task{first, last, data, 0});
first+=taskSize;
last = std::min(last+taskSize, data.size());
}
int Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax);
На что тут следует обратить внимание:
#include <QtCore/QtDebug>
#include <QtCore/QElapsedTimer>
#include <QtCore/QCoreApplication>
#include <QtConcurrent/QtConcurrent>
#include <cstdlib>
#include <thread>
#include <vector>
#include <algorithm>
using DataSet = std::vector<int>;
const size_t DATASET_SIZE = 1000000000;
struct Task
{
size_t first;
size_t last;
DataSet& data;
};
int mapMax(const Task& task)
{
int localMax = task.data[task.first];
for(size_t item = task.first; item < task.last; ++item)
{
localMax = std::max(localMax, task.data[item]);
}
return localMax;
}
void reduceMax(int& a, const int& b)
{
a = std::max(a, b);
}
using Tasks = std::vector<Task>;
int main(int argc, char *argv[])
{
std::srand(unsigned(std::time(0)));
QCoreApplication a(argc, argv);
DataSet data(DATASET_SIZE);
for(size_t i = 0; i < data.size(); ++i)
{
data[i] = std::rand();
}
QElapsedTimer timer;
timer.start();
const auto threadCount = std::thread::hardware_concurrency();
const auto taskSize = data.size()/threadCount;
Tasks tasks;
size_t first = 0;
size_t last = taskSize;
for(size_t i = 0; i < threadCount; ++i)
{
tasks.push_back(Task{first, last, data});
first+=taskSize;
last = std::min(last+taskSize, data.size());
}
timer.start();
const auto Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax);
qDebug() << "Maximum" << Max << "time" <<timer.elapsed() << "milliseconds";
return 0;
}
Автор: RPG18
Источник [7]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/qt-2/193357
Ссылки в тексте:
[1] map: https://ru.wikipedia.org/wiki/Map
[2] reduce: https://ru.wikipedia.org/wiki/%D0%A1%D0%B2%D1%91%D1%80%D1%82%D0%BA%D0%B0_%D1%81%D0%BF%D0%B8%D1%81%D0%BA%D0%B0
[3] std::transform: http://ru.cppreference.com/w/cpp/algorithm/transform
[4] функциональный объект: https://ru.wikipedia.org/wiki/%D0%A4%D1%83%D0%BD%D0%BA%D1%86%D0%B8%D0%BE%D0%BD%D0%B0%D0%BB%D1%8C%D0%BD%D1%8B%D0%B9_%D0%BE%D0%B1%D1%8A%D0%B5%D0%BA%D1%82
[5] fork-join: https://en.wikipedia.org/wiki/Fork%E2%80%93join_model
[6] QtConcurrent::blockingMappedReduced: http://doc.qt.io/qt-5/qtconcurrent.html#blockingMappedReduced
[7] Источник: https://habrahabr.ru/post/311090/?utm_source=habrahabr&utm_medium=rss&utm_campaign=best
Нажмите здесь для печати.