- PVSM.RU - https://www.pvsm.ru -
Допустим есть 2 таблицы в любой реляционной базе данных.
Таблица users весом 4TB
|
id |
firstname |
lastname |
|
1 |
Egor |
Myasnik |
|
2 |
Pavel |
Hvastun |
|
3 |
Mitya |
Volk |
Таблица domains 2TB
|
id |
user_id |
domain_name |
|
1 |
1 |
Approval |
|
2 |
1 |
Rejection |
|
3 |
1 |
Stoppage |
|
4 |
3 |
Cancellation |
В один летний день к вам приходит бизнес и требует результат выполнения запроса в реальном времени.
SELECT d.user_id, u.firstname, u.lastname, d.domain_name
FROM users u
INNER JOIN domains d
ON u.user_id = d.user_id
Первое, что придет на ум - это просто создать обычный view с содержимым запроса и попробовать отдать на проверку, но это не будет работать быстро и будет нагружать систему источника ( далее СИ ) данных.
Что обычно делают и мне довелось увидеть в разных компаниях - это приземление CDC данных в кафку и последующей дедупликацией на основе служебных метаданных CDC сообщения в различной системе приемника ( далее СП ) на уровне DWH или витрином слое.
Например делаем CDC потоки в кафку для таблиц users domains и далее какой либо стриминг джобой вычитываем CDC поток из кафки и дедуплицируя кладем в СП в две разные таблицы для users и domains. Далее создаем view с запросом бизнеса и отдаем на проверку. Работать по скорости будет примерно так же, как и в 1 случае с небольшой погрешностью в зависимости от выбранного СП, однако таким образом удалось избавиться от нагрузки СИ. Из минусов - усложнили систему добавив еще 2 слоя.
Для достижения реального времени выполнения запроса с уменьшенной нагрузкой нам также нужен будет слой кафки с CDC сообщениями и витриный слой, но в данной статье я опущу выбор и стратегию подхода на слое СП. У нас также возникнет слой обработки данных с внедрением Apache Flink, в качестве sink я применю для простоты вывод в консоль.
Создайте maven проект с Java 11 и добавьте туда следующие Flink зависимости:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
Вам также понадобится настроенный паттерн CDC, а именно CDC сообщения для наших двух таблиц в кафке.
Далее с помощью Table API или DataStream API подпишитесь на кафка топики.
Пример метода.
public class User implements Serializable {
public Integer id;
public String firstname;
public String lastname;
// getters and setters omitted
public static User fromRow(Row row) {
// создайте маппер
}
}
public class Domain implements Serializable {
public Integer id;
public Integer user_id;
public String domain_name;
// getters and setters omitted
public static Domain fromRow(Row row) {
// создайте маппер
}
}
tableEnv.executeSql("CREATE TABLE users (" +
"`before` ROW<id: INT, firstname: STRING, lastname: STRING>," +
"`op` STRING," +
"`after` ROW<id: INT, firstname: STRING, lastname: STRING>," +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'users_topic'," +
"'properties.bootstrap.servers' = 'kafka-brokers'," + // адреса брокеров кафки
"'properties.group.id' = 'users_consumer_group'," +
"'scan.startup.mode' = 'earliest'");
DataStream<User> users = tableEnv.toDataStream(tableEnv.from("users")).map(User::fromRow);
tableEnv.executeSql("CREATE TABLE domains (" +
"`before` ROW<id: INTEGER, user_id: INTEGER, domain_name: STRING>," +
"`op` STRING," +
"`after` ROW<id: INTEGER, user_id: INTEGER, domain_name: STRING>," +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'domains_topic'," +
"'properties.bootstrap.servers' = 'kafka-brokers'," + // адреса брокеров кафки
"'properties.group.id' = 'domains_consumer_group'," +
"'scan.startup.mode' = 'earliest'");
DataStream<Domain> domains = tableEnv.toDataStream(tableEnv.from("domains")).map(Domain::fromRow);
После того, как убедились, что DataStream users и domains получают данные, для реализации `INNER JOIN` операции на уровне SQL нам понадобится метод connect .
users
.connect(domains)
.keyBy(
user -> user.id,
domain -> domain.user_id
)
.process(new InnerJoinFunction())
.print();
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.io.Serializable;
public class Join1 extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> {
private MapState<Integer, User> usersState;
private ValueState<Domain> domainsState;
@Override
public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
usersState.put(user.id, user);
final Domain domain = domainsState.value();
if (domain != null) {
out.collect(new InnerJoinFunction.Output(
user.id,
user.firstname,
user.lastname,
domain.domain_name
));
}
}
@Override
public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
domainsState.update(domain);
final boolean innerJoinCondition = !user.metadata.__is_deleted;
for (User user : usersState.values()) {
out.collect(new InnerJoinFunction.Output(
user.id,
user.firstname,
user.lastname,
domain.domain_name
));
}
}
@Override
public void open(OpenContext openContext) throws Exception {
var usersStateDescriptor = new MapStateDescriptor<>(
"users",
Integer.class,
User.class
);
var domainsStateDescriptor = new ValueStateDescriptor<>(
"domains",
Domain.class
);
usersState = getRuntimeContext().getMapState(usersStateDescriptor);
domainsState = getRuntimeContext().getState(domainsStateDescriptor);
super.open(openContext);
}
public static class Output implements Serializable {
public Integer user_id;
public String firstname;
public String lastname;
public String domain_name;
// getters and setters omitted
}
}
MapState используется так как имеем реляцию один ко многим и достижения скорости обновления и доступа к данным в state
На выходе мы будем видеть обновляемые в реальном времени в консоли актуальные данные формирующие данный запрос.
Разумеется, данный пример простой и имеет нюансы в деталях реализации, но возможность реализации наглядно видна и все выполняемые операции выполняются за O(1) в процессе обновления state и отправки данных дальше по потоку. Если предположить, что СП справляется со всеми CRUD операциями не более относительно быстро, то результат запроса будет актуален всегда и выполняться на аналитическом хранилище будет быстро.
Для доработки данного примера разверните OLAP СП и грузите данные каким либо sink коннектором Flink. Вы будете видеть актуальную витрину даже когда данных очень много при правильной настройке и выдаче достаточных ресурсов на уровне СП даже при терабайтных нагрузках на СИ - данная теория была проверена мною на практике.
Учитывайте при обработке данных в InnerJoinFunction удаления и апдейты и обрабатывайте их правильно.
Перекосы тоже бывают и с ними бороться будет непросто.
В прод конфигурации подумайте о внедрении RocksDB.
Автор: Akhtem94
Источник [1]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/419022
Ссылки в тексте:
[1] Источник: https://habr.com/ru/articles/907664/?utm_source=habrahabr&utm_medium=rss&utm_campaign=907664
Нажмите здесь для печати.