- PVSM.RU - https://www.pvsm.ru -
В этой серии постов мы попробуем решить одну простую задачу с помощью более-менее актуальных технологий параллельного программирования (Нативные потоки, OpenMP, TBB, MPI, CUDA, OpenCL, OpenACC, Chapel может быть еще что-нить экзотическое. Как бы сравнительно и в hands-on ключе.
Чтобы все влезло в один пост (в нескольких частях) и вообще могло быть совокупно охвачено вниманием проблема была выбрана умышленно тривиальная. Поэтому многие аспекты упомянутых технологий и главное трудности которые возникают при реальном прграммировании современных массивно-параллельных систем останутся за кадром. Также не стоит пользоваться приведённым и примерами в качестве бенчмарка «GPU против CPU» или что-то в этом духе. Единственная цель — показать «как оно работает». Пост рассчитан на людей с параллельным программированием не очень знакомых. Под катом будет много кода. Собственно считать мы будем число Пи путем численного интегрирования
Исходные коды также доступны на github.com/undertherain/pi [1] (будут пополнятся по мере написания следующих частей поста).
Давайте для начала сделаем последовательную версию. Т.е. такую, которая использует одно ядро обычного центрального процессора. Возьмем самый простой из методов численного инетгрирования — метод прямоугольников и закодим его на языке Си (вообще дальше будем пользоваться СиС++-ным суржиком в виду ряда причин.
Объявим некоторое количество cntSteps прямоугольников на которые мы разобъем нашу площадь под интегралом, посчитаем основание:
step = 1./static_cast<double>(cntSteps);
и всю площадь, посчитав значение функии в каждом прямоугольнике и домножив на основание.
for (unsigned long i=0; i<cntSteps; i++)
{
x = ( i + .5 ) *step;
sum = sum + 4.0/(1.+ x*x);
}
Впрочем, домножим на основание step мы лучше «за скобками», т.е. за циклом — вот в принципе и все.
Вот полный код программы:
#include <iostream>
#include <iomanip>
#include <sys/times.h>
#include <cmath>
int main(int argc, char** argv)
{
const unsigned long cntSteps=500000000; /* # of rectangles */
double step;
const double PI25DT = 3.141592653589793238462643; //reference Pi value
double pi=0;
double sum=0.0;
double x;
std::cout<< "calculating pi on CPU single-threadedn";
//засекаем время
clock_t clockStart, clockStop;
tms tmsStart, tmsStop;
clockStart = times(&tmsStart);
//вычисляем шаг интегрирования и интегральную сумму
step = 1./static_cast<double>(cntSteps);
for (unsigned long i=0; i<cntSteps; i++)
{
x = (i + .5)*step;
sum = sum + 4.0/(1.+ x*x);
}
pi = sum*step;
//снова засекаем время и печатаем результат
clockStop = times(&tmsStop);
std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << "n";
std::cout << "The time to calculate PI was " ;
double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
std::cout << secs << " secondsn";
return 0;
}
Копия по ссылке http://dumpz.org/195276/ [2].
Самая, пожалуй, доступная простым пользователям параллельная архитектура — это обычный многоядерный процессор (кажется сейчас уже трудно найти процессор с одним ядром) или несколько процессоров на одной материнской плате. В принципе и одно ядро способно исполнять параллельно несоколько потоков — такой режим называется псевдо-параллельным или конкурентным. Ядро переключается между процессами (процесс — это грубо говоря находящаяся в памяти программа), выделяя каждому квант времени. В принципе такой режим выполнения уже может привести к росту производительности за счет сокрытия латентности памяти, если не на обычных «домашних» процессорах, то на специализированных многопоточных, но об этом позже. В нашем случае избыточное количество потоков привело бы к замедлению из-за накладных расходов на переключение между потоками.
Самый «исторический» способ исползьовать сразу несколько ядер на процессоре — это механизм потоков операционной системы, который сущесвовал задолго до истинно-параллельных процессоров для конкуренности хотя бы ради более удобного написания программ. С точки зрения программиста важно то, что параллельные потоки, исполняемые на разных ядрах или процессорах видят одно и то же адресное пространство, т.е нет нужды явно передавать данные между потоками. Зато если вдруг разные потоки пишутчитают одну и ту же переменную — то придётся озаботиться синхронизацией.
Ладно, давайти ближе к коду: с точки зрения языка Си поток — это обычная функция или метод класса, удовлетворяющая определённому прототипу. Давайте назовём ее static void*worker(void*ptrArgs), аргументом она получает указатель на структуру, в которой можно сделать полей сколько нужно чтобы передать потоку его аргументы. В нашем случае мы скажем потоковой функии в каком диапазоне считать наш интеграл. В теле потоковой функции — уже известный нам цикл, собсно и считающий по диапазону который мы ему передали в параметрах.
for (long long i=args->left; i<args->right; i++)
{
x = (i + .5)*step;
sum = sum + 4.0/(1.+ x*x);
}
Интервал интегрирования для каждого потока мы заранее вычислим изходя из его порядкового номера. Если один из потоков посчитает свою часть раньше — то соответсвующее ядро будет простаивать, т.е. мы потеряем производительность. Идеально было бы разделить интервал на много маленьих участков и раздавать потокам по мере того как они справляются со своей работой. Но пока оставим как есть.
arrArgsThread[idThread].left = idThread*cntStepsPerThread;
arrArgsThread[idThread].right = (idThread+1)*cntStepsPerThread;
На исполнение отдельным потоком функция запускается через системный вызов pthread_create в случае POSIX (в линуксе, например) или в случае windows этто будет аналогичный вызов из Win32 API, вуглядеть будет немного по-другому, но в целом похоже.
Результат будем из каждого потока прибавлять к общем перменной pi += sum*step; (помним что мы находимся в общем адресном пространстве).
Чтобы память не испортилась в случае если два потока будут одновременно изменять одну ячейко нам надо как-то гарантировать что в один момент времени только один поток получает доступ к переменной pi, т.е. создать так называемую «критическую секцию». Для это можно воспользовться специальным механизмом операционной системы под названием мьютекс (от слова mutual exclusion) — если один поток заблокировал мьютекс, другой поток будет ждать (на попытке получить мьютекс самому) пока первый поток его не «освободит».
pthread_mutex_lock(&mutexReduction);
pi += sum*step;
pthread_mutex_unlock(&mutexReduction);
Итого выходит как-то так:
#include <iostream>
#include <iomanip>
#include <cmath>
#include <cstdlib>
#include <pthread.h>
#include <sys/times.h>
#define cntThreads 4
pthread_mutex_t mutexReduction;
double pi=0.;
//параметры которые мы передадим потоковой функции
struct ArgsThread
{
long long left,right;
double step;
};
//потоковая функция
static void *worker(void *ptrArgs)
{
ArgsThread * args = reinterpret_cast<ArgsThread *>(ptrArgs);
double x;
double sum=0.;
double step=args->step;
for (long long i=args->left; i<args->right; i++)
{
x = (i + .5)*step;
sum = sum + 4.0/(1.+ x*x);
}
pthread_mutex_lock(&mutexReduction);
pi += sum*step;
pthread_mutex_unlock(&mutexReduction);
return NULL;
}
int main(int argc, char** argv)
{
const unsigned long num_steps=500000000;
const double PI25DT = 3.141592653589793238462643;
pthread_t threads[cntThreads];
ArgsThread arrArgsThread[cntThreads];
std::cout<<"POSIX threads. number of threads = "<<cntThreads<<std::endl;
clock_t clockStart, clockStop;
tms tmsStart, tmsStop;
clockStart = times(&tmsStart);
double step = 1./(double)num_steps;
long long cntStepsPerThread= num_steps / cntThreads;
//для каждого потока вычисляем его диапазон интегрирования и запускаем поток
for (unsigned int idThread=0; idThread<<cntThreads; idThread++)
{
arrArgsThread[idThread].left = idThread*cntStepsPerThread;
arrArgsThread[idThread].right = (idThread+1)*cntStepsPerThread;
arrArgsThread[idThread].step = step;
if (pthread_create(&threads[idThread], NULL, worker, &arrArgsThread[idThread]) != 0)
{
return EXIT_FAILURE;
}
}
//ждем когда все потоки завершаться с помощью функции join
for (unsigned int idThread=0; idThread<cntThreads; idThread++)
{
if (pthread_join(threads[idThread], NULL) != 0)
{
return EXIT_FAILURE;
}
}
//печатаем!
clockStop = times(&tmsStop);
std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << std::endl;
std::cout << "The time to calculate PI was " ;
double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
std::cout << secs << " secondsn" << std::endl;
return 0;
}
Копия на http://dumpz.org/195404/ [3] на случай если у кото-то моё адово форматироваие неровно отображается.
На самом деле конкретно в этом примере (но так везти будет не всегда) можно было обойтись без мьютексов вообще если сохранять в каждом потоке результат в отдельную переменную (элемент массиваArgsThread arrArgsThread[cntThreads];
) а потом, дождавшись завершения всех потоков — просуммировать что получилось.
Вот код без мьютексов:
#include <iostream>
#include <iomanip>
#include <cmath>
#include <cstdlib>
#include <pthread.h>
#include <sys/times.h>
#define cntThreads 4
struct ArgsThread
{
long long left,right;
double step;
double partialSum;
};
static void *worker(void *ptrArgs)
{
ArgsThread * args = reinterpret_cast<ArgsThread *>(ptrArgs);
double x;
double sum=0.;
double step=args->step;
for (long long i=args->left; i<args->right; i++)
{
x = (i + .5)*step;
sum = sum + 4.0/(1.+ x*x);
}
args->partialSum=sum*step;
return NULL;
}
int main(int argc, char** argv)
{
const unsigned long num_steps=500000000;
const double PI25DT = 3.141592653589793238462643;
pthread_t threads[cntThreads];
ArgsThread arrArgsThread[cntThreads];
std::cout<<"POSIX threads. number of threads = "<<cntThreads<<std::endl;
clock_t clockStart, clockStop;
tms tmsStart, tmsStop;
clockStart = times(&tmsStart);
double step = 1./(double)num_steps;
long long cntStepsPerThread= num_steps / cntThreads;
for (unsigned int idThread=0; idThread<cntThreads; idThread++)
{
arrArgsThread[idThread].left = idThread*cntStepsPerThread;
arrArgsThread[idThread].right = (idThread+1)*cntStepsPerThread;
arrArgsThread[idThread].step = step;
if (pthread_create(&threads[idThread], NULL, worker, &arrArgsThread[idThread]) != 0)
{
return EXIT_FAILURE;
}
}
double pi=0.;
for (unsigned int idThread=0; idThread<cntThreads; idThread++)
{
if (pthread_join(threads[idThread], NULL) != 0)
{
return EXIT_FAILURE;
}
pi +=arrArgsThread[idThread].partialSum;
}
clockStop = times(&tmsStop);
std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << std::endl;
std::cout << "The time to calculate PI was " ;
double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
std::cout << secs << " secondsn" << std::endl;
return 0;
}
И копия на http://dumpz.org/195415/ [4].
Как видите, код получилася довольно громоздкий и некросплатформенный. Если последнее решается отчасти с помощью boost::threads(но не все хотят ставить boost) или в новом C++11 потоки вообще стали частью языка (очень здорово вышло на самом деле) — но большая часть софта пока еще использует старый C++. Но проблема громоздкости кода всё равно остается.
OpenMP — это расширение языка (C/C++/Fortran) позволяющее делать примерно то же самое что мы делали с помощью API потоков операционной системы — но гораздо проще и лаконичней с помощью так называемых прагм. Прагма как бы говорит компилатору «взять вот этот код и исполнять параллельно» — и копмилятор делает все остальное.
Для распараллеливания цикла for в нашем первом последовательном примере достаточно добавить туда одну строчку:
#pragma omp parallel for private (x), reduction (+:sum)
for (int i=0; i<numSteps; i++)
{
x = (i + .5)*step;
sum = sum + 4.0/(1.+ x*x);
}
эта прагма говорит, что нужно распараллелить витки цикла, переменную x сделать приватной для каждого потока, по переменной sum потом провести редукцию (или как это по-русски?) по суммированию. Т.е. сначала создать для каждого потока по копии — а потом их сложить. Примерно то же самое, что мы сделали в прошлом примере без мьютекосв. Также OpenMP предоставляет небольшой API для сервисных нужд.
#include <iostream>
#include <iomanip>
#include <sys/times.h>
#include <cmath>
#include <omp.h>
int main(int argc, char** argv)
{
const unsigned long numSteps=500000000; /* default # of rectangles */
double step;
double PI25DT = 3.141592653589793238462643;
double pi=0;
double sum=0.0;
double x;
#pragma omp parallel
{
#pragma omp master
{
int cntThreads=omp_get_num_threads();
std::cout<<"OpenMP. number of threads = "<<cntThreads<<std::endl;
}
}
clock_t clockStart, clockStop;
tms tmsStart, tmsStop;
step = 1./static_cast<double>(numSteps);
clockStart = times(&tmsStart);
#pragma omp parallel for private (x), reduction (+:sum)
for (int i=0; i<numSteps; i++)
{
x = (i + .5)*step;
sum = sum + 4.0/(1.+ x*x);
}
pi = sum*step;
clockStop = times(&tmsStop);
std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << std::endl;
std::cout << "The time to calculate PI was " ;
double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
std::cout << secs << " secondsn" << std::endl;
return 0;
}
Копия http://dumpz.org/195550/ [5].
Компилировать с опцией -fopenmp в случае g++, с случае другого компилятора обратитесь к руководству пользователя.
Вопросы и комментарии приветсвуются, продолжение — следует!
Автор: blackbird
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/programmirovanie/14434
Ссылки в тексте:
[1] github.com/undertherain/pi: https://github.com/undertherain/pi
[2] http://dumpz.org/195276/: http://dumpz.org/195276/
[3] http://dumpz.org/195404/: http://dumpz.org/195404/
[4] http://dumpz.org/195415/: http://dumpz.org/195415/
[5] http://dumpz.org/195550/: http://dumpz.org/195550/
Нажмите здесь для печати.