- PVSM.RU - https://www.pvsm.ru -
Представьте: вы строите систему верификации дипломов. Требования простые - данные должны быть неизменяемыми (привет, блокчейн) и при этом быстро доступными для запросов (привет, PostgreSQL). Казалось бы, идеальное решение - писать в оба хранилища. Но дьявол, как всегда, кроется в деталях.
Наш проект использует паттерн двойной записи (Dual-Write):
Solana — гарантирует неизменность и прозрачность данных о выданных дипломах
PostgreSQL (Supabase) — обеспечивает быстрые выборки и сложные запросы
Звучит красиво на архитектурных диаграммах, но в production всё не так радужно. Главная проблема — частичные сбои. Транзакция в Solana прошла успешно, диплом записан в блокчейн навечно, а вот запись в PostgreSQL упала. Пользователь получил подтверждение, но половина системы о его дипломе не знает.
Сегодня я покажу, как мы столкнулись с этой проблемой лицом к лицу и какие паттерны применили для её решения.
Давайте посмотрим на реальный код из нашего internal/api/handlers.rs [1]. Функция issue_diploma - это то место, где происходит магия... и где всё может пойти не так:
pub async fn issue_diploma(
State(state): State<Arc<AppState>>,
mut multipart: Multipart,
) -> Result<Json<IssueResponse>, AppError> {
// ... парсинг multipart данных ...
// Генерируем хеш диплома
let hash = hashing::generate_hash(
&file_bytes,
&req.issuer_id,
&req.recipient_id,
issued_at,
req.serial.as_deref(),
);
// Подписываем хеш приватным ключом
let signature = hashing::sign_hash(&hash, &state.issuer_keypair)?;
let diploma = Diploma {
hash: hash.clone(),
issuer_id: req.issuer_id.clone(),
recipient_id: req.recipient_id.clone(),
signature: Some(signature.clone()),
issued_at,
serial: req.serial.clone(),
ipfs_cid: None,
};
// КРИТИЧЕСКАЯ ТОЧКА №1: Запись в блокчейн Solana
// Эта операция может занять 1-3 секунды и стоит денег (газ)
let chain_record = state.chain_client.write_hash(&hash, &diploma).await?;
// Подготавливаем данные для базы
let credential_data = serde_json::json!({
"hash": &diploma.hash,
"issuer_id": &diploma.issuer_id,
"recipient_id": &diploma.recipient_id,
"solana_tx_id": &chain_record.tx_id,
"issued_at": diploma.issued_at.to_rfc3339(),
});
// КРИТИЧЕСКАЯ ТОЧКА №2: Запись в PostgreSQL
// А вот здесь может произойти катастрофа
let db_response = state
.db_client
.from("credentials")
.insert(credential_data.to_string())
.execute()
.await;
// Обработка ошибки базы данных
let db_response = match db_response {
Ok(response) => response,
Err(e) => {
tracing::error!("Database request failed: {}", e);
return Err(AppError::Database(format!("Database request failed: {}", e)));
}
};
if !db_response.status().is_success() {
let status = db_response.status();
let error_body = db_response.text().await.unwrap_or_default();
// ВОТ ОНА - КРИТИЧЕСКАЯ НЕСОГЛАСОВАННОСТЬ!
tracing::error!(
"CRITICAL INCONSISTENCY: Failed to save to Supabase after successful Solana transaction.
tx_id: {}, hash: {}. Status: {}. Body: {}",
chain_record.tx_id,
hash,
status,
error_body
);
// Мы уже записали в блокчейн, откатить нельзя!
return Err(AppError::Internal(
"Failed to save credential record after blockchain confirmation.".to_string(),
));
}
// Если всё хорошо - возвращаем успешный ответ
Ok(Json(IssueResponse {
hash,
tx_id: chain_record.tx_id,
signature: Some(signature),
issued_at,
}))
}
Вот как выглядит последовательность операций и точка отказа:
┌──────────┐ ┌────────────┐ ┌─────────┐ ┌────────────┐
│ Клиент │────▶│ Rust API │───▶│ Solana │────▶│ УСПЕХ │
└──────────┘ └────────────┘ └─────────┘ └────────────┘
│ │
│ ▼
│ ┌────────────┐
└───────────────────────────▶│ PostgreSQL │
└────────────┘
│
▼
┌────────────┐
│ СБОЙ! │
└────────────┘
│
▼
┌──────────────────────┐
│ РАССИНХРОН: │
│ • Блокчейн: ✓ есть |
│ • База: ✗ нет |
└──────────────────────┘
Что происходит после такого сбоя? У нас есть несколько неприятных сценариев:
Пользователь не может найти свой диплом через API запросы к базе данных
Невозможность построить аналитику — данные в базе неполные
Проблемы с аудитом — в блокчейне есть запись, в отчётах её нет
Дублирование при повторной попытке — пользователь может попробовать выпустить диплом ещё раз
В распределённых системах есть два основных подхода к согласованности:
1. Строгая согласованность (Strong Consistency) - все узлы видят одинаковые данные в один момент времени. Это дорого и сложно, особенно когда один из узлов - публичный блокчейн.
2. Итоговая согласованность (Eventual Consistency) - данные могут временно различаться, но в конечном итоге придут к согласованному состоянию.
Мы выбрали итоговую согласованность. Почему? Откатить транзакцию в Solana после подтверждения - невозможно. Значит, нужно гарантировать, что PostgreSQL рано или поздно получит эти данные.
Паттерн Saga разбивает длинную распределённую транзакцию на последовательность локальных транзакций. Каждый шаг может иметь компенсирующую транзакцию для отката.
Как это могло бы выглядеть в нашем случае:
// Псевдокод Saga для выпуска диплома
enum SagaStep {
SaveToDatabase, // Шаг 1
WriteToBlockchain, // Шаг 2
UpdateDatabaseStatus // Шаг 3
}
async fn issue_diploma_saga(diploma: Diploma) -> Result<(), SagaError> {
// Шаг 1: Сохраняем в БД со статусом "pending"
let db_record = match save_to_database_with_status(&diploma, "pending").await {
Ok(record) => record,
Err(e) => {
// Ничего откатывать не нужно, просто выходим
return Err(SagaError::DatabaseFailed(e));
}
};
// Шаг 2: Пишем в блокчейн
let tx_id = match write_to_blockchain(&diploma).await {
Ok(tx) => tx,
Err(e) => {
// Компенсирующая транзакция: помечаем запись как неудачную
mark_database_record_failed(&db_record.id).await?;
return Err(SagaError::BlockchainFailed(e));
}
};
// Шаг 3: Обновляем статус в БД на "confirmed"
match update_database_status(&db_record.id, "confirmed", &tx_id).await {
Ok(_) => Ok(()),
Err(e) => {
// Здесь компенсация сложная: в блокчейне уже есть запись
// Можно только пометить в БД для ручного вмешательства
mark_for_manual_reconciliation(&db_record.id, &tx_id).await?;
Err(SagaError::InconsistentState(e))
}
}
}
Проблема с Saga в блокчейне: Компенсирующие транзакции в Solana стоят денег (газ) и не отменяют предыдущие записи, а добавляют новые. Это делает паттерн дорогим и сложным.
Идемпотентность - это свойство операции давать один и тот же результат при повторных вызовах. В нашем контексте это критически важно.
Вот как мы могли бы добавить механизм повторных попыток:
use tokio::time::{sleep, Duration};
async fn write_to_database_with_retry(
db_client: &Postgrest,
data: serde_json::Value,
max_retries: u32,
) -> Result<(), AppError> {
let mut retries = 0;
let mut backoff = Duration::from_millis(100);
loop {
match db_client
.from("credentials")
.insert(data.to_string())
.execute()
.await
{
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) if response.status() == 409 => {
// Конфликт - запись уже существует (идемпотентность!)
tracing::info!("Record already exists, considering it success");
return Ok(());
}
Ok(_) | Err(_) if retries < max_retries => {
retries += 1;
tracing::warn!(
"Database write failed, retry {}/{} after {:?}",
retries, max_retries, backoff
);
sleep(backoff).await;
backoff *= 2; // Экспоненциальная задержка
}
_ => {
return Err(AppError::Database(
"Failed after maximum retries".to_string()
));
}
}
}
}
Недостаток: Если база недоступна долго (например, плановое обслуживание), пользователь будет ждать. А блокчейн-транзакция уже выполнена!
После анализа различных подходов, мы остановились на комбинации двух паттернов:
Суть паттерна Outbox - вместо записи напрямую в две системы, мы делаем одну атомарную транзакцию в первичное хранилище, включая событие в таблицу outbox.
Вот как изменится наша архитектура:
// Новая структура для outbox
#[derive(Serialize, Deserialize)]
struct OutboxEvent {
id: Uuid,
event_type: String,
payload: serde_json::Value,
status: String, // "pending", "processing", "completed", "failed"
created_at: DateTime<Utc>,
processed_at: Option<DateTime<Utc>>,
retry_count: u32,
error_message: Option<String>,
}
// Изменённая функция issue_diploma
pub async fn issue_diploma_with_outbox(
State(state): State<Arc<AppState>>,
mut multipart: Multipart,
) -> Result<Json<IssueResponse>, AppError> {
// ... парсинг и генерация хеша как раньше ...
// ВАЖНО: Сначала пишем в БД в одной транзакции
let mut transaction = state.db_client.begin_transaction().await?;
// Сохраняем диплом со статусом "pending_blockchain"
let credential_data = serde_json::json!({
"hash": &diploma.hash,
"issuer_id": &diploma.issuer_id,
"recipient_id": &diploma.recipient_id,
"status": "pending_blockchain",
"issued_at": diploma.issued_at.to_rfc3339(),
});
transaction
.from("credentials")
.insert(credential_data.to_string())
.execute()
.await?;
// Добавляем событие в outbox
let outbox_event = serde_json::json!({
"id": Uuid::new_v4(),
"event_type": "WRITE_TO_BLOCKCHAIN",
"payload": serde_json::to_value(&diploma)?,
"status": "pending",
"created_at": Utc::now(),
"retry_count": 0,
});
transaction
.from("outbox_events")
.insert(outbox_event.to_string())
.execute()
.await?;
// Коммитим транзакцию - либо всё, либо ничего!
transaction.commit().await?;
// Возвращаем ответ пользователю
Ok(Json(IssueResponse {
hash: diploma.hash,
tx_id: "pending".to_string(), // Будет обновлён асинхронно
signature: Some(signature),
issued_at: diploma.issued_at,
}))
}
Теперь нужен фоновый процесс для обработки событий из outbox:
// Фоновый воркер для обработки outbox
async fn outbox_processor(state: Arc<AppState>) {
loop {
// Получаем необработанные события
let events = fetch_pending_outbox_events(&state.db_client).await;
for event in events {
match event.event_type.as_str() {
"WRITE_TO_BLOCKCHAIN" => {
process_blockchain_write(event, &state).await;
}
_ => {
tracing::warn!("Unknown event type: {}", event.event_type);
}
}
}
// Спим перед следующей итерацией
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
async fn process_blockchain_write(
event: OutboxEvent,
state: &Arc<AppState>
) {
let diploma: Diploma = serde_json::from_value(event.payload.clone())
.expect("Failed to deserialize diploma");
// Пытаемся записать в блокчейн
match state.chain_client.write_hash(&diploma.hash, &diploma).await {
Ok(chain_record) => {
// Успех! Обновляем статусы
let mut transaction = state.db_client.begin_transaction().await.unwrap();
// Обновляем credential
transaction
.from("credentials")
.update(serde_json::json!({
"status": "confirmed",
"solana_tx_id": chain_record.tx_id,
}).to_string())
.eq("hash", &diploma.hash)
.execute()
.await
.unwrap();
// Помечаем событие как обработанное
transaction
.from("outbox_events")
.update(serde_json::json!({
"status": "completed",
"processed_at": Utc::now(),
}).to_string())
.eq("id", event.id.to_string())
.execute()
.await
.unwrap();
transaction.commit().await.unwrap();
}
Err(e) => {
// Ошибка - увеличиваем счётчик попыток
update_outbox_event_retry(&state.db_client, event.id, e.to_string()).await;
}
}
}
Даже с Outbox паттерном что-то может пойти не так. Поэтому мы добавили фоновый процесс сверки:
// Периодическая сверка данных между Solana и PostgreSQL
async fn reconciliation_job(state: Arc<AppState>) {
loop {
tracing::info!("Starting reconciliation check...");
// Получаем последние транзакции из Solana за последний час
let cutoff_time = Utc::now() - Duration::from_secs(3600);
let blockchain_records = fetch_recent_blockchain_transactions(
&state.chain_client,
cutoff_time
).await;
for record in blockchain_records {
// Проверяем, есть ли запись в PostgreSQL
let db_result = state
.db_client
.from("credentials")
.select("hash")
.eq("hash", &record.hash)
.single()
.execute()
.await;
if db_result.is_err() || !db_result.unwrap().status().is_success() {
// Запись отсутствует в БД - добавляем
tracing::warn!(
"Found orphaned blockchain record: hash={}, tx_id={}",
record.hash,
record.tx_id
);
// Восстанавливаем запись из блокчейна
let recovery_data = serde_json::json!({
"hash": record.hash,
"solana_tx_id": record.tx_id,
"status": "recovered_from_blockchain",
"recovered_at": Utc::now(),
// Остальные поля берём из метаданных транзакции
});
match state
.db_client
.from("credentials")
.insert(recovery_data.to_string())
.execute()
.await
{
Ok(_) => {
tracing::info!("Successfully recovered record: {}", record.hash);
// Отправляем алерт команде
send_alert(
"Data inconsistency detected and fixed",
&format!("Recovered hash {} from blockchain", record.hash)
).await;
}
Err(e) => {
tracing::error!("Failed to recover record: {}", e);
}
}
}
}
// Запускаем сверку каждые 5 минут
tokio::time::sleep(Duration::from_secs(300)).await;
}
}
Визуализация нового подхода:
┌──────────┐ ┌────────────┐ ┌─────────────┐
│ Клиент │────▶│ Rust API │────▶│ PostgreSQL │
└──────────┘ └────────────┘ │ + Outbox │
└─────────────┘
│
▼
┌─────────────┐
│ УСПЕХ │
│ (Атомарно) │
└─────────────┘
│
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐
│ Outbox Processor│ │Reconciliation│ │ Monitoring │
│ (Async) │ │ Job │ │ & Alerts │
└─────────────────┘ └──────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Solana │◀─────│ Проверка │
└──────────┘ └──────────┘
Вместо простого outbox в БД, можно использовать полноценную очередь сообщений:
// Интеграция с Redis Streams для более надёжной доставки
use redis::AsyncCommands;
async fn publish_to_queue(
redis_client: &redis::Client,
diploma: &Diploma,
) -> Result<(), AppError> {
let mut conn = redis_client.get_async_connection().await?;
let event = serde_json::json!({
"type": "WRITE_TO_BLOCKCHAIN",
"payload": diploma,
"timestamp": Utc::now().to_rfc3339(),
"retry_count": 0,
});
// Добавляем в Redis Stream с автогенерацией ID
conn.xadd(
"diploma:outbox",
"*",
&[("event", serde_json::to_string(&event)?)],
).await?;
Ok(())
}
// Консьюмер с группой для гарантированной доставки
async fn consume_from_queue(redis_client: &redis::Client, state: Arc<AppState>) {
let mut conn = redis_client.get_async_connection().await.unwrap();
// Создаём consumer группу
let _: Result<(), _> = conn.xgroup_create_mkstream(
"diploma:outbox",
"blockchain_writers",
"$",
).await;
loop {
// Читаем события из очереди
let events: Vec<StreamReadReply> = conn.xreadgroup(
&["diploma:outbox"],
"blockchain_writers",
"worker_1",
&[">"],
Some(1),
None,
).await.unwrap();
for event in events {
// Обрабатываем и подтверждаем
process_event(event, &state).await;
conn.xack("diploma:outbox", "blockchain_writers", &[event.id]).await.unwrap();
}
}
}
Критически важно отслеживать состояние системы:
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};
lazy_static! {
static ref INCONSISTENCY_COUNTER: CounterVec = register_counter_vec!(
"diploma_inconsistencies_total",
"Total number of data inconsistencies detected",
&["type"]
).unwrap();
static ref RECONCILIATION_DURATION: HistogramVec = register_histogram_vec!(
"reconciliation_duration_seconds",
"Time taken to reconcile records",
&["status"]
).unwrap();
}
// Используем метрики в коде
async fn monitor_inconsistency(inconsistency_type: &str) {
INCONSISTENCY_COUNTER
.with_label_values(&[inconsistency_type])
.inc();
// Алерт если слишком много несогласованностей
let total = INCONSISTENCY_COUNTER
.with_label_values(&[inconsistency_type])
.get();
if total > 10.0 {
send_critical_alert(
"High inconsistency rate detected",
&format!("Type: {}, Count: {}", inconsistency_type, total)
).await;
}
}
Можно пойти ещё дальше и хранить все события как неизменяемый лог:
#[derive(Serialize, Deserialize)]
enum DiplomaEvent {
Created {
hash: String,
issuer_id: String,
recipient_id: String,
timestamp: DateTime<Utc>,
},
BlockchainWriteRequested {
hash: String,
timestamp: DateTime<Utc>,
},
BlockchainWriteCompleted {
hash: String,
tx_id: String,
timestamp: DateTime<Utc>,
},
BlockchainWriteFailed {
hash: String,
error: String,
retry_count: u32,
timestamp: DateTime<Utc>,
},
ReconciliationDetected {
hash: String,
source: String, // "blockchain" или "database"
timestamp: DateTime<Utc>,
},
}
// Это даёт нам полную историю каждого диплома
async fn append_event(
db_client: &Postgrest,
event: DiplomaEvent,
) -> Result<(), AppError> {
let event_data = serde_json::json!({
"event_type": event.variant_name(),
"payload": serde_json::to_value(&event)?,
"timestamp": Utc::now(),
});
db_client
.from("diploma_events")
.insert(event_data.to_string())
.execute()
.await?;
Ok(())
}
Работая с паттерном двойной записи между Solana и PostgreSQL, мы извлекли несколько важных уроков:
Никогда не полагайтесь на последовательные вызовы — если первый прошёл успешно, это не гарантирует успех второго. Особенно когда первый — это необратимая операция в блокчейне.
Проектируйте с учётом сбоев — вопрос не в том, упадёт ли система, а в том, когда это произойдёт. Паттерн Outbox и фоновая сверка — это не избыточность, а необходимость.
Итоговая согласованность — ваш друг, не пытайтесь достичь строгой согласованности между блокчейном и традиционной БД. Это дорого, сложно и часто невозможно.
Мониторинг критичен — лучше получить алерт о рассинхроне через минуту, чем узнать о проблеме от пользователя через неделю.
Идемпотентность спасает — проектируйте операции так, чтобы их можно было безопасно повторять. Это упрощает восстановление после сбоев.
Для коллег, работающих с Web3-бэкендами на Rust: блокчейн — это не серебряная пуля. Это мощный инструмент, но он требует тщательного проектирования всей системы. Двойная запись кажется простым решением, пока вы не столкнётесь с первым production‑сбоем в 3 часа ночи.
Помните: в распределённых системах всё, что может пойти не так, обязательно пойдёт не так. Проектируйте соответственно.
Telegram [7] — автор
Если у вас есть опыт решения подобных проблем или вопросы по реализации - давайте обсудим в комментариях. Особенно интересно услышать про альтернативные подходы к синхронизации блокчейна с традиционными БД.
Автор: Berektassuly
Источник [8]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/postgresql/436514
Ссылки в тексте:
[1] handlers.rs: http://handlers.rs
[2] Saga Pattern — Microservices.io: https://microservices.io/patterns/data/saga.html
[3] Transactional Outbox Pattern: https://microservices.io/patterns/data/transactional-outbox.html
[4] Event Sourcing in Rust: https://doc.rust-lang.org/book/
[5] Building on Solana with Rust: https://solana.com/ru/docs/programs/rust
[6] PostgreSQL и распределённые системы: https://www.postgresql.org/docs/current/warm-standby.html
[7] Telegram: https://t.me/Berektassuly
[8] Источник: https://habr.com/ru/articles/966194/?utm_source=habrahabr&utm_medium=rss&utm_campaign=966194
Нажмите здесь для печати.