QThread + QtSql: Асинхронные запросы

в 5:54, , рубрики: >:3, c++, multithread, qt, sql, thread

При написании многопоточных приложений на Qt многие сталкивались с тем, что реализация QtSql строго однопоточная. Что хуже, в ней нет ни единого метода для фонового выполнения запроса. А если запрос большой или просто долгий, выполнение его в основном потоке намертво подвесит интерфейс.

Что нервирует ещё больше, нигде нет хоть сколько-нибудь жизнеспособной реализации такой, казалось бы, нужной вещи. Максимум, который можно найти — пул соединений, работающий вполне синхронно. Что ж, время течёт, всё меняется.

tl;dr
Worker.h:

#include <QVariant>
#include <QThread>
#include <QSqlDatabase>

typedef QVector< QVariant > VariantVector;
typedef QVector< VariantVector > VariantVector2;

class Worker : public QObject {
    Q_OBJECT
public:
    Worker();
    virtual ~Worker();
    bool init();
    bool isInitiated();

    static int queryNum();
    static int exec(Worker * w, QString query, QStringList bindNames = QStringList(), VariantVector bindList = VariantVector());

private slots:
    void slotExec(int num, QString query, QStringList bindNames, VariantVector bindList);

signals:
    void signalExec(int, VariantVector2);
    void queryError(int, const QSqlError &);

protected:
    QThread * th;
    QSqlDatabase db;
};

Worker.cpp:

#include <QSqlRecord>
#include <QSqlQuery>
#include <QSqlError>
#include <QTimer>

QString GenConnectionName(){
    static int num = 0;
    return QString("DBConnection%1").arg(num++);
}


Worker::Worker(): QObject(){
    th = new QThread(this);
    qRegisterMetaType<VariantVector>("VariantVector");
    qRegisterMetaType<VariantVector2>("VariantVector2");
}

Worker::~Worker() {
    th->terminate();
    db.close();
    th->deleteLater();
}


bool Worker::init() {
    if (db.isOpen())
        return true;
    if (!th->isRunning())  {
        th->start(QThread::LowPriority);
        moveToThread(th);
    }
    if (!db.isOpen())  {
        db = QSqlDatabase::addDatabase("QPSQL",connection);
        db.setHostName("localhost");
        db.setPort(5432);
        db.setUserName("username");
        db.setDatabaseName("postgres");
        QTimer timer;
        timer.start(10000);
        forever {
            if (db.isOpen()) break;
            if (db.open()) break;
            if (!timer.remainingTime()) break;
            if (db.lastError().databaseText().contains("ВАЖНО:  система баз данных запускается") == 0)
                continue;
    }    }
    if (!db.isOpen())   return false;
    return true;
}

bool Worker::isInitiated() {
    return th->isRunning() && db.isOpen();
}

int Worker::queryNum() {
    static int i = 0;
    return i++;
}

int Worker::exec(Worker * w, QString q, QStringList s, VariantVector v) {
    int i = queryNum();
    QMetaObject::invokeMethod(w, "slotExec", Qt::QueuedConnection, Q_ARG(int,i), Q_ARG(QString,q), Q_ARG(QStringList,s), Q_ARG(VariantVector,v));
    return i;
}

void Worker::slotExec(int num, QString query, QStringList bindNames, VariantVector bindList) {
    QSqlQuery q(db);
    q.prepare(query);
    for (int i = 0; i < bindNames.size(); ++i)
        q.bindValue(bindNames[i],bindList[i]);
    q.exec();
    if (q.lastError().isValid())
         emit queryError(q.lastError());
    VariantVector2 vv;
    QSqlRecord r = q.record();
    int rn = r.count();
    vv.push_back(VariantVector());
    for (int i = 0; i < rn; ++i)
        vv[0].push_back(r.fieldName(i));
    while(q.next()) {
         static int j = -1; ++j;
         vv.push_back(VariantVector());
         for (int i = 0; i < rn; ++i)
             vv[j].push_back(q.value(i));
    }
    emit signalExec(num,vv);
}

Example.cpp:

class MainView {
// ...
    int id;
    Worker w;
public slots:
void onTest(int mid, VariantVector2 ret) {
    qDebug() << mid;
    if (id != mid) return;
    foreach (VariantVector r, ret)
        foreach (QVariant v, r)
        qDebug() << v;
}
public:
void run() {
    connect(&w,SIGNAL(signalExec(int,VariantVector2)),this,SLOT(onTest(int,VariantVector2)));
    QStringList names;
    VariantVector params;
    names << ":myid";
    params.push_back(3);
   if (w.init());
    id = Worker::exec(&w,"select * from Table where not ID = :myid;",names,params);
    qDebug() << "Finished!";
}
};

Finished!
QVariant(QString, "id")
QVariant(QString, "code")
QVariant(QString, "prefasfk_id")
QVariant(QString, "creationdate")
QVariant(QString, "lasteditdate")
QVariant(int, 1)
QVariant(int, 10)
QVariant(QString, "")
QVariant(QDateTime, QDateTime(2015-12-18 17:05:43.877 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(QDateTime, QDateTime(2015-12-18 17:05:43.877 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(int, 2)
QVariant(int, 20)
QVariant(QString, "")
QVariant(QDateTime, QDateTime(2015-12-18 17:05:45.729 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(QDateTime, QDateTime(2015-12-18 17:05:45.729 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(int, 4)
QVariant(int, 40)
QVariant(QString, "")
QVariant(QDateTime, QDateTime(2015-12-18 17:05:49.152 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(QDateTime, QDateTime(2015-12-18 17:05:49.152 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))

Чтобы вынести соединение в отдельный поток, воспользуемся старой доброй связкой QThread+QObject.

Но не будем об этом.
Worker.h:

#include <QVariant>
#include <QThread>
#include <QSqlDatabase>

class Worker : public QObject {
    Q_OBJECT
public:
    Worker();
    virtual ~Worker();
    bool init();
    bool isInitiated();

    static int queryNum();

protected:
    QThread * th;
    QSqlDatabase db;
};

Worker.cpp:

#include <QSqlRecord>
#include <QSqlQuery>
#include <QSqlError>
#include <QTimer>

QString GenConnectionName(){
    static int num = 0;
    return QString("DBConnection%1").arg(num++);
}

Worker::Worker(): QObject(){
    th = new QThread(this);
}

Worker::~Worker() {
    th->terminate();
    db.close();
    th->deleteLater();
}

bool Worker::init() {
    if (db.isOpen())
        return true;
    if (!th->isRunning())  {
        th->start(QThread::LowPriority);
        moveToThread(th);
    }
    if (!db.isOpen())  {
        db = QSqlDatabase::addDatabase("QPSQL",connection);
        db.setHostName("localhost");
        db.setPort(5432);
        db.setUserName("username");
        db.setDatabaseName("postgres");
        QTimer timer;
        timer.start(10000);
        forever {
            if (db.isOpen()) break;
            if (db.open()) break;
            if (!timer.remainingTime()) break;
            if (db.lastError().databaseText().contains("ВАЖНО:  система баз данных запускается") == 0)
                continue;
    }    }
    if (!db.isOpen())   return false;
    return true;
}

bool Worker::isInitiated() {
    return th->isRunning() && db.isOpen();
}

int Worker::queryNum() {
    static int i = 0;
    return i++;
}

Поток с собственным соединением создан, но запросы всё равно будут выполняться синхронно. Чтобы побороть эту проблему, воспользуемся старыми добрыми сигналами-слотами. Как известно, при передаче между потоками они преобразуются в сообщения и ставятся в очередь. Однако, есть одна проблема. [Боромир.gif] Нельзя просто взять, и добавить вызов слота. Если вызывать метод класса как обычно, Qt заблокирует оба потока, и вызывающий, и вызываемый, пока они не окажутся способны выполнить метод. Чтобы именно добавить вызов слота, нужно воспользоваться методом QMetaObject::invokeMethod. Заметьте, я оставил метод Worker::exec статическим именно из тех же блокирующих побуждений.

//Worker.h
typedef QVector< QVariant > VariantVector;
typedef QVector< VariantVector > VariantVector2;

class Worker : public QObject {
    //...
public:
    static int exec(Worker * w, QString query, QStringList bindNames = QStringList(), VariantVector bindList = VariantVector());

private slots:
    void slotExec(int num, QString query, QStringList bindNames, VariantVector bindList);

signals:
    void signalExec(int, VariantVector2);
    void queryError(int, const QSqlError &);
//...
};

//Worker.cpp
int Worker::exec(Worker * w, QString q, QStringList s, VariantVector v) {
    int i = queryNum();
    QMetaObject::invokeMethod(w, "slotExec", Qt::QueuedConnection, Q_ARG(int,i), Q_ARG(QString,q), Q_ARG(QStringList,s), Q_ARG(VariantVector,v));
    return i;
}

void Worker::slotExec(int num, QString query, QStringList bindNames, VariantVector bindList) {
    QSqlQuery q(db);
    q.prepare(query);
    for (int i = 0; i < bindNames.size(); ++i)
        q.bindValue(bindNames[i],bindList[i]);
    q.exec();
    if (q.lastError().isValid())
         emit queryError(q.lastError());
    VariantVector2 vv;
    QSqlRecord r = q.record();
    int rn = r.count();
    vv.push_back(VariantVector());
    for (int i = 0; i < rn; ++i)
        vv[0].push_back(r.fieldName(i));
    while(q.next()) {
         static int j = -1; ++j;
         vv.push_back(VariantVector());
         for (int i = 0; i < rn; ++i)
             vv[j].push_back(q.value(i));
    }
    emit signalExec(num,vv);
}

Почти готово. Однако, если сейчас запустить исходники, метапроцесор откажется передавать данные, сославшись на неизвестный тип параметров. И в чём-то он прав. Ну что ж, давайте познакомим наших героев.

Worker::Worker(): QObject(){
    th = new QThread(this);
    qRegisterMetaType<VariantVector>("VariantVector");
    qRegisterMetaType<VariantVector2>("VariantVector2");
}

Теперь всё, осталось только создать слоты, соединить и вызвать. Учтите, в данной реализации первой строкой передаются названия столбцов таблицы.

class MainView {
// ...
    int id;
    Worker w;
public slots:
void onTest(int mid, VariantVector2 ret) {
    qDebug() << mid;
    if (id != mid) return;
    foreach (VariantVector r, ret)
        foreach (QVariant v, r)
        qDebug() << v;
}
public:
void run() {
    connect(&w,SIGNAL(signalExec(int,VariantVector2)),this,SLOT(onTest(int,VariantVector2)));
    QStringList names;
    VariantVector params;
    names << ":myid";
    params.push_back(3);
   if (w.init());
    id = Worker::exec(&w,"select * from Table where not ID = :myid;",names,params);
    qDebug() << "Finished!";
}
};

Finished!
QVariant(QString, "id")
QVariant(QString, "code")
QVariant(QString, "prefasfk_id")
QVariant(QString, "creationdate")
QVariant(QString, "lasteditdate")
QVariant(int, 1)
QVariant(int, 10)
QVariant(QString, "")
QVariant(QDateTime, QDateTime(2015-12-18 17:05:43.877 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(QDateTime, QDateTime(2015-12-18 17:05:43.877 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(int, 2)
QVariant(int, 20)
QVariant(QString, "")
QVariant(QDateTime, QDateTime(2015-12-18 17:05:45.729 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(QDateTime, QDateTime(2015-12-18 17:05:45.729 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(int, 4)
QVariant(int, 40)
QVariant(QString, "")
QVariant(QDateTime, QDateTime(2015-12-18 17:05:49.152 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))
QVariant(QDateTime, QDateTime(2015-12-18 17:05:49.152 RTZ 5 (зима) Qt::TimeSpec(LocalTime)))

Вот и всё. Желающие могут добавить построчную передачу данных или, к примеру, удобный пул. Самые отчаяные могут даже добавить синхронные запросы без этих ваших сигналов и слотов.

Автор: iCpu

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js