Загружаем данные в Oracle

в 11:18, , рубрики: bulk, java, jdbc, oracle, Телекомы, метки: , ,

В своей предыдущей статье я показал, что при использовании асинхронных запросов, скорость опроса устройств по протоколу SNMP может достигать 9000 запросов в секунду (при условии, что у нас есть достаточное количество устройств для формирования такого потока ответов). Вопрос о том, что делать с этим потоком данных остался открытым.

Обычной практикой является обработка данных мониторинга посредством RDBMS (таких как Oracle Database). Но способны ли традиционные реляционные базы данных справиться с такой нагрузкой? Попробуем в этом разобраться.

День жестянщика

Начнем с разбора ошибок, обычно допускаемых новичками, впервые сталкивающимися с Oracle. Обработка данных мониторинга может предусматривать достаточно сложную логику, но об этом мы будем говорить позже. Пока просто попробуем сохранить полученные данные в следующую таблицу:

create sequence test_data_seq;

create table test_data (
  id             number                              not null,
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100),
  event_date     date             default sysdate    not null
);

create unique index test_data_pk on test_data(id);

alter table test_data add
  constraint pk_test_data primary key(id);

Автоматизируем генерацию ID триггером, как это советуют многие online-источники:

create or replace trigger test_data_bri
before insert
on test_data
for each row
begin
  select test_data_seq.nextval into :new.id from dual;
end;
/

И начнем кодить:

Никогда так не делайте!

package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";

	private Connection c = null;

	private void test() throws SQLException {
		StringBuffer sb = new StringBuffer();
		Long timestamp = System.currentTimeMillis(); 
		for (int i = 0; i < 1000; i++) {
			sb.setLength(0);
			sb.append("insert into test_data(device_id, parameter_id, value) values (");
			sb.append(i);
			sb.append(",1,'0')");
			CallableStatement st = c.prepareCall(sb.toString());
			try {
				st.execute();
			} finally {
				st.close();
			}
		}
		System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
	}

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
	}

	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

Запустив на выполнение этот код, получаем количество вставок, выполняемых в секунду:

614

Вроде бы неплохо. Но явно меньше 9000. Что мы делаем не так?

Довольно многое (пункты даны в порядке убывания влияния на производительность):

  1. Мы провоцируем hard parse, передавая каждый раз новый вариант запроса (у такого подхода есть и другие недостатки, такие как возможность получения sql injection или заполнение библиотечного кэша, но нам хватит и первой причины, чтобы больше никогда так не делать)
  2. Создавая сессию, JDBC по умолчанию устанавливает включенной настройку auto commit (ниже мы посмотрим, к чему это приводит с точки зрения производительности)
  3. Вызов триггера приводит к переключению контекстов SQL и PL/SQL (это не очень существенно при вставке одиночных записей, но весьма заметно при выполнении массовых вставок)
  4. При массовой вставке, sequence имеет смысл кэшировать (но это не наш случай, поскольку мы вставляем по одной записи из одного потока)

Избавимся от триггера и жестких разборов. Поскольку для более сложной обработки данных нам все равно понадобится хранимый код, перенесем вставку в пакет:

test_package.sql

create or replace package test_package as
    procedure addData( p_device    in  number
                     , p_parameter in  number
                     , p_value     in  number );
end test_package;
/

show errors;

create or replace package body test_package as

    procedure addData( p_device    in  number
                     , p_parameter in  number
                     , p_value     in  number ) as
    begin
      insert into test_data(id, device_id, parameter_id, value)
      values (test_data_seq.nextval, p_device, p_parameter, p_value);
    end;

end test_package;
/

show errors;

Исправленный код

package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";

	private final static boolean AUTO_COMMIT_MODE = true;

	private final static String  ADD_DATA_SQL     = 
			"begin test_package.addData(?,?,?); end;";

	private Connection c = null;
	
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			Long timestamp = System.currentTimeMillis(); 
			for (int i = 0; i < 1000; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.execute();
			}
			System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
		} finally {
			st.close();
		}
	}

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

Хочу обратить внимание, что теперь текст запроса заканчивается точкой с запятой. По ней Oracle определяет, что мы собираемся выполнять не SQL, а PL/SQL-код.

Запускаем на выполнение:

956

Лучше, но все равно мало.

Быстрее, выше, сильнее!

Что можно сделать чтобы ускорить вставку? Для начала стоит отключить auto commit. При работе с Oracle, слишком частая фиксация транзакций это не только накладные расходы, но и (в некоторых случаях) хороший шанс спровоцировать ошибку ORA-01555.

Разумеется, размер транзакции должен определяться бизнес логикой. Если бизнес-логика требует фиксировать каждую одиночную вставку в отдельной транзакции, то именно так и придется делать. Но (на мой взгляд) лучше управлять фиксацией транзакций самостоятельно чем отдавать этот важный вопрос на откуп JDBC.

Какого размера должна быть транзакция при массовой вставке данных? Универсального ответа, разумеется, нет. До определенного предела, действует правило — чем больше размер транзакции, тем быстрее сохраняются данные. В этой статье я не буду экспериментировать с размером транзакций, а просто выполню вставку всех строк в одной транзакции.

Вставка в одной транзакции

package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;

	private final static String  ADD_DATA_SQL     = 
			"begin test_package.addData(?,?,?); end;";

	private Connection c = null;
	
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			Long timestamp = System.currentTimeMillis(); 
			for (int i = 0; i < 1000; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.execute();
			}
			c.commit();
			System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
		} finally {
			st.close();
		}
	}

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

Результат улучшился почти в два раза:

1524

Чтобы улучшить его еще больше, можно вспомнить о временных таблицах:

drop table test_data;

create global temporary table test_data (
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100),
  event_date     date             default sysdate    not null
) on commit delete rows;

Поскольку мы убрали из таблицы столбец id, внесем изменения в реализацию пакета:

test_package.sql

create or replace package body test_package as

    procedure addData( p_device    in  number
                     , p_parameter in  number
                     , p_value     in  number ) as
    begin
      insert into test_data(device_id, parameter_id, value)
      values (p_device, p_parameter, p_value);
    end;

end test_package;
/

show errors;

Так как вставка данных во временную таблицу не фиксируется в REDO-логе (не требуется восстанавливать эти данные при сбое), это ведет к закономерному снижению накладных расходов:

1779

Если использовать DML-запросы вместо обращения к пакету, можно довести это значение до 2000. Неужели это предел?

Разумеется нет! Сейчас, когда мы практически свели на нет накладные расходы связанные с фиксацией транзакций, мы упираемся в другое бутылочное горлышко — в количество запросов, передаваемых по сети. Чтобы решить эту проблему, были разработаны bulk-запросы, позволяющие передавать несколько однотипных изменений в одном запросе.

Вносим необходимые изменения в код:

Bulk запросы

package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 10;
	
	private final static String  ADD_DATA_SQL     = 
			"begin test_package.addData(?,?,?); end;";

	private Connection c = null;
	
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			int bulkSize = BULK_SIZE;
			Long timestamp = System.currentTimeMillis(); 
			for (int i = 0; i < 1000; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.addBatch();
				if (--bulkSize <= 0) {
					st.executeBatch();
					bulkSize = BULK_SIZE;
				}
			}
			if (bulkSize < BULK_SIZE) {
				st.executeBatch();
			}
			c.commit();
			System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
		} finally {
			st.close();
		}
	}

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

Запускаем на выполнение:

1779

И не замечаем никакой разницы. Почему?

Дело в том, что bulk-и работают только для DML-запросов (insert, update, delete). Если мы пытаемся вызывать bulk-запросом хранимую процедуру, JDBC эмулирует bulk, отсылая по сети ровно то-же самое количество запросов, что было бы без него.

Исправим этот досадный промах:

Bulk insert-ы

package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 10;
	
	private final static String  ADD_DATA_SQL     = 
			"insert into test_data(device_id, parameter_id, value) values (?,?,?)";
	
	private Connection c = null;
	
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			int bulkSize = BULK_SIZE;
			Long timestamp = System.currentTimeMillis(); 
			for (int i = 0; i < 1000; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.addBatch();
				if (--bulkSize <= 0) {
					st.executeBatch();
					bulkSize = BULK_SIZE;
				}
			}
			if (bulkSize < BULK_SIZE) {
				st.executeBatch();
			}
			c.commit();
			System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
		} finally {
			st.close();
		}
	}

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

И запустим код на выполнение снова:

12658

Ха, мы определенно идем на рекорд! Доведя BULK_SIZE до 100, можно получить и вовсе волшебное значение:

31250

Итак, мы научились очень быстро вставлять данные во временную таблицу и… терять их при commit-е, так ничего с ними и не сделав.

Где логика?

Сейчас добавим. Для начала спроектируем схему данных:

image

Пусть у нас имеется список устройств test_device и список параметров test_parameter. Текущее значение каждого параметра будет храниться в test_state, а в test_history будем записывать хронологию изменения значений. Для упрощения кода, будем считать что параметры не могут принимать значение null.

Кроме того, мы будем обрабатывать значения двух типов, заданных в test_parameter_type. Тип параметра будет определять, в каком случае выполняется добавление записи в test_history:

  1. Тип 'default' будем использовать (в частности) для хранения оперативного состояния устройств, запись в test_history добавляется при любом изменении значения
  2. Тип 'uptime' используем для хранения времени безостановочной работы, значение в test_history добавляется из test_state в том случае, если новое значение меньше предыдущего (то есть устройство перезагружалось)

Разумеется, в реальной системе, типов параметров будет больше.

Создаем таблицы

create table test_device (
  id             number                              not null,
  name           varchar2(100)
);

create unique index test_device_pk on test_device(id);

alter table test_device add
  constraint pk_test_device primary key(id);

create table test_parameter_type (
  id             number                              not null,
  name           varchar2(100)
);

create unique index test_parameter_type_pk on test_parameter_type(id);

alter table test_parameter_type add
  constraint pk_test_parameter_type primary key(id);

create table test_parameter (
  id             number                              not null,
  type_id        number                              not null,
  name           varchar2(100)
);

create unique index test_parameter_pk on test_parameter(id);

create index test_parameter_fk on test_parameter(type_id);

alter table test_parameter add
  constraint pk_test_parameter primary key(id);

alter table test_parameter add
  constraint fk_test_parameter foreign key (type_id) 
    references test_parameter_type(id);

create table test_state (
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100)                       not null,
  last_date      date             default sysdate    not null
);

create unique index test_state_pk on test_state(device_id, parameter_id);

create index test_state_fk on test_state(parameter_id);

alter table test_state add
  constraint pk_test_state primary key(device_id, parameter_id);

alter table test_state add
  constraint fk_test_state foreign key (device_id) 
    references test_device(id);

alter table test_state add
  constraint fk_test_state_parameter foreign key (parameter_id) 
    references test_parameter(id);

create sequence test_history_seq;

create table test_history (
  id             number                              not null,
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100)                       not null,
  event_date     date             default sysdate    not null
);

create unique index test_history_pk on test_history(id);

create index test_history_device_fk on test_history(device_id);

create index test_history_parameter_fk on test_history(parameter_id);

alter table test_history add
  constraint pk_test_history primary key(id);

alter table test_history add
  constraint fk_test_history_device foreign key (device_id) 
    references test_device(id);

alter table test_history add
  constraint fk_test_history_parameter foreign key (parameter_id) 
    references test_parameter(id);

и заполняем их данными

Insert into TEST_PARAMETER_TYPE
   (ID, NAME)
 Values
   (1, 'default');
Insert into TEST_PARAMETER_TYPE
   (ID, NAME)
 Values
   (2, 'uptime');
COMMIT;

Insert into TEST_PARAMETER
   (ID, TYPE_ID, NAME)
 Values
   (1, 1, 'status');
Insert into TEST_PARAMETER
   (ID, TYPE_ID, NAME)
 Values
   (2, 2, 'uptime');
COMMIT;

insert into test_device(id, name)
select rownum, object_name
from   all_objects;

commit;

Далее, добавим в пакет процедуру массовой обработки значений:

Новая реализация пакета

CREATE OR REPLACE package test_package as
    procedure saveData;                     
end test_package;
/

show errors;

CREATE OR REPLACE package body test_package as

    procedure saveData as
    begin
    
      -- Добавляем history для параметров типа default
      insert into test_history(id, device_id, parameter_id, value, event_date)
      select test_history_seq.nextval, a.device_id, a.parameter_id, a.value, a.event_date
      from   test_data a
      inner  join test_device b on ( b.id = a.device_id )
      inner  join test_parameter c on ( c.id = a.parameter_id )
      inner  join test_parameter_type d on ( d.id = c.type_id and d.name = 'default' )
      left   join test_state e on ( e.device_id = a.device_id and 
                                    e.parameter_id = a.parameter_id )
      where  e.value is null or e.value <> a.value;
      
      -- Добавляем history для параметров типа uptime
      insert into test_history(id, device_id, parameter_id, value, event_date)
      select test_history_seq.nextval, a.device_id, a.parameter_id, e.value, e.last_date
      from   test_data a
      inner  join test_device b on ( b.id = a.device_id )
      inner  join test_parameter c on ( c.id = a.parameter_id )
      inner  join test_parameter_type d on ( d.id = c.type_id and d.name = 'uptime' )
      left   join test_state e on ( e.device_id = a.device_id and 
                                    e.parameter_id = a.parameter_id )
      where  e.value > a.value;
      
      -- Обновляем state
      merge into test_state a
      using ( select c.device_id, c.parameter_id, c.value, c.event_date
              from   test_data c
              inner  join test_device d on ( d.id = c.device_id )
              inner  join test_parameter e on ( e.id = c.parameter_id )
            ) b
      on ( b.device_id = a.device_id and b.parameter_id = a.parameter_id )
      when matched then
        update set a.value = b.value, a.last_date = b.event_date
      when not matched then
        insert (device_id, parameter_id, value, last_date)
        values (b.device_id, b.parameter_id, b.value, b.event_date);
    
      -- Сохраняем изменения, очищая временную таблицу
      commit;
    
    end;

end test_package;
/

show errors;

Можно заметить, что запросы получились не простые, но сложные запросы это именно то, что Oracle умеет выполнять лучше всего.

Внесем изменения в код:

Окончательная редакция

package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 100;
	
	private final static String  ADD_DATA_SQL     = 
			"insert into test_data(device_id, parameter_id, value) values (?,?,?)";

	private final static String  SAVE_DATA_SQL    = 
			"begin test_package.saveData; end;";
	
	private Connection c = null;
	
	private void test() throws SQLException {
		Long timestamp = System.currentTimeMillis(); 
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			int bulkSize = BULK_SIZE;
			for (int i = 0; i < 1000; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.addBatch();
				if (--bulkSize <= 0) {
					st.executeBatch();
					bulkSize = BULK_SIZE;
				}
			}
			if (bulkSize < BULK_SIZE) {
				st.executeBatch();
			}
		} finally {
			st.close();
		}
		st = c.prepareCall(SAVE_DATA_SQL);
		try {
			st.execute();
		} finally {
			st.close();
		}
		System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
	}

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

и запустим его на выполнение:

5319

Увы, чуда не произошло. Необходимость сохранения данных в журналируемых таблицах сыграла свою роль. Но 5000 записей в секунду, в любом случае, гораздо лучше 600. Теперь мы знаем максимальную скорость, с которой мы можем сохранять данные в Oracle (естественно на выбранном для тестов сервере).

Если нам требуется более высокая скорость обработки или меньшее время обработки каждого запроса, имеет смысл смотреть с сторону InMemory DB. Но это тема для другого разговора.

Готовы ли вы потерять свои данные?

Можно заметить, что мы добились высокой скорости загрузки данных в обмен на риск потери той части данных, сохранение которой не зафиксировано. При проектировании приложения, этот риск можно свести к минимуму, но полностью избавиться от него нельзя.

Использование InMemory или NoSQL DB не изменит эту картину радикально. Хотя эти СУБД предусматривают защиту данных (например при помощи REDO-лога в случае Oracle TimesTen), добиться максимальной производительности можно только отключив эту защиту (такая возможность предоставляется очень часто).

Приходится выбирать, что важнее — обеспечить максимально возможную сохранность данных или максимальную производительность? Ответ на этот вопрос может дать только постановщик задачи. Мы же, со своей стороны, можем воплотить его требования в жизнь, добиваясь максимальной эффективности от того инструмента, который используем.

Автор: GlukKazan

Источник

Поделиться

* - обязательные к заполнению поля