Пишем простой ThreadPool с async и future

в 18:04, , рубрики: asynchronous, c++, threads, Программирование, с++11, метки: , , ,

Доброе время суток! Я занимаюсь разработкой MMORPG. В игре асинхронной загрузкой файлов занимается файловая система встроенная в движок, остальная логика обрабатывалась в основном потоке. Но помимо загрузки файлов есть и другие тяжеловесные операции, например инициализация персонажа, которая включает в себя распарсивание XML данных, композиция текстур и т.д. И я столкнулся с необходимостью разгрузить основной поток игры для повышения fps.
Вооружившись гуглом и прочитав несколько последних статей на Хабре о потоках, решил попробовать свои силы и написать свой велосипед ThreadPool. Основной моей целью было написать простой в использовании и расширяемый пул потоков, с возможностью асинхронного вызова функций с любым количеством аргументов и получения возвращаемого значения. Предлагаемые на Хабре решения меня не устроили в силу многих проблем, от которых я постарался избавиться в своем решении – использовать минимум шаблонной магии и отказаться от абстрактных классов и наследования. Я хочу поделиться своим опытом с вами.

И так, необходимо реализовать всего два класса, сам пул и класс рабочего потока, экземпляры которого будут управляться пулом.
Для реализации я использовал следующие заголовочные файлы и классы:

#include <functional>
#include <thread>
#include <queue>
#include <mutex>
#include <memory>

using std::function;
using std::queue;
using std::mutex;
using std::thread;
using std::shared_ptr;
using std::vector;

Класс рабочего потока:

typedef function<void()> fn_type;

class Worker
{    
public:

	Worker()
		:enabled(true),fqueue()
		,thread(&Worker::thread_fn, this)
	{}
	~Worker()
	{
		enabled = false;
		thread.join();
	}  
	void appendFn(fn_type fn)
	{
		mutex.lock();
		fqueue.push(fn);
		mutex.unlock();
	}

	queue<fn_type>	fqueue;

protected:

	mutex		mutex;
	thread		thread;
	volatile bool	enabled;	

	void thread_fn();
};

typedef shared_ptr<Worker> worker_ptr;

Класс довольно простой, содержит мютекс для синхронизации, сам std::thread, флаг включения и очередь функторов std::function<void()>. Вы наверно спросите – а как мы будем вызывать функции других типов? Ответ ждет вас ниже в статье.
Функция вызываемая в потоке будет выглядеть вот так:

void Worker::thread_fn()
{
	while (enabled)
	{
		if(!fqueue.empty())
		{
			mutex.lock();
			fn_type fn = fqueue.front();
			fqueue.pop();
			mutex.unlock();
			fn();
		}
		else
			std::this_thread::yield(); // В конечной реализации я использовал тут sleep
	}
}

В цикле просматриваем очередь на наличие функторов. если нашли — вытаскиваем и вызываем.
Это все, что нам необходимо для этого класса.

Класс ThreadPool

Теперь сам пул потоков. Я специально не стал прикручивать к нему плюшки типа очередей с приоритетами, синглтона и т.п. В статье мы рассмотрим только базовую реализацию. Все остальное при желании можно легко добавить.

class ThreadPool
{
public:

	ThreadPool(size_t threads = 1)
	{
		if (threads==0)
			threads=1;
		for (size_t i=0; i<threads; i++)
		{
			worker_ptr pWorker(new Worker);
			_workers.push_back(pWorker);
		}
	}
	~ThreadPool()
	{
		_workers.clear();
	}

protected:
		
	worker_ptr getFreeWorker();

	vector<worker_ptr> _workers; 

};

При создании в конструкторе можно указать сколько потоков будет использовать пул.
Для получения потока в который мы будем передавать наши функторы используем функцию getFreeWorker:

worker_ptr Worker::getFreeWorker()
{
	worker_ptr pWorker = _workers[0];
	size_t minTasks = pWorker->fqueue.size();				
	for (auto &it : _workers)
	{
		if (it->fqueue.empty())
			return it;
		else if (minTasks > it->fqueue.size())
		{
			minTasks = it->fqueue.size();
			pWorker = it;
		}
	}
	return pWorker;
}

Функция находит ближайший свободный поток либо наименее загруженный из них и возвращает его.
Таким образом нагрузка будет распределена между потоками равномерно.

Вызов функций с аргументами

Окей, пул мы создали, как передать в него нашу функцию?
Для этого мы используем новые возможности С++11 такие как std::bind, лямбды, std::function и variadic templates.
Добавим в класс ThreadPool следующий метод:

template<class _FN, class... _ARGS>
void runAsync(_FN _fn, _ARGS... _args)
{
	getFreeWorker()->appendFn(std::bind(_fn,_args...));
}

Вот так просто, без лишних вызовов, кортежей и абстрактных классов. Мы упаковываем указатель на функцию и ее аргументы с помощью bind'a в функтор и сразу передаем его в один из рабочих потоков.
Использовать это довольно просто:

Пример

void foo()					
{ 
	std::cout << "foo()" << std::endl; 
}
void foo2(int i, float f)	
{ 
	std::cout << "foo2(" << i << ", " << f << ")" << std::endl; 
}
struct Baz
{
	void bar() 
	{ 
		std::cout << "Baz::bar()" << std::endl; 
	}
	void bar2(double d, const std::string &str) 
	{ 
		std::cout << "Baz::bar2(" << d << ", " << str << ")" << std::endl; 
	}
};

int main()
{
	ThreadPool pool;

	Baz* pBaz = new Baz();
	Baz baz;

	std::function<void(int,int)> myFn = [&](int a, int b)
	{ 
		std::cout << "functor(" << a << ", " << b << ")" << std::endl;
	 };

	// Вызов простой функции
	pool.runAsync(&foo);
	// Вызов функции с аргументами
	pool.runAsync(&foo2,100,54.5f);
	// Вызов метода класса, указатель на класс передаем 2м аргументом
	pool.runAsync(&Baz::bar,pBaz);
	// Вызов метода класса с аргументами *
	pool.runAsync(&Baz::bar2,&baz,400.3,"Hello World!");
	// Вызов лямбда функции
	pool.runAsync([](){ std::cout << "lambda()" << std::endl; });
	// Вызов функтора
	pool.runAsync(myFn,10,20);
}

В результате получим вот такое, либо в другом порядке, если потоков много:

foo()
foo2(100, 54.5)
Baz::bar()
Baz::bar2(400.3, Hello World!)
lambda()
functor(10, 20)

* Обратите внимание, в примере, где мы передаем метод класса с аргументами в качестве указателя на объект Baz использовался локальный объект на стеке. Я добавил это для примера, но так лучше не делать, в данном случае нужно быть внимательным, объект для которого вызывается функция находится на стеке и если поток выполнит функцию когда объект будет уже уничтожен, с большой вероятностью ваша программа упадет.

Теперь нам нужен метод, который позволял бы получить результат работы функции которую мы передаем.
Для этого добавим простую структуру которая будет хранить возвращаемое значение наподобие std::future.

template<class _T>
struct AData
{
	AData():ready(false){}
	volatile bool ready;
	_T data;
};

Два поля, сами данные и флаг, который укажет нам что результат готов к использованию.
Можно создать полноценный клон std::future, с wait методами, но для нашего примера хватит и этого.

Сама функция выглядит так:

template<class _R, class _FN, class... _ARGS>
shared_ptr<AData<_R>> runAsync(_FN _fn, _ARGS... _args)
{
	function<_R()> rfn = std::bind(_fn,_args...);  
	shared_ptr<AData<_R>> pData(new AData<_R>());
	fn_type fn = [=]()
	{
		pData->data = rfn();
		pData->ready = true;
	};
	getFreeWorker()->appendFn(fn);
	return pData;
}

Тут так же упаковываем аргументы с помощью bind'a в функтор и создаем экземпляр нашей структуры для хранения результата. Затем создаем еще одну функцию с помощью лямбды удобного для нас формата void() и передаем ее потоку на выполнение.
Используется точно так же, только мы явно должны указывать тип возвращаемого значения как показано в примере.

Пример

double foo()					
{ 
	return 14.5;
}

int sum(int a, int b)
{
	return a+b;
}

int main()
{
	ThreadPool pool;

	std::function<std::string(int,int)> myFn = [&](int a, int b)
	{ 
		std::string str = "String = " + std::to_string(a)+" "+std::to_string(b);
		return str;
	};

	auto r1 = pool.runAsync<double>(&foo);
	auto r2 = pool.runAsync<int>(&sum,100,54);
	auto r3 = pool.runAsync<bool>([](){ return true; });
	auto r4 = pool.runAsync<std::string>(myFn,50,20);

	while (!r1->ready);  // Ждем результатов
	std::cout << r1->data << std::endl;
	while (!r2->ready); 
	std::cout << r2->data << std::endl;
	while (!r3->ready); 
	std::cout << r3->data << std::endl;
	while (!r4->ready); 
	std::cout << r4->data << std::endl;

	return 0;
}

Вывод программы:
14.5
154
1
String = 50 20

Можно создать шаблонный метод который сам определяет тип возвращаемого значения, например так:

template<class _R, class... _FN_ARGS, class... _ARGS>
shared_ptr<AData<_R>> runAsync(_R (*_fn)(_FN_ARGS...), _ARGS... _args)
{
	function<_R()> rfn = std::bind(_fn,_args...);  
	shared_ptr<AData<_R>> pData(new AData<_R>());
	fn_type fn = [=]()
	{
		pData->data = rfn();
		pData->ready = true;
	};
	getFreeWorker()->appendFn(fn);
	return pData;
}

Но для вызова метода класса такая функция уже не подойдет.
Для этого нам придется добавить отдельную перегрузку принимающую функцию формата __thiscall. Тоже самое для функторов, лямбд и прочего. Поэтому первый вариант более универсален, к тому же он позволяет нам сразу увидеть тип возвращаемого значения не заглядывая в объявление передаваемой функции.

На этом все. Получился очень простой и удобный пул потоков. Код проверялся на VS 2012.
Полный исходник можно глянуть тут.

Автор: Ivaneo

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js