-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
Description
Título: Implement proper Transactional Outbox - same transaction guarantee
Labels: critical, event-sourcing, outbox-pattern
Prioridade: CRÍTICA
Problema Atual (Ponto 2 do Feedback - continuação):
- Evento Outbox e mudança de estado NÃO estão na mesma transação
- Publicação pode falhar sem trilha do evento
Solução Proposta:
// DataProvider - Transactional Outbox Service
@Service
@Transactional
public class TransactionalOutboxService {
@Transactional
public void publishEventWithStateChange(Runnable stateChange, DomainEvent event) {
// 1. Executar mudança de estado
stateChange.run();
// 2. Salvar evento outbox na MESMA transação
OutboxEvent outboxEvent = OutboxEvent.builder()
.aggregateId(event.getAggregateId())
.eventType(event.getEventType())
.payload(event.getPayload())
.createdAt(LocalDateTime.now())
.processed(false)
.build();
outboxRepository.save(outboxEvent);
// Se qualquer operação falhar, TUDO faz rollback
}
}
// Uso nos Repository Implementations
@Repository
@Transactional
public class PostgreSQLWalletRepositoryImpl implements WalletRepository {
@Transactional
@Override
public WalletTransaction deposit(String userId, Money amount) {
return transactionalOutboxService.publishEventWithStateChange(
() -> {
// Mudança de estado
Wallet wallet = findWallet(userId);
wallet.deposit(amount);
walletJpaRepository.save(wallet);
transactionJpaRepository.save(transaction);
},
// Evento
FundsDepositedEvent.builder()
.walletId(wallet.getId())
.amount(amount)
.build()
);
}
}Tarefas:
-
Criar TransactionalOutboxService
-
Garantir outbox + estado na MESMA transação
-
Implementar OutboxProcessor resiliente
@Component @ConditionalOnProperty("wallet.outbox.scheduler.enabled") public class OutboxProcessor { @Scheduled(fixedDelay = 5000) @Transactional public void processOutboxEvents() { List<OutboxEvent> unprocessed = outboxRepository.findUnprocessedEvents(); for (OutboxEvent event : unprocessed) { try { kafkaTemplate.send(topicName, event.getPayload()).get(); // Sync send event.markAsProcessed(); outboxRepository.save(event); } catch (Exception e) { event.incrementRetryCount(); if (event.getRetryCount() >= maxRetries) { // Send to DLQ deadLetterQueueService.send(event); } } } } }
-
Adicionar retry com exponential backoff
-
Implementar Dead Letter Queue
-
Testes de falha entre persistência e publicação