Начинаем использовать Tarantool в Java проекте

в 20:36, , рубрики: java, mysql, nosql, replication, tarantool, метки: , , , ,

    В статье ниже я попытаюсь кратко рассказать о том, что такое Tarantool и как начать его использовать в уже существующем проекте если вы программируете на Java. Если же вы программируете на другом языке, то вам могут быть интересны некоторые инструменты доступные в коннекторе, такие как возможность редактирование xlog файлов и создание snap файлов из любых данных.

    Tarantool — это ключ-кортеж хранилище данных. Все данные и индексы хранятся в оперативной памяти. Значения составляют кортеж, далее tuple, кортежи — пространство, далее space, пространства — модель данных. Поддерживаются 3 типа данных: 32 битное без знаковое целоe, 64 битное без знаковое целое и бинарная строка, далее NUM, NUM64 и STR соответственно. Для любого пространства должны быть определены тип и структура первичного индекса, например: HASH по полям 1,2 где 1 — NUM, а 2 — NUM64. Вторичные индексы задаются точно так же как и первичные. DML операции атомарны на уровне кортежа и выполняются только по первичному индексу. Для выполнения нескольких операций атомарно нужно использовать встроенный язык Lua. Сохранность данных обеспечивается путём сохранения снимка текущего состояния, далее snapshot, и бинарного лога, далее xlog. Для хранения кортежей используется slab.

    В примерах ниже используется java connector, подробнее о нём можно узнать по адресу dgreenru.github.com/tarantool-java/. Последняя стабильная версия на момент написания статьи 0.1.2. Ниже я рассмотрю пример использования дополнительного функционала позволяющего переносить и синхронизировать данные с любыми другими хранилищами.

Пример переноса таблицы MySQL в Tarantool Box:

mysql> desc user;
+------------+--------------+------+-----+-------------------+----------------+
| Field      | Type         | Null | Key | Default           | Extra          |
+------------+--------------+------+-----+-------------------+----------------+
| id         | int(11)      | NO   | PRI | NULL              | auto_increment |
| username   | varchar(255) | NO   | UNI | NULL              |                |
| email      | varchar(255) | NO   | UNI | NULL              |                |
| enabled    | tinyint(1)   | NO   |     | 1                 |                |
| registered | timestamp    | NO   |     | CURRENT_TIMESTAMP |                |
+------------+--------------+------+-----+-------------------+----------------+
5 rows in set

    Первичный индекс id и 2 вторичных уникальных индекса username и email. Из непереносимых по умолчанию мест можно выделить auto_increment и timestamp. Для первого можно использовать хранимую процедуру box.auto_increment, а для второго можно хранить данные в формате yyyyMMddhhmmss или секундах. Если таблица user достаточно небольшая, то можно просто прочитать данные из mysql и вставить в Tarantool Box, на этой задаче я останавливаться не буду, а расскажу, что делать если таблица очень большая, т.е. содержит очень много записей, пусть и каждая из них небольшого размера. Для начала нужно выгрузить данные в удобный для нас формат, желательно не сильно занимая при этом ресурсы сервера.

mysql> select * into outfile '/tmp/user' from user;
Query OK, 73890541 rows affected

$ head -1 /tmp/user
1	username	email@domain.tld	1	2012-10-14 01:27:05

    Скопировав файл на нужный сервер или локальный компьютер, можно приступить к его обработке и конвертации в формат Tarantool Box. В примере ниже, для простоты, не рассматриваются эскейп последовательности. Если у вас в таблицах встречаются символы табуляции, переноса строки, возврата каретки, обратный слэш или поля содержат NULL значения, вам нужно добавить их обработку самостоятельно.

BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream("/tmp/user.gz")), "utf-8"));
SnapshotWriter writer = new SnapshotWriter(new FileOutputStream("/tmp/user.snap").getChannel());
String line = null;
DateFormat indf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
DateFormat outdf = new SimpleDateFormat("yyyyMMddhhmmss");
Pattern pattern = Pattern.compile("t");
while ((line = reader.readLine()) != null) {
	try {
		String[] values = pattern.split(line);
		if (values.length == 5) {
			Integer id = Integer.parseInt(values[0]);
			String username = values[1];
			String email = values[2];
			byte[] enabled = { Byte.valueOf(values[3]) };
			Long registered = Long.parseLong(outdf.format(indf.parse(values[4])));

			Tuple tuple = new Tuple(5).setInt(0, id).setString(1, username, "UTF-8")
			.setString(2, email, "UTF-8").setBytes(3, enabled).setLong(4, registered);

			writer.writeRow(0, tuple);
		} else {
			System.err.println("Line should be splited in 5 parts, but has " + values.length + " for " + line);
		}
	} catch (Exception e) {
		System.err.println("Can't parse line " + line);
		e.printStackTrace();
	}
}
writer.close();
reader.close();

В результате имеем файл

$ ls -sh /tmp/user.snap
16.1G /tmp/user.snap

теперь необходимо настроить space 0 соответствующим образом.

# Этот параметр ограничивает суммарный размер памяти выделенной под slab блоки. 
# Индексы и другие накладные расходы хранятся вне slab, 
# поэтому суммарная память процесса может быть до 2х больше.
# В нашем случае логично поставить здесь 24 гигабайта.
slab_alloc_arena = 24

# Так же имеет смысл откорректировать количество записей в одном xlog файле.
rows_per_wal = 500000

# И конечно же конфигурация ключей space 0
space[0].enabled = 1
# id. Чтобы использовать box.auto_increment тип дерева должен быть TREE. 
space[0].index[0].type = "TREE"
space[0].index[0].unique = 1
space[0].index[0].key_field[0].fieldno = 0
space[0].index[0].key_field[0].type = "NUM"

#username
space[0].index[1].type = "HASH"
space[0].index[1].unique = 1
space[0].index[1].key_field[0].fieldno = 1
space[0].index[1].key_field[0].type = "STR"

#password
space[0].index[2].type = "HASH"
space[0].index[2].unique = 1
space[0].index[2].key_field[0].fieldno = 2
space[0].index[2].key_field[0].type = "STR"

Далее нам нужно заменить 00000000000000000001.snap находящегося в папке work_dir из конфигурационного файла на созданный нами файл.

$ mv /tmp/user.snap /var/lib/tarantool/00000000000000000001.snap

и попробовать запустить сервер

$ tarantool_box --background   
$ ps -C tarantool_box -o pid=,cmd=
 8853 tarantool_box: primary pri: 33013 sec: 33014 adm: 33015

так же посмотрите файл tarantool.log, в случае успешного запуска он будет заканчиваться на строки похожие на приведённые ниже, в случае ошибки, вы сразу увидите причину.

1350504007.249 7127 1/sched _ I> Space 0: done
1350504007.249 7127 101/33013/primary _ I> bound to port 33013
1350504007.249 7127 101/33013/primary _ I> I am primary
1350504007.249 7127 102/33014/secondary _ I> bound to port 33014
1350504007.250 7127 103/33015/admin _ I> bound to port 33015
1350504007.251 7127 1/sched _ C> log level 4
1350504007.251 7127 1/sched _ C> entering event loop					

Далее корректность вставки данных можно проверить простым способом

$ tarantool -a 127.0.0.1
127.0.0.1> select * from t0 where k0 = 1
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', 'x01', 'x21x8bxe4xc9x4cx12']
127.0.0.1> select * from t0 where k1 = 'username'
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', 'x01', 'x21x8bxe4xc9x4cx12']
127.0.0.1> select * from t0 where k2 = 'email@domain.tld'
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', 'x01', 'x21x8bxe4xc9x4cx12']

т.е. мы проверили нахождение данных по 3-м ключам, указанным нами в конфиге. Далее можно посмотреть количество потребляемой процессом памяти в системе и отчёт команды show slab в консоли Tarantool Box.

    Tarantool Box запущен, теперь нужно позаботится о резервном копирование данных и поддержке таблицы MySQL в актуальном состояние на случай, если какие-то запросы используют данные из неё в своих целях. Это достаточно просто организовать при помощи класса ReplicationClient. Он позволит иметь почти полную резервную копию xlog без использования полноценного slave сервера и организовать обновление таблицы в MySQL без затрат дополнительных ресурсов и времени. Не забудьте указать replication_port в конфиге, чтобы сделать репликацию возможной. Описанный ниже класс сохраняем все логи полученные от сервера в файлы длинной по 50 тыс. записей. Алгоритм работы достаточно простой:
1. поиск уже существующих логов
2. определение максимального lsn
3. подключение на порт репликации
4. транслируем в файл получаемые данные
Логика обновления MySQL в данном коде отсутствует, но её легко реализовать немного изменив цикл в функции main. Сложным место является расширение класса ReplicationClient кодом, который записывает получаемые данные в бинарный лог, расширяя их до формата xlog. На этом месте можно особо не останавливаться, т.к. данный пример скорее заготовка для реального приложения, чем демонстрация использования.

public class Backup {
	protected DecimalFormat xlogNameFormat = new DecimalFormat("00000000000000000000");
	protected String folder;
	protected FileChannel xlogChannel;
	protected int row;
	protected int limit = 50000;
	protected long lsn = 0L;
	protected ReplicationClient client;
	protected XLogWriter writer;

	public void setLimit(int limit) {
		this.limit = limit;
	}

	public Backup(String folder, String host, int port) throws IOException {
		this.folder = folder;

	}

	protected void getLatestLSN(String folder) throws IOException, FileNotFoundException {
		final File backupFolder = new File(folder);
		String[] xlogs = backupFolder.list(new FilenameFilter() {

			@Override
			public boolean accept(File dir, String name) {
				return name.endsWith(".xlog");
			}
		});
		boolean hasLogs = xlogs != null && xlogs.length > 0;
		if (hasLogs) {
			Arrays.sort(xlogs);
			XLogReader reader = new XLogReader(new FileInputStream(folder + "/" + xlogs[xlogs.length - 1]).getChannel());
			XLogEntry xlogEntry = null;
			while ((xlogEntry = reader.nextEntry()) != null) {
				lsn = xlogEntry.header.lsn;
			}
			reader.close();
		}
	}

	public void start() throws IOException {
		getLatestLSN(folder);
		System.out.println("Planning to start from lsn: " + lsn);
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					synchronized (this) {
						close();
					}
				} catch (IOException e) {
					throw new IllegalStateException("Can't close xlog", e);
				}
			}
		}));

		final ByteBuffer rowStartMarker = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(Const.ROW_START_MARKER);
		client = new ReplicationClient(SocketChannel.open(new InetSocketAddress("127.0.0.1", 33016)), lsn + 1L) {

			@Override
			protected ByteBuffer readBody(Header header) throws IOException {
				if (Backup.this.xlogChannel == null) {
					Backup.this.xlogChannel = nextFile(folder);
				}
				ByteBuffer body = super.readBody(header);
				this.header.flip();
				rowStartMarker.flip();
				synchronized (Backup.this) {
					while (rowStartMarker.hasRemaining())
						Backup.this.xlogChannel.write(rowStartMarker);
					while (this.header.hasRemaining())
						Backup.this.xlogChannel.write(this.header);
					while (body.hasRemaining())
						Backup.this.xlogChannel.write(body);
					Backup.this.xlogChannel.force(false);
					body.flip();
				}
				return body;
			}

		};

	}

	public XLogEntry nextEntry() throws IOException {
		XLogEntry entry = client.nextEntry();
		lsn = entry.header.lsn;
		if (++row >= limit) {
			close();
			xlogChannel = nextFile(folder);
			row = 0;
		}
		return entry;
	}

	protected FileChannel nextFile(String folder) throws IOException {
		String fileName = folder + "/" + xlogNameFormat.format(lsn + 1L) + ".xlog";
		new File(fileName).createNewFile();
		FileChannel channel = new FileOutputStream(fileName, true).getChannel();
		writer = new XLogWriter(channel);
		return channel;
	}

	public void close() throws IOException {
		if (writer != null) {
			writer.close();
		}
	}

	public static void main(String[] args) throws IOException {
		final Backup backup = new Backup("/home/dgreen/backup", "localhost", 33016);
		backup.start();
		XLogEntry entry = null;
		while ((entry = backup.nextEntry()) != null) {
			StringBuilder pk = new StringBuilder();
			for (int i = 0; i < entry.tuple.size(); i++) {
				if (pk.length() > 0) {
					pk.append(" - ");
				}
				switch (entry.tuple.getBytes(i).length) {
				case 4:
					pk.append(String.valueOf(entry.tuple.getInt(i)));
					break;
				case 8:
					pk.append(String.valueOf(entry.tuple.getLong(i)));
					break;
				default:
					pk.append(entry.tuple.getString(i, "UTF-8"));
				}

			}
			switch (entry.op) {
			case Update.OP_CODE:
				System.out.println("Got update on #" + pk.toString());
				break;
			case Insert.OP_CODE:
				System.out.println("Got insert " + pk.toString());
				break;
			case Delete.OP_CODE:
				System.out.println("Got delete of #" + pk.toString());
				break;
			default:
				System.out.println("Got unknown op " + entry.op + " " + pk.toString());
				break;
			}

		}
	}
}

    Так же отдельно хотелось бы отметить, что при использовании функционала работы с xlog файлами практически невозможно потерять данные, даже если вы случайно удалили кортеж или очистили целиком space, используя классы XLogReader и XLogWriter вы сможете легко отредактировать xlog.

    На этом в принципе всё, ещё раз напоминаю, что более подробно о коннекторе можно узнать по адресу dgreenru.github.com/tarantool-java, исходный код использованных примеров доступен в репозитории на гитхабе.

Автор: dgreen

Поделиться

* - обязательные к заполнению поля