Миграция данных с MySQL на PostgreSQL

в 12:28, , рубрики: c++, mysql, postgresql, метки: , ,

По мере работы с базами данных, ознакомления с их плюсами и минусами, возникает момент, когда принимается решение миграции с одной СУБД в другую. В данном случае возникла задача переноса сервисов с MySQL на PostgreSQL. Вот небольшой перечень вкусностей, которые ждут от перехода на PostgreSQL, версии 9.2 (с более подробным списком возможностей можно ознакомится тут):

  • наследование таблиц (есть ограничения, которые обещают в будущем исправить)
  • диапазоны: int4range, numrange, daterange
  • поддержка из коробки несколько языков для хранимых функций (PL/pgSQL, PL/Tcl, PL/Perl, PL/Python и голый C)
  • оператор WITH, позволяющий делать рекурсивные запросы
  • (планируется) материализованные представления (частично они доступны и сейчас — как IUD правила к представлению)
  • (планируется) триггера на DDL операции

Как правило, существующие решения опираются на работу с уже готовым SQL дампом, который конвертируется в соответствии с синтаксисом целевой БД. Но в некоторых случаях (активно использующееся веб-приложение с большим объемом информации) такой вариант несет определенные временные затраты на создание SQL дампа из СУБД, его конвертации и загрузку получившегося дампа снова в СУБД. Поэтому оптимальней будет online-вариант (прямиком из СУБД в СУБД) конвертера, что может существенно уменьшить простой сервисов.

Языком для реализации выбран C++ (с некоторыми возможностями из C++11x), библиотеки для соединения с MySQL и PostgreSQL использовались нативные, в качестве IDE был задействован Qt Creator.

Алгоритм миграции состоит в следующем. Подразумевается что в БД-получателе уже создана структура таблиц, соответствующая структуре в БД-источнике. Формируется список таблиц для переноса данных, который затем распределяется в пуле потоков. Каждый поток имеет подключение к БД-источнику и к БД-получателю. Т.е. параллельно переносится несколько таблиц. Profit!

Традиционно любое приложение имеет некоторый каркас — набор системных компонент, на которые опираются другие компоненты — работа с конфигурационным файлом, логом, обработчик ошибок, менеджер памяти и прочее. В нашем случае, используется только самое необходимое для решения задачи. Во-первых, были переопределены (исключительно для удобства) некоторые фундаментальные и составные типы (да, знаю, можно было аlias templates использовать, но получилось так):

простые типы

typedef bool t_bool;

typedef char t_char;
typedef unsigned char t_uchar;
typedef signed char t_schar;

typedef int t_int;
typedef unsigned int t_uint;

typedef float t_float;
typedef double t_double;

map

template<typename T, typename U>
class CMap
    : public std::map<T, U>
{
public:	
    CMap();
    virtual ~CMap();

};

template<typename T, typename U>
CMap<T, U>::CMap()
{
}

template<typename T, typename U>
CMap<T, U>::~CMap()
{
}

vector
template<typename T>
class CVector
    : public std::vector<T>
{
public:
    CVector();
    virtual ~CVector();

};

template<typename T>
CVector<T>::CVector()
{
}

template<typename T>
CVector<T>::~CVector()
{
}

fstream

class CFileStream
    : public std::fstream
{
public:
    CFileStream();
    virtual ~CFileStream();

};

Из явных паттернов использован только синглтон:

классический синглтон Мэйерса

template<typename T>
class CSingleton
{
public:
    static T* instance();
    void free();

protected:
    CSingleton();
    virtual ~CSingleton();

};

template<typename T>
T* CSingleton<T>::instance()
{
    static T *instance = new T();

    return instance;
}

template<typename T>
void CSingleton<T>::free()
{
    delete this;
}

template<typename T>
CSingleton<T>::CSingleton()
{
}

template<typename T>
CSingleton<T>::~CSingleton()
{
}

Базовые классы для задачи (выполняется в отдельном потоке) и системы (запускает на выполнение задачи):

task.h
class CTask
{
public:	
    CTask();
    virtual ~CTask();

    void execute();

    t_uint taskID();
    t_bool isExecuted();

protected:
    virtual void executeEvent() = 0;

private:
    t_uint m_task_id;
    t_bool m_executed;

};

task.cpp

CTask::CTask()
    : m_executed(false)
{
    static t_uint task_id = 0;

    m_task_id = task_id++;
}

CTask::~CTask()
{
}

void CTask::execute()
{
    executeEvent();

    m_executed = true;
}

t_uint CTask::taskID()
{
    return m_task_id;
}

t_bool CTask::isExecuted()
{
    return m_executed;
}

system.h

class CSystem
{
public:
    CSystem();
    virtual ~CSystem() = 0;

protected:
    void executeTask(CTask *task);

};

system.cpp

CSystem::CSystem()
{
}

CSystem::~CSystem()
{
}

void CSystem::executeTask(CTask *task)
{
    CTask& task_ref = *task;

    std::thread thread([&]() { task_ref.execute(); });

    thread.detach();
}

В завершении рассмотрения базовых типов нужно упомянуть класс строки, который пришлось написать с нуля, чтобы для некоторых операций (замена подстроки и конкатенация) была возможность работы с передаваемым буфером (о нем чуть ниже) без дополнительных выделений памяти и некоторые вещи (преобразование строки в число и числа в строки) сделать членами класса (приводится только объявление класса):

string.h

class CString
{
public:
    CString(const t_char *data = nullptr);
    CString(const CString& s);
    ~CString();

    const t_char* ptr() const;
    void setPtr(t_char *p);

    CString& operator= (const CString& s);
    CString operator+ (const t_char *p) const;
    CString operator+ (t_char c) const;
    CString operator+ (const CString& s) const;
    friend CString operator+ (const t_char *p, const CString& s);
    CString& operator+= (const t_char *p);
    CString& operator+= (t_char c);
    CString& operator+= (const CString& s);

    t_bool operator== (const CString& s) const;
    t_bool operator!= (const CString& s) const;

    t_bool operator< (const CString& s) const;
    t_bool operator> (const CString& s) const;
    t_bool operator<= (const CString& s) const;
    t_bool operator>= (const CString& s) const;

    t_char& at(t_uint index);
    t_char at(t_uint index) const;

    t_uint length() const;
    t_bool isEmpty() const;

    void clear();

    t_int search(const CString& s, t_uint from = 0) const;
    CString substr(t_uint from, t_int count = -1) const;
    CString replace(const CString& before, const CString& after) const;

    static CString fromNumber(t_uint value);
    static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr);

    CVector<CString> split(const CString& splitter) const;
    t_bool match(const CString& pattern) const;

    static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer);
    static t_uint lengthPtr(const t_char *src);
    static t_uint concatenatePtr(const t_char *src, char *buffer);

private:
    t_char *m_data;

    t_uint length(const t_char *src) const;
    t_char* copy(const t_char *src) const;
    t_char* concatenate(const t_char *src0, t_char c) const;
    t_char* concatenate(const t_char *src0, const t_char *src1) const;
    t_int compare(const t_char *src0, const t_char *src1) const;
};

CString operator+ (const t_char *p, const CString& s);

Как неизбежность, для приложения, чуть больше чем «Hello,world», это лог и конфигурационный файл. В методе записи сообщения в лог был задействован мьютекс, так как каждая задача по мере обработки таблицы пишет об этом в лог. Мелкогранулярные блокировки и lockfree-алгоритмы не рассматривались по причине того, что запись в лог — это далеко не узкое место в работе приложения:

log.h

class CLog
    : public CSingleton<CLog>
{
public:
    enum MessageType
    {
        Information,
        Warning,
        Error
    };

    CLog();
    virtual ~CLog();

    void information(const CString& message);
    void warning(const CString& message);
    void error(const CString& message);

private:
    std::mutex m_mutex;

    CFileStream m_stream;

    void writeTimestamp();
    void writeHeader();
    void writeFooter();
    void writeMessage(MessageType type, const CString& message);

};

log.cpp

CLog::CLog()
{
    m_stream.open("log.txt", std::ios_base::out);

    writeHeader();
}

CLog::~CLog()
{
    writeFooter();

    m_stream.flush();
    m_stream.close();
}

void CLog::information(const CString& message)
{
    writeMessage(Information, message);
}

void CLog::warning(const CString& message)
{
    writeMessage(Warning, message);
}

void CLog::error(const CString& message)
{
    writeMessage(Error, message);
}

void CLog::writeTimestamp()
{
    time_t rawtime;
    tm *timeinfo;
    t_char buffer[32];

    time(&rawtime);
    timeinfo = localtime(&rawtime);

    strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo);

    m_stream << buffer << " ";
}

void CLog::writeHeader()
{
    writeMessage(Information, "Log started");
}

void CLog::writeFooter()
{
    writeMessage(Information, "Log ended");
}

void CLog::writeMessage(MessageType type, const CString& message)
{
    std::lock_guard<std::mutex> guard(m_mutex);

    writeTimestamp();

    switch (type)
    {
    case Information:
        {
            m_stream << "Information " << message.ptr();

            break;
        }

    case Warning:
        {
            m_stream << "Warning " << message.ptr();

            break;
        }

    case Error:
        {
            m_stream << "Error " << message.ptr();

            break;
        }

    default:
        {
            break;
        }
    }

    m_stream << "n";

    m_stream.flush();
}

config.h

class CConfig
    : public CSingleton<CConfig>
{
public:
    CConfig();
    virtual ~CConfig();

    CString value(const CString& name, const CString& defvalue = "") const;

private:
    CFileStream m_stream;
    CMap<CString, CString> m_values;

};

config.cpp

CConfig::CConfig()
{
    m_stream.open("mysql2psql.conf", std::ios_base::in);

    if (m_stream.is_open())
    {
        CString line;

        const t_uint buffer_size = 256;
        t_char buffer[buffer_size];

        while (m_stream.getline(buffer, buffer_size))
        {
            line = buffer;

            if (!line.isEmpty() && line.at(0) != '#')
            {
                t_int pos = line.search("=");

                CString name = line.substr(0, pos);
                CString value = line.substr(pos + 1);

                m_values.insert(std::pair<CString, CString>(name, value));
            }
        }

        m_stream.close();

        CLog::instance()->information("Config loaded");
    }
    else
    {
        CLog::instance()->warning("Can't load config");
    }
}

CConfig::~CConfig()
{
}

CString CConfig::value(const CString& name, const CString& defvalue) const
{
    CMap<CString, CString>::const_iterator iter = m_values.find(name);

    if (iter != m_values.end())
    {
        return iter->second;
    }

    return defvalue;
}

mysql2psql.conf

# MySQL connection
mysql_host=localhost
mysql_port=3306
mysql_database=mysqldb
mysql_username=root
mysql_password=rootpwd
mysql_encoding=UTF8

# PostgreSQL connection
psql_host=localhost
psql_port=5432
psql_database=psqldb
psql_username=postgres
psql_password=postgrespwd
psql_encoding=UTF8

# Migration
# (!) Note: source_schema == mysql_database
source_schema=mysqldb
destination_schema=public
tables=*
use_insert=0

# Other settings
threads=16

Теперь, что касательно добавления данных в PostgreSQL. Есть два варианта — использовать запросы INSERT, которые на большом массиве данных не очень себя показали в плане производительности (особенности транзакционного механизма), или через команду COPY, которая позволяет непрерывно пересылать порции данных, отправляя в конце передачи специальный маркер (символ-терминатор). Еще один нюанс связан с определением типа (поля в таблице) в PostgreSQL. В документации не указано (возможно не было чтения между строк документации), как можно вернуть человекопонятный идентификатор типа, поэтому было составлено соответствие oid (почти уникальный идентификатор каждого объекта в БД) и типа:

case 20: // int8
case 21: // int2
case 23: // int4
case 1005: // int2
case 1007: // int4
case 1016: // int8
case 700: // float4
case 701: // float8
case 1021: // float4
case 1022: // float8
case 1700: // numeric
case 18: // char
case 25: // text
case 1002: // char
case 1009: // text
case 1015: // varchar
case 1082: // date
case 1182: // date
case 1083: // time
case 1114: // timestamp
case 1115: // timestamp
case 1183: // time
case 1185: // timestamptz
case 16: // bool
case 1000: // bool

Подготовка и выполнение задач состоит в следующем:

  • создается список таблиц
  • создаются подключения (по количеству задач) к БД-источнику и БД-приемнику
  • распределяются диапазоны из списка таблиц задачам
  • задачи запускаются на выполнение (с переданным диапазоном таблиц и подключениями к БД)
  • ожидается выполнение задач (главный поток + созданные потоки)

В каждой задаче выделяется три статических буфера по 50 МБ, в которых происходит подготовка данных для команды COPY (экранирование специальных символов и конкатенация значений полей):

фрагмент кода c подготовкой задач

// create connection pool

t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1"));
CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads));

if (!createConnectionPool(threads - 1))
{
    return false;
}

// create tasks

CString destination_schema = CConfig::instance()->value("destination_schema");

t_uint range_begin = 0;
t_uint range_end = 0;

t_uint range = m_tables.size() / threads;

for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j)
{
    range_begin = i;
    range_end = i + range;

    std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end));

    m_migration_tasks.push_back(std::move(task));
}

range_begin = range_end + 1;
range_end = m_tables.size() - 1;

std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end));

// executing tasks

for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
    executeTask(m_migration_tasks.at(i).get());
}

task->execute();

// wait for completion

for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
    while (!m_migration_tasks.at(i)->isExecuted())
    {
    }
}

фрагмент кода с подготовкой в задаче данных для COPY

t_uint count = 0;
t_char *value;

CString copy_query = "COPY " + m_destination_schema + "." + table + " ( ";

m_buffer[0] = '';
m_buffer_temp0[0] = '';
m_buffer_temp1[0] = '';

if (result->nextRecord())
{
    for (t_uint i = 0; i < result->columnCount(); ++i)
    {
        if (i != 0)
        {
            copy_query += ", ";
            CString::concatenatePtr("t", m_buffer);
        }

        copy_query += result->columnName(i);

        if (!result->isColumnNull(i))
        {
            value = result->columnValuePtr(i);
    
            CString::replacePtr(value, "\", "\\", m_buffer_temp0);
            CString::replacePtr(m_buffer_temp0, "b", "\b", m_buffer_temp1);
            CString::replacePtr(m_buffer_temp1, "f", "\f", m_buffer_temp0);
            CString::replacePtr(m_buffer_temp0, "n", "\n", m_buffer_temp1);
            CString::replacePtr(m_buffer_temp1, "r", "\r", m_buffer_temp0);
            CString::replacePtr(m_buffer_temp0, "t", "\t", m_buffer_temp1);
            CString::replacePtr(m_buffer_temp1, "v", "\v", m_buffer_temp0);

            CString::concatenatePtr(m_buffer_temp0, m_buffer);
        }
        else
        {
            CString::concatenatePtr("\N", m_buffer);
        }
    }

    copy_query += " ) FROM STDIN";

    if (!m_destination_connection->copyOpen(copy_query))
    {
        CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError());

        return false;
    }

    CString::concatenatePtr("n", m_buffer);

    if (!m_destination_connection->copyDataPtr(m_buffer))
    {
        CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());

        return false;
    }

    ++count;

    while (result->nextRecord())
    {
        m_buffer[0] = '';

        for (t_uint i = 0; i < result->columnCount(); ++i)
        {
            if (i != 0)
            {
                CString::concatenatePtr("t", m_buffer);
            }

            if (!result->isColumnNull(i))
            {	
                value = result->columnValuePtr(i);

                CString::replacePtr(value, "\", "\\", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "b", "\b", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "f", "\f", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "n", "\n", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "r", "\r", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "t", "\t", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "v", "\v", m_buffer_temp0);
    
                CString::concatenatePtr(m_buffer_temp0, m_buffer);
            }
            else
            {

                CString::concatenatePtr("\N", m_buffer);
            }
        }

        CString::concatenatePtr("n", m_buffer);

        if (!m_destination_connection->copyDataPtr(m_buffer))
        {
            CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());

            return false;
        }

        ++count;

        if (count % 250000 == 0)
        {
            CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":tttable " + table + " processing, record count: " + CString::fromNumber(count));
        }
    }
}
 

Результаты

Для переноса 2 Гб данных в PostgreSQL, c включенным WAL-архивированием, потребовалось порядка 10 минут (создано 16 потоков).

Над чем стоит подумать

  • Определение на этапе выполнения количества задача/потоков — на основании количества данных и доступных аппаратных возможностей
  • Определение количества необходимой памяти под буфер, в котором готовятся данные для COPY
  • Распределение таблиц между задачами не по диапазону, а по необходимости — задачи берут таблицу из threadsafe-стека

Исходный код

Исходный код доступен на github.

Автор: blackmaster

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js