Использование Python в многопоточном приложении на C++ и настоящая многопоточность в Python

в 13:35, , рубрики: c++, embedded, GIL, multithreading, python, python3, sub-interpreter, многопоточность, обертка, Питон, Программирование, метки: , , , , , , , , ,

Все более или менее знающие Python разработчики знают про такую жуткую вещь как GIL. Глобальный блокировщик всего процесса до тех пор пока Python выполняется в одном из потоков. Он даёт потоко-защищённость методами сравнимыми с садизмом, поскольку любая неявная блокировка в многопоточном приложении смерти подобна, всё что опиралось на параллельное выполнение, умирает в мучениях, раз за разом натыкаясь на блокировку GIL.
Известно что по сей день из-за этого скорбного факта программисты на C++ используют Python-обёртки по большей части лишь в однопоточных приложениях, а программисты на Python пытаются всех убедить, что им и так неплохо живётся.
Казалось бы, если поток порождён в C++, он не знает ни о каком GIL, используй Python без блокировок и радуйся. Радость разработчика однако закончится уже на втором потоке запросившем область глобальных переменных без блокировки.
Однако есть путь ведущий к светлому будущему!
Этот путь был изначально в таком языке как Perl, он же поддерживается в Си-API языка Python и я ума не приложу почему подобный механизм не включен в один из стандартных модулей Python! Способ по сути сводит использование различных под-интерпретаторов Python в разных потоках, причём используя свой GIL для каждого(!!!) без всякого шаманства и магии, просто последовательно вызвав несколько функций и стандартного набора Си-API языка Python!

Честная многопоточность в Python

Всё нижеперечисленное основывается на новом GIL, введённом в Python 3.2, отлажено и работает на Python 3.3. Однако для более ранних версий, том же Python 2.7, предлагается использовать то же API, само поведение GIL не так важно, как его запуск из порождённого интерпретатора.
Итак начнём, нам потребуется основной поток, в котором мы просто всё проинициализируем и запустим некоторое количество потоков, работающих с различным функционалом Python, как родным питоновским, так и написанном на C++. Всё будем делать из C++, будем работать с библиотеками boost::python и boost::thread. Если у вас ещё нет библиотеки BOOST, либо вы используете чистый Си вместо C++, это не страшно, основная работа здесь идёт на Си, а BOOST используется лишь для наглядности и простоты разработки, всё то же самое можно сделать на чистом Си, используя API Python и API ОС для работы с потоками.
В начале работы с Python нужно проинициализировать интерпретатор, включить механизм GIL и разрешить многопоточность в GIL, сохранив состояние главного потока:

        Py_Initialize();        // инициализация интерпретатора Python 
        PyEval_InitThreads();   // инициализация потоков в Python и механизма GIL

        mGilState = PyGILState_Ensure();     // забираем себе GIL сразу для настройки многопоточности
        mThreadState = PyEval_SaveThread();  // сохраняем состояние главного потока и отпускаем GIL

        // здесь GIL разблокирован для основного интерпретатора Python и ждёт блокировки из других потоков

Разумеется инициализация подразумевает и освобождение ресурсов, удобнее всего завести класс с конструктором и деструктором, где деструктор восстанавливает состояние потока, освобождает GIL и завершает работу интерпретатора (включая работу под-интерпретаторов):

        // здесь GIL должен быть разблокирован для основного интерпретатора

        PyEval_RestoreThread( mThreadState );   // восстанавливаем состояние главного потока и забираем себе GIL
        PyGILState_Release( mGilState );        // отпускаем блокировку GIL с сохранённым состоянием

        Py_Finalize();  // завершает работу как основного интерпретатора, со всеми под-интерпретаторами Python

Пока что всё очевидно для всех кто когда-либо работал с GIL из Си-API языка Python. Для основного потока всего лишь требуется выполнять роль диспетчера, не блокируя GIL и не мешая остальным потокам делать свою работу. Вот так примерно должен выглядеть класс целиком:

class PyMainThread  // специальный класс для основного потока
{
public:
    PyMainThread()  // нужно создать экземпляр класса в самом начале работы
    {
        Py_Initialize();        // инициализация интерпретатора Python 
        PyEval_InitThreads();   // инициализация потоков в Python и механизма GIL

        mGilState = PyGILState_Ensure();     // забираем себе GIL сразу для настройки многопоточности
        mThreadState = PyEval_SaveThread();  // сохраняем состояние главного потока и отпускаем GIL

        // здесь GIL разблокирован для основного интерпретатора Python и ждёт блокировки из других потоков
    }

    ~PyMainThread() // по завершении работы нужно корректно освободит ресурсы интерпретатора
    {
        // здесь GIL должен быть разблокирован для основного интерпретатора

        PyEval_RestoreThread( mThreadState );   // восстанавливаем состояние главного потока и забираем себе GIL
        PyGILState_Release( mGilState );        // отпускаем блокировку GIL с сохранённым состоянием

        Py_Finalize();  // завершает работу как основного интерпретатора, со всеми под-интерпретаторами Python
    }

private:
    PyGILState_STATE mGilState;     // сохранённое состояние GIL
    PyThreadState* mThreadState;    // сохранённое состояние основного потока
};

Собственно работа в функции main() или её аналоге сводится к следующей схеме:

    PyMainThread main_thread; // начальная инициализация интерпретатора в главном потоке

    boost::thread_group group;

    // порождаем потоки, каждый выполняет свою работу, взаимодействуя с Python без общего GIL
    for( int id = 1; id <= THREAD_NUM; ++id)
        group.create_thread( ThreadWork(id) );

    group.join_all();

Всё, закончили с примитивщиной, народ жаждет магии… ах да, я обещал что её не будет.

Работа в каждом потоке

Если мы сейчас попробуем просто сделать в каждом порождённом потоке time.sleep(1) мы получим падение уже на втором потоке.
Нас спасёт волшебная функция Py_NewInterpreter (!!!), в которой всё бы хорошо, но её использование требует блокировки GIL (!) и это было бы страшно, если бы не тот факт, что GIL приходит и уходит, а под-интерпретатор останется. И уже в нём можно блокировать его GIL сколько угодно, у него самого потоков будет ровно 1 — тот в котором его и создали:

        mMainGilState = PyGILState_Ensure();    // забираем блокировку основного интерпретатора
        mOldThreadState = PyThreadState_Get();  // сохраняем текущее состояние порождённого потока
        mNewThreadState = Py_NewInterpreter();  // создаём в потоке под-интерпретатор
        PyThreadState_Swap( mNewThreadState );  // с этого момента для потока актуален уже под-интерпретатор

        mSubThreadState = PyEval_SaveThread();  // не забываем освободить предыдущую блокировку GIL
        mSubGilState = PyGILState_Ensure();     // и заблокировать GIL уже для под-интерпретатора

Это также лучше всего сделать в конструкторе специального класса, а деструкторе соответственно следующий код:

        PyGILState_Release( mSubGilState );         // разблокируем GIL для под-интерпретатора
        PyEval_RestoreThread( mSubThreadState );    // восстанавливаем блокировку и состояние потока для основного интерпретатора
        Py_EndInterpreter( mNewThreadState );       // завершаем работу под-интерпретатора
        PyThreadState_Swap( mOldThreadState );      // возвращаем состояние потока для основного интерпретатора
        PyGILState_Release( mMainGilState );        // освобождаем блокировку GIL для основного интерпретатора

Код всего класса приведён ниже:

class PySubThread   // класс для работы в каждом порождённом потоке
{
public:
    PySubThread()   // нужно для инициализации под-интерпретатора Python в самом начале работы потока
    {
        mMainGilState = PyGILState_Ensure();    // забираем блокировку основного интерпретатора
        mOldThreadState = PyThreadState_Get();  // сохраняем текущее состояние порождённого потока
        mNewThreadState = Py_NewInterpreter();  // создаём в потоке под-интерпретатор
        PyThreadState_Swap( mNewThreadState );  // с этого момента для потока актуален уже под-интерпретатор

        mSubThreadState = PyEval_SaveThread();  // не забываем освободить предыдущую блокировку GIL
        mSubGilState = PyGILState_Ensure();     // и заблокировать GIL уже для под-интерпретатора
    }

    ~PySubThread()  // по завершении работы потока нужно корректно освободить ресурсы под-интепретатора Python
    {
        PyGILState_Release( mSubGilState );         // разблокируем GIL для под-интерпретатора
        PyEval_RestoreThread( mSubThreadState );    // восстанавливаем блокировку и состояние потока для основного интерпретатора
        Py_EndInterpreter( mNewThreadState );       // завершаем работу под-интерпретатора
        PyThreadState_Swap( mOldThreadState );      // возвращаем состояние потока для основного интерпретатора
        PyGILState_Release( mMainGilState );        // освобождаем блокировку GIL для основного интерпретатора
    }

private:
    PyGILState_STATE mMainGilState;     // состояние GIL основного интерпретатора Python
    PyThreadState* mOldThreadState;     // состояние текущего потока для основного интерпретатора
    PyThreadState* mNewThreadState;     // состояние потока для порождённого под-интерпретатора
    PyThreadState* mSubThreadState;     // сохранённое состояние потока при разблокировке GIL
    PyGILState_STATE mSubGilState;      // состояние GIL для порождённого под-интерпретатора Python
};

Как видите работа по инициализации в каждом потоке уже не столь тревиальна и примитивна, как в основном потоке. Однако мы имеем полный PROFIT для каждого потока. Пусть в каждом отдельном под-интерпретаторе нам придётся заново импортировать модули, однако мы получаем почти полную изолированность данных Python для каждого потока!

Тестируем результат

Итак, давайте проверим, насколько мы правы. Давайте для полноты ощущений заведём свой модуль на Python написанный на C++ и предоставляющи функцию аналог time.sleep:

#include <boost/python.hpp>
#include <boost/thread.hpp>

using namespace boost::python;
using namespace boost::this_thread;
using namespace boost::posix_time;

void wait( double sec ) // функция ожидания, аналог стандарному time.sleep в Python
{
    int msec = static_cast<int>( sec * 1000 );  // переводим в миллисекунды
    sleep( millisec( msec ) );                  // спим указанный в секундах период
}

BOOST_PYTHON_MODULE( ctimer ) // используем boost::python и создаём модуль ctimer
{
    def( "wait", wait, args("sec") );   // в Python это будет ctimer.wait(sec)
}

Собираем DLL и переименовываем в модуль Python ctimer.pyd, если мы под Windows. Полученный модуль ctimer подкладываем для выполнения основного приложения. Будем использовать ctimer.wait наряду со стандартным time.sleep.
Заводим класс-функтор для работы в каждом отдельном потоке:

class ThreadWork    // тестовый класс-функтор для передачи в экземпляр boost::thread
{
public:
    ThreadWork( int id )    // сохраним порядковый номер запущенного потока
        : mID( id )
    {
    }

    void operator () ( void )   // собственно выполняемая работа в каждом потоке
    {
        cout << "Thread#" << mID << " <= START" << endl;

        PySubThread sub_thread; // здесь мы порождаем под-интерпретатор Python

        for( int rep = 1; rep <= REPEAT_TIMES; ++rep )
        {
            // работаем с модулем написаном на Python
            cout << "Thread#" << mID << " <= Repeat#" << rep << " <= import time; time.sleep(pause)" << endl;

            object time = import( "time" );     // import time
            time.attr( "sleep" )( PAUSE_SEC );  // time.sleep(pause)

            // работаем с модулем написаном на C++
            cout << "Thread#" << mID << " <= Repeat#" << rep << " <= import ctimer; ctimer.wait(pause)" << endl;

            object ctimer = import( "ctimer" ); // import ctimer
            ctimer.attr( "wait" )( PAUSE_SEC ); // ctimer.wait(pause)
        }

        cout << "Thread#" << mID << " <= END" << endl;

        // здесь под-интерпретатор Python завершит свою работу
    }

private:
    int mID;    // порядковый номер при запуске потоков
};

Запускаем приложение и радуемся! Потоки параллельно работают с модулями на Python, в каждом потоке отдельно болтаются маленькие под-питончики, которые заблокировали каждый свой GIL и совершенно свободно работают все вместе не мешая друг другу.
Ура, товарищи!

Ссылка на проект MSVS 2012 с исходниками (целых два .cpp файла) и собранными DLL и EXE для Python 3.3 x64 можно скачать здесь (290 KB)

Полезные ссылки

Работа с под-интерпретатором Python
API для работы с потоками, интерпретатором и GIL
Документация по Boost.Python
Документация по Boost.Thread

Автор: Qualab

Источник

Поделиться

* - обязательные к заполнению поля