Skip to content

Транзакции

RAprogramm edited this page Jan 7, 2026 · 2 revisions

Типобезопасные транзакции с несколькими сущностями и автоматическим commit/rollback.

Что такое транзакции?

Транзакция базы данных — это способ объединить несколько операций с базой данных в одну атомарную единицу. Это означает:

  • Всё или ничего: Либо ВСЕ операции успешны, либо НИ ОДНА не применяется
  • Автоматический откат: Если любая операция падает, все предыдущие изменения автоматически отменяются
  • Согласованность данных: База данных никогда не окажется в несогласованном состоянии

Зачем нужны транзакции?

Представьте, что вы разрабатываете банковское приложение и нужно перевести деньги между счетами:

1. Списать 100₽ со Счёта A
2. Зачислить 100₽ на Счёт B

Без транзакций, если шаг 1 успешен, но шаг 2 падает (сетевая ошибка, сбой БД и т.д.), вы только что потеряли 100₽! Деньги списаны с A, но не зачислены на B.

С транзакциями, если шаг 2 падает, шаг 1 автоматически откатывается. Деньги остаются на Счёте A, как будто ничего не произошло.

Включение транзакций

Добавьте атрибут transactions к сущности:

use entity_derive::Entity;
use uuid::Uuid;

#[derive(Entity)]
#[entity(table = "accounts", transactions)]  // ← Добавьте это
pub struct Account {
    #[id]
    pub id: Uuid,

    #[field(create, update, response)]
    pub user_id: Uuid,

    #[field(create, update, response)]
    pub balance: i64,
}

Что генерируется?

Для сущности Account с #[entity(transactions)] макрос генерирует:

1. Адаптер репозитория для транзакций

pub struct AccountTransactionRepo<'t> {
    tx: &'t mut sqlx::Transaction<'static, sqlx::Postgres>,
}

Это как обычный репозиторий, но все операции происходят внутри транзакции.

2. Трейт-расширение для билдера

pub trait TransactionWithAccount<'p> {
    fn with_accounts(self) -> Transaction<'p, PgPool, AccountTransactionRepo<'static>>;
}

Это добавляет метод with_accounts() к билдеру транзакции.

Доступные методы

Внутри транзакции доступны следующие методы:

Метод Сигнатура Описание
create create(dto) -> Result<Entity, Error> Вставить новую запись
find_by_id find_by_id(id) -> Result<Option<Entity>, Error> Найти по первичному ключу
update update(id, dto) -> Result<Entity, Error> Обновить существующую запись
delete delete(id) -> Result<bool, Error> Удалить запись (или мягкое удаление)
list list(limit, offset) -> Result<Vec<Entity>, Error> Список с пагинацией

Простой пример

use entity_core::prelude::*;

async fn create_account(pool: &PgPool, user_id: Uuid) -> Result<Account, AppError> {
    Transaction::new(pool)           // 1. Начать создание транзакции
        .with_accounts()              // 2. Добавить репозиторий Account
        .run(|mut ctx| async move {   // 3. Выполнить операции
            let account = ctx.accounts().create(CreateAccountRequest {
                user_id,
                balance: 0,
            }).await?;

            Ok(account)               // 4. Вернуть результат (автоматический commit)
        })
        .await
}

Пошагово:

  1. Transaction::new(pool) — Создаёт билдер транзакции с пулом соединений
  2. .with_accounts() — Добавляет репозиторий Account в контекст транзакции
  3. .run(|mut ctx| async move { ... }) — Выполняет операции внутри транзакции
  4. Ok(account) — Возврат Ok коммитит транзакцию. Возврат Err откатывает её.

Полный пример: Перевод денег

Этот пример показывает всю мощь транзакций:

use entity_core::prelude::*;
use uuid::Uuid;

#[derive(Debug)]
pub enum TransferError {
    Database(sqlx::Error),
    AccountNotFound(Uuid),
    InsufficientFunds { available: i64, requested: i64 },
}

impl From<sqlx::Error> for TransferError {
    fn from(e: sqlx::Error) -> Self {
        TransferError::Database(e)
    }
}

impl From<TransactionError<sqlx::Error>> for TransferError {
    fn from(e: TransactionError<sqlx::Error>) -> Self {
        TransferError::Database(e.into_inner())
    }
}

/// Атомарный перевод денег между двумя счетами.
///
/// Если ЛЮБОЙ шаг падает, все изменения автоматически откатываются.
pub async fn transfer(
    pool: &PgPool,
    from_id: Uuid,
    to_id: Uuid,
    amount: i64,
) -> Result<(), TransferError> {
    Transaction::new(pool)
        .with_accounts()
        .run(|mut ctx| async move {
            // Шаг 1: Получить исходный счёт
            let from = ctx.accounts()
                .find_by_id(from_id)
                .await?
                .ok_or(TransferError::AccountNotFound(from_id))?;

            // Шаг 2: Проверить достаточно ли денег
            if from.balance < amount {
                return Err(TransferError::InsufficientFunds {
                    available: from.balance,
                    requested: amount,
                });
            }

            // Шаг 3: Получить целевой счёт
            let to = ctx.accounts()
                .find_by_id(to_id)
                .await?
                .ok_or(TransferError::AccountNotFound(to_id))?;

            // Шаг 4: Списать с источника
            // Если это успешно, но шаг 5 падает, это будет ОТКАЧЕНО
            ctx.accounts().update(from_id, UpdateAccountRequest {
                balance: Some(from.balance - amount),
                user_id: None,  // Не меняем user_id
            }).await?;

            // Шаг 5: Зачислить на целевой
            ctx.accounts().update(to_id, UpdateAccountRequest {
                balance: Some(to.balance + amount),
                user_id: None,
            }).await?;

            // Все операции успешны - транзакция ЗАКОММИТИТСЯ
            Ok(())
        })
        .await
}

Что происходит в разных сценариях:

Сценарий Результат
Оба обновления успешны Транзакция коммитится, деньги переведены
Исходный счёт не найден Транзакция откатывается (без изменений)
Недостаточно средств Транзакция откатывается (без изменений)
Первое обновление успешно, второе падает Транзакция откатывается (первое обновление отменено!)
Сетевая ошибка посреди транзакции Транзакция откатывается (без частичных изменений)

Несколько сущностей в одной транзакции

Можно работать с несколькими сущностями атомарно:

#[derive(Entity)]
#[entity(table = "accounts", transactions)]
pub struct Account {
    #[id]
    pub id: Uuid,
    #[field(create, update, response)]
    pub balance: i64,
}

#[derive(Entity)]
#[entity(table = "transfer_logs", transactions)]
pub struct TransferLog {
    #[id]
    pub id: Uuid,
    #[field(create, response)]
    pub from_account_id: Uuid,
    #[field(create, response)]
    pub to_account_id: Uuid,
    #[field(create, response)]
    pub amount: i64,
    #[auto]
    #[field(response)]
    pub created_at: DateTime<Utc>,
}

async fn transfer_with_logging(
    pool: &PgPool,
    from_id: Uuid,
    to_id: Uuid,
    amount: i64,
) -> Result<TransferLog, AppError> {
    Transaction::new(pool)
        .with_accounts()      // Добавить репозиторий Account
        .with_transfer_logs() // Добавить репозиторий TransferLog
        .run(|mut ctx| async move {
            // Обновить балансы
            let from = ctx.accounts().find_by_id(from_id).await?
                .ok_or(AppError::NotFound)?;

            ctx.accounts().update(from_id, UpdateAccountRequest {
                balance: Some(from.balance - amount),
            }).await?;

            let to = ctx.accounts().find_by_id(to_id).await?
                .ok_or(AppError::NotFound)?;

            ctx.accounts().update(to_id, UpdateAccountRequest {
                balance: Some(to.balance + amount),
            }).await?;

            // Создать запись лога - всё в одной транзакции!
            let log = ctx.transfer_logs().create(CreateTransferLogRequest {
                from_account_id: from_id,
                to_account_id: to_id,
                amount,
            }).await?;

            Ok(log)
        })
        .await
}

Если создание лога падает, оба обновления счетов откатываются!

Обработка ошибок

Автоматический откат

Любая ошибка, возвращённая из замыкания, вызывает откат:

Transaction::new(pool)
    .with_accounts()
    .run(|mut ctx| async move {
        ctx.accounts().update(id, dto).await?;  // Успешно

        // Какая-то валидация падает
        if amount < 0 {
            return Err(AppError::InvalidAmount);  // ← Вызывает откат!
        }

        // Это никогда не выполнится, и обновление выше отменится
        ctx.accounts().update(other_id, other_dto).await?;

        Ok(())
    })
    .await

Типы ошибок транзакций

Enum TransactionError сообщает что пошло не так:

use entity_core::transaction::TransactionError;

let result = Transaction::new(pool)
    .with_accounts()
    .run(|mut ctx| async move { /* ... */ })
    .await;

match result {
    Ok(value) => {
        println!("Успех: {:?}", value);
    }
    Err(e) => {
        // Проверить тип ошибки
        if e.is_begin() {
            println!("Не удалось начать транзакцию");
        } else if e.is_operation() {
            println!("Операция упала: {}", e);
        } else if e.is_commit() {
            println!("Не удалось закоммитить");
        } else if e.is_rollback() {
            println!("Не удалось откатить");
        }

        // Получить внутреннюю ошибку БД
        let db_error: sqlx::Error = e.into_inner();
    }
}

С мягким удалением

Транзакции учитывают атрибут soft_delete:

#[derive(Entity)]
#[entity(table = "documents", transactions, soft_delete)]
pub struct Document {
    #[id]
    pub id: Uuid,

    #[field(create, response)]
    pub title: String,

    #[field(skip)]
    pub deleted_at: Option<DateTime<Utc>>,  // Обязательно для soft_delete
}

async fn archive_document(pool: &PgPool, id: Uuid) -> Result<bool, AppError> {
    Transaction::new(pool)
        .with_documents()
        .run(|mut ctx| async move {
            // Это устанавливает deleted_at = NOW() вместо DELETE
            let deleted = ctx.documents().delete(id).await?;
            Ok(deleted)
        })
        .await
}

Лучшие практики

1. Держите транзакции короткими

Плохо: Долгие транзакции

Transaction::new(pool)
    .with_accounts()
    .run(|mut ctx| async move {
        let account = ctx.accounts().find_by_id(id).await?;

        // НЕ ДЕЛАЙТЕ: Вызов внешних API внутри транзакций
        let rate = external_api.get_exchange_rate().await?;  // ← МЕДЛЕННО!

        ctx.accounts().update(id, dto).await?;
        Ok(())
    })
    .await

Хорошо: Медленные операции снаружи

// Получить внешние данные ДО начала транзакции
let rate = external_api.get_exchange_rate().await?;

Transaction::new(pool)
    .with_accounts()
    .run(|mut ctx| async move {
        ctx.accounts().update(id, UpdateAccountRequest {
            balance: Some(calculate_new_balance(rate)),
        }).await?;
        Ok(())
    })
    .await

2. Не используйте транзакции для одиночных операций

Лишнее:

Transaction::new(pool)
    .with_users()
    .run(|mut ctx| async move {
        ctx.users().find_by_id(id).await  // Всего одна операция!
    })
    .await

Лучше: Используйте обычный репозиторий

pool.find_by_id(id).await  // Транзакция не нужна

3. Обрабатывайте все ошибки правильно

Всегда пробрасывайте ошибки с ?:

Transaction::new(pool)
    .with_accounts()
    .run(|mut ctx| async move {
        let result = ctx.accounts().update(id, dto).await;

        // НЕ ДЕЛАЙТЕ: Глотать ошибки
        if let Err(e) = result {
            log::error!("Обновление упало: {}", e);
            // Транзакция не откатится правильно!
        }

        // ДЕЛАЙТЕ: Пробрасывать ошибки
        ctx.accounts().update(id, dto).await?;  // ← Используйте ?

        Ok(())
    })
    .await

Типичные паттерны

Проверить-затем-обновить

Transaction::new(pool)
    .with_products()
    .run(|mut ctx| async move {
        let product = ctx.products().find_by_id(id).await?
            .ok_or(AppError::NotFound)?;

        if product.stock < quantity {
            return Err(AppError::OutOfStock);
        }

        ctx.products().update(id, UpdateProductRequest {
            stock: Some(product.stock - quantity),
            ..Default::default()
        }).await?;

        Ok(product)
    })
    .await

Создать несколько связанных записей

Transaction::new(pool)
    .with_orders()
    .with_order_items()
    .run(|mut ctx| async move {
        // Создать родителя
        let order = ctx.orders().create(CreateOrderRequest {
            customer_id,
            status: "pending".to_string(),
        }).await?;

        // Создать дочерние записи
        for item in items {
            ctx.order_items().create(CreateOrderItemRequest {
                order_id: order.id,
                product_id: item.product_id,
                quantity: item.quantity,
            }).await?;
        }

        Ok(order)
    })
    .await

См. также

  • Атрибуты — Полная документация по атрибутам
  • Хуки — Выполнение кода до/после операций
  • Команды — CQRS паттерн команд
  • События — Отслеживание изменений сущностей

Clone this wiki locally