Использование подключений к СУБД в многопоточных приложениях на QT

в 17:16, , рубрики: c++, qt, базы данных, Песочница, метки: ,

При написании много-сервисной системы в корой каждый сервис должен быть многопоточен, столкнулись с проблемой использования подключения к базе данных. Сервисы разрабатываются на QT, поэтому использовали модуль QtSql для взаимодействие с БД.

Проблемы

  1. Для каждого потока необходимо свое собственное подключение к БД (QSqlDatabase). При использовании одного подключения из разных потоков возникаем ошибка сегментирования.
  2. Т.к. в текущий момент времени возможно держать открытыми ограниченное число подключений к БД, необходимо реализовать захват, освобождение и ожидание подключения потоками.
  3. В контексте потока, для правильной работы с транзакциями необходимо работать только с одним подключением. Например: Сущность заказ содержит в себе сущности Товар. При сохранении Заказа должны сохранится все товары. Если при сохранении товара возникает исключительная ситуация, то вся транзакция по сохранению заказ должна отменится.
  4. Библиотека должна уметь работать с несколькими БД одновременно, причем разных типов (Mysql,PostgreSQL)

Решение

В итоге у нас получилось 3 класса:

  • Connection — класс обертка отвечающий за работу с БД: Подключение, выполнение и обработка результатов запросов.
  • ConnectionManager — синглтон хоронящий в себе подключени и отвечает за выдачу и освобождение подключений.
  • ManagedConnection — класс обертка для автоматизации захвата и освобождения подключения.

Connection

В конструкторе класса происходит инциализация члена QSqlDatabase _conn и вызывается открытие (open) подключения:

Connection::Connection(const QString& ident, const QString& driver,
		const QString& dbHost, const QString& dbName, const QString& dbUser,
		const QString& dbPassword, int dbPort) :
		_threadId(0), _countRef(0), _countBegins(0), _retryCount(0),_invalid(false) {
	_conn = QSqlDatabase::addDatabase(driver, ident);
	_conn.setHostName(dbHost);
	_conn.setDatabaseName(dbName);
	_conn.setUserName(dbUser);
	_conn.setPassword(dbPassword);
	_conn.setPort(dbPort),
	open();
}

Основные прототипы метод для работы с БД

       void exec(QSqlQuery& sql);
	void exec(const char* sql);
	void exec(const QString& sql);


	/**
	 * возвращает список значений, взятых из первого столбца всех строк набора результатов
	 */
	template <typename T>
	void fetchCol(QSqlQuery& sql,QList<T>& result)...

	template <typename T>
	QList<T> fetchCol(QSqlQuery& sql) ...

	template <typename T>
	QList<T> fetchCol(const char* sql) ...

	template <typename T>
	QList<T> fetchCol(const QString&  sql) ...

/*
 * Возвращает значение  только одной колонку из первой строки
 */
	template <typename T>
	T fetchOne(QSqlQuery& sql, bool* ok = 0) ...

	template <typename T>
	T fetchOne(const QString&  sql, bool* ok = 0) ...

	template <typename T>
	T fetchOne(const char* sql, bool* ok = 0) ...
/*
	 * Методы возващающие одну строку
	 */
	void fetchRow(QSqlQuery& sql,QVariantMap& res);
	QVariantMap fetchRow(const QString&  sql);
	QVariantMap fetchRow(const char* sql);
	QVariantMap fetchRow(QSqlQuery& sql);

/**
 * Методы возвращающие множество строк
 */
	void query(QSqlQuery& sql,QList<QVariant>& result);
	QList<QVariant> query(const char* sql);
	QList<QVariant> query(QSqlQuery& sql);
	QList<QVariant> query(const QString&  sql);
	/* Псевдонимы query	 */
	void fetchAll(QSqlQuery& sql,QList<QVariant>& result);
	QList<QVariant> fetchAll(const char* sql);
	QList<QVariant> fetchAll(QSqlQuery& sql);
	QList<QVariant> fetchAll(const QString&  sql);

Т.к. в QT для работы с БД используется QSqlQuery который зависит от QSqlDatabase то для создания запросов строго обязательно использовать методы:

	QSqlQuery createQuery() const { return QSqlQuery(_conn);}
	QSqlQuery createQuery(const QString& sql) { return QSqlQuery(sql, _conn); }
ManagedConnection

Класс «увязывает» Connection и ConnetcionManager.
При создании объекта происходит попытка запроса подключения у ConnetcionManager по идентификатору (например db1conn). После захвата инциализируется член указатель на подключение. Для удобства, переопределяется оператор -> дабы вызывались методы Connection.
Обычно приложение требует подключение только к одной БД. Поэтому принято было давать ему идентификатор «default».
Тип typedef ManagedConnection DConn позволит получать подключение. Например

DB::DConn conn;
//Эквивалент
DB::ManagedConnection conn("default');
//Для вышеуказанных примеров
DB::ManagedConnection c1("db1conn);
DB::ManagedConnection c2("db2conn);

Возьмем к примеру стек вызовов на псевдокоде. Заказ (Order) сохраняет свои данные в БД и вызывает сохранения у своего члена Item (в идале их много). Item сохраняет свои данные в БД и вызывает сохранение своего члена Data. Data сохраняет в БД свои данные. В итоге вложенность на 3 уровня:

Order : save(){
    DB::DBConn conn; //перый захват. countRef = 1
    conn->query('INSERT INTO order...');    
    item->save();
	      Item:save() {
		  DB::DBConn conn;//второй захват. countRef = 2
		  conn->query('INSERT INTO item...');
		  data->save();
			      Data:save() {
			           DB::DBConn conn;//третий захват. countRef = 3
			           conn->query('INSERT INTO data...');
			           //конец блока, вызов деструктора ~ManagedConnection countRef = 2
			      }
		  //конец блока, вызов деструктора ~ManagedConnection countRef = 1

	      }
    //конец блока, вызов деструктора ~ManagedConnection countRef = 0
    }    
  }
}

ConnectionManager

Хранить в себе настройки для подключения: хост, порт, тип базы, и т.д.
Например для приложения необходимо иметь подключение с базой db1 типа MySQL и db2 типа PostgresSQL. Настройка в конфигурационном файле будет выглядеть так:

[database]
size = 2 
;Идентификатор
1ident=db1conn
;Тип драйвера, в данном случае MySQL
1driver=QMYSQL
; Хост
1host=localhost
; Имя БД
1name=db1
;Пользователь БД
1user=db1_user
;Пароль 
1password=lol
;Порт
1port=3306
;Максимальное кол-во подключений которое может иметь приложение на данный момент
1max_count = 30

2ident=db2conn
2driver=QPSQL
2host=localhost
2name=test
2user=postgres
2password=
2port=5432
2max_count = 30

При старте приложения конфиг считывается и преобразуется в QVariantMap.
Пример инициализации в Application

Application::Application(int& argc, char** argv):
	QCoreApplication(argc, argv)
{
      ...
	QVariantMap stgs = settings();
	DB::ConnectionManager::init(stgs);
       ...
}

Статический член ConnectionManager иницилизируется из конфига (static QMap<QString, ConnectionManager*> _instances;)
В качестве ключа в map будет использоваться идентификатор из конфига ident

void ConnectionManager::init(const QVariantMap& settings) {
	const int size = settings.value("database/size").toInt();

	for (int i = 1; i <= size; ++i) {
		const QString ident = 
			settings.value(QString("database/%1/ident").arg(i), "default").toString();

		ConnectionManager* inst = new ConnectionManager(
				ident,
				settings.value(QString("database/%1/driver").arg(i)).toString(),
				...
			);
		_instances[ident] = inst;

		Log::info(QString("ConnectionManager::init: [%1] [%2@%3:%4] ").arg(inst->_driver).arg(inst->_dbUser).arg(inst->_dbHost).arg(inst->_dbPort));
	}
}

В качестве ключа в map будет использоваться идентификатор из конфига ident
Основной метод класса getConnection (пояснение в комментариях кода):

Connection* ConnectionManager::getConnection() {
	Connection* conn = 0;
	int count = 0;
	while(count++ < MAX_RETRY_GET_CONN_COUNT) {

		pthread_t thread_id = pthread_self();
		//Бегаем по подключениям, ищим подключение которое возможно уже было взято в текущем потоке
		{
			QMutexLocker mlocker(&_mutex);
			for (int i = 0; i < _pool.size(); ++i) {
				conn = _pool.at(i);
				//если находим то возвращаем
				if (conn && conn->threadID() == thread_id && conn->isValid()) {
					//увеличивая счетчик ссылок на это подключение в этом треде
					conn->lock();
					//Log::debug(QString("ConnectionManager::getConnection Возвращем то же самое подключение что было ранее залочено thread [%1])").arg(conn->name()));
					return conn;
				}
			}
		}
		//если не нашли прежде взятые в этом потоке, то будем искать первое не залоченное
		{
			QMutexLocker mlocker(&_mutex);
			for (int i = 0; i < _pool.size(); ++i) {
				conn = _pool.at(i);
				if (conn && !conn->isLocked() && conn->isValid() ) {
					//Log::debug(QString("ConnectionManager::getConnection Захват свободного подключения [%1])").arg(conn->name()));
					//таки лочим его
					conn->lock();
					return conn;
				}
			}
		}

	//если тут оказались то нет больше поключений
		{
			QMutexLocker mlocker(&_mutex);
			if(_currentCount < _maxCount) { //если текущее количество не превышает максимальное
				//то создадим новое подключение
				//try {
					conn = new Connection(
							QString("%1_%2").arg(_ident).arg(_currentCount),
							_driver, _dbHost,
							_dbName, _dbUser,
							_dbPassword,_dbPort
					);
					_currentCount++;
					conn->lock();
					_pool.append(conn);
					return conn;
				/*} catch(exc::Message& ex) {
					delete conn;
					throw ex;
				}*/
			} else {
				//удалим первый невалидный
				//Log::warn("Достигнуто максимальное кол-во [%d] доступных подключений к DB попытка [%d]",_maxCount,count);
				/*for (int i = 0; i < _pool.size(); ++i) {
					conn = _pool.at(i);
					if (!conn->isValid() && !conn->isLocked() ) {
						removeConnection(conn);
						break;

					}
				}*/
			}
		}
		//Если нельзя,
			//вздремнем малость и по новой
		sleep(2);
	}

	Log::crit("После %d не смог подключить подключение к базе данных",MAX_RETRY_GET_CONN_COUNT);
	{
		QMutexLocker mlocker(&_mutex);
		for (int i = 0; i < _pool.size(); ++i) {
			conn = _pool.at(i);
			if (!conn->isValid() && !conn->isLocked()) {
				removeConnection(conn);
				break;
			}
		}
	}
	throw exc::Message("Невозможно получить подключение к базе данных");

	//return 0;
}

Логика работы

ConnetctionManager инициализируется из конфига, чтоб знать с какими настройками создавать подключения, и каково их максимальное кол-во.
При создании экземпляра DB::ManagedConnection происходит обращение к ConnetctionManager и попытка получить указатель на Connetction из ConnetctionManager::getConnetction.
В ConnetctionManager::getConnetction используя несколько попыток происходит:

  1. Попытка взять из накопителя подключение у которого thread_id совпадает с текущим. Если найдено то возвратить, увеличив refCount подключения на 1
  2. Попытка взять из накопителя свободное подключения Если найдено то возвратить, увеличив refCount подключения на 1
  3. Если все подключения заняты и не достигнут макс. предел, то создать новое подключение, положить его в пул и возвратить

После удаления экземпляра класса DB::ManagedConnection происходит уменьшение refCount подключения. Если refCount == 0, подключение становится доступным для захвата других потокам.

Пример использования

QList<QVariant> Bank::banksByAccountNumber(const QString& accountNumber) {
	QList<QVariant> res;
	DB::ManagedConnection conn;

	foreach(const QVariant& row, conn->fetchAll(
			"SELECT `real` as state ,namen as name,namep as full_name,"
			"newnum as bik,ksnp as korr_acc,okpo as okpo,nnp as city,"
			"ind as zip, adr as address,regn as regnum, telef as phones FROM `bankdinfo` WHERE `real` = '' ORDER BY RAND()"
	)) {
		if(isBelongToBank((row.toMap()["bik"]).toString(),accountNumber)) {
			res.append(row);
		}
	}
	Log::debug("Banks found %d",res.size());
	return res;
}

Исходники на Github

Автор: zordon13ru

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js