Skip to content

Commit 0cf03e7

Browse files
committed
#392 Host.Outbox.PostgreSql
Signed-off-by: Richard Pringle <[email protected]>
1 parent 54e6785 commit 0cf03e7

File tree

68 files changed

+2968
-224
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2968
-224
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ The configuration can be [modularized](docs/intro.md#modularization-of-configura
167167
| `.Host.AspNetCore` | Integration for ASP.NET Core | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AspNetCore.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AspNetCore) |
168168
| `.Host.Interceptor` | Core interface for interceptors | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Interceptor.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Interceptor) |
169169
| `.Host.FluentValidation` | Validation for messages based on [FluentValidation](https://www.nuget.org/packages/FluentValidation) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.FluentValidation.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.FluentValidation) |
170+
| `.Host.Outbox.PostgreSql` | Transactional Outbox using PostgreSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.PostgreSql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql) |
171+
| `.Host.Outbox.PostgreSql.DbContext` | Transactional Outbox using PostgreSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.PostgreSql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql.DbContext) |
170172
| `.Host.Outbox.Sql` | Transactional Outbox using MSSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) |
171173
| `.Host.Outbox.Sql.DbContext` | Transactional Outbox using MSSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) |
172174
| `.Host.AsyncApi` | [AsyncAPI](https://www.asyncapi.com/) specification generation via [Saunter](https://github.com/tehmantra/saunter) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AsyncApi.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AsyncApi) |

docs/README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
- [RabbitMQ](provider_rabbitmq.md)
1414
- [Redis](provider_redis.md)
1515
- [SQL](provider_sql.md)
16-
- [Serialization Plugins](serialization.md)
16+
- Plugins
17+
- [Serialization](serialization.md)
18+
- [Transactional Outbox](plugin_outbox.md)
19+
- [Validation using FluentValidation](plugin_fluent_validation.md)
20+
- [AsyncAPI specification generation](plugin_asyncapi.md)
21+
- [Consumer Circuit Breaker](intro.md#health-check-circuit-breaker)
1722
- [Use Cases](UseCases/)
1823
- For Maintainers:
1924
- [Build & Test](Maintainers/build.md)

docs/plugin_outbox.md

Lines changed: 93 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@ Please read the [Introduction](intro.md) before reading this provider documentat
55
- [Introduction](#introduction)
66
- [Configuration](#configuration)
77
- [Entity Framework](#entity-framework)
8-
- [SQL Connection](#sql-connection)
8+
- [PostgreSQL](#postgresql)
9+
- [SQL Server](#sql-server)
10+
- [Direct Connection](#direct-connection)
11+
- [PostgreSQL](#postgresql-1)
12+
- [SQL Server](#sql-server-1)
913
- [Options](#options)
1014
- [UseOutbox for Producers](#useoutbox-for-producers)
1115
- [Transactions for Consumers](#transactions-for-consumers)
1216
- [UseTransactionScope](#usetransactionscope)
17+
- [UsePostgreSqlTransaction](#usepostgresqltransaction)
1318
- [UseSqlTransaction](#usesqltransaction)
1419
- [How it works](#how-it-works)
1520
- [Clean up](#clean-up)
@@ -18,16 +23,29 @@ Please read the [Introduction](intro.md) before reading this provider documentat
1823
## Introduction
1924

2025
The [`Host.Outbox`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox) introduces [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern to the SlimMessageBus.
21-
It comes in two flavors:
2226

27+
PostgreSQL
28+
- [`Host.Outbox.PostgreSql`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql) as integration with the [Npgsql](https://www.npgsql.org/) client
29+
- [`Host.Outbox.PostgreSql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql.DbContext) as integration with Entity Framework Core using Npgsql
30+
31+
SQL server
2332
- [`Host.Outbox.Sql`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) as integration with the System.Data.Sql client (MSSQL)
24-
- [`Host.Outbox.Sql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) as integration with Entity Framework Core
33+
- [`Host.Outbox.Sql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) as integration with Entity Framework Core using System.Data.Sql
2534

2635
Outbox plugin can work in combination with any transport provider.
2736

2837
## Configuration
2938

3039
### Entity Framework
40+
#### PostgreSQL
41+
42+
> Required: [`SlimMessageBus.Host.Outbox.PostgreSql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql.DbContext)
43+
44+
```cs
45+
using SlimMessageBus.Host.Outbox.PostgreSql.DbContext;
46+
```
47+
48+
#### SQL Server
3149

3250
> Required: [`SlimMessageBus.Host.Outbox.Sql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext)
3351
@@ -40,7 +58,7 @@ Consider the following example (from [Samples](../src/Samples/Sample.OutboxWebAp
4058
- `services.AddOutboxUsingDbContext<CustomerContext>(...)` is used to add the [Outbox.DbContext](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) plugin to the container.
4159
- `CustomerContext` is the application specific Entity Framework `DbContext`.
4260
- `CustomerCreatedEvent` is produced on the `AzureSB` child bus, the bus will deliver these events via outbox - see `.UseOutbox()`
43-
- `CreateCustomerCommand` is consumed on the `Memory` child bus, each command is wrapped in an SQL transaction - see `UseSqlTransaction()`
61+
- `CreateCustomerCommand` is consumed on the `Memory` child bus, each command is wrapped in an SQL transaction - see `UsePostgreSqlTransaction()` / `UseSqlTransaction()`
4462

4563
Startup setup:
4664

@@ -52,9 +70,20 @@ builder.Services.AddSlimMessageBus(mbb =>
5270
.AddChildBus("Memory", mbb =>
5371
{
5472
mbb.WithProviderMemory()
55-
.AutoDeclareFrom(Assembly.GetExecutingAssembly(), consumerTypeFilter: t => t.Name.EndsWith("CommandHandler"))
56-
//.UseTransactionScope(messageTypeFilter: t => t.Name.EndsWith("Command")) // Consumers/Handlers will be wrapped in a TransactionScope
57-
.UseSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers will be wrapped in a SqlTransaction ending with Command
73+
.AutoDeclareFrom(Assembly.GetExecutingAssembly(), consumerTypeFilter: t => t.Name.EndsWith("CommandHandler"));
74+
//.UseTransactionScope(messageTypeFilter: t => t.Name.EndsWith("Command")) // Consumers/Handlers will be wrapped in a TransactionScope
75+
//.UseSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers will be wrapped in a SqlTransaction ending with Command
76+
77+
switch (dbProvider)
78+
{
79+
case DbProvider.SqlServer:
80+
mbb.UseSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers will be wrapped in a SqlTransaction ending with Command
81+
break;
82+
83+
case DbProvider.PostgreSql:
84+
mbb.UsePostgreSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers will be wrapped in a SqlTransaction ending with Command
85+
break;
86+
}
5887
})
5988
.AddChildBus("AzureSB", mbb =>
6089
{
@@ -81,20 +110,40 @@ builder.Services.AddSlimMessageBus(mbb =>
81110
// x.UseOutbox();
82111
})
83112
// All outgoing messages from this bus will go out via an outbox
84-
.UseOutbox(/* messageTypeFilter: t => t.Name.EndsWith("Command") */); // Additionaly, can apply filter do determine messages that should go out via outbox
113+
.UseOutbox(/* messageTypeFilter: t => t.Name.EndsWith("Command") */); // Additionally, can apply filter do determine messages that should go out via outbox
85114
})
86115
.AddServicesFromAssembly(Assembly.GetExecutingAssembly())
87116
.AddJsonSerializer()
88-
.AddAspNet()
89-
.AddOutboxUsingDbContext<CustomerContext>(opts =>
90-
{
91-
opts.PollBatchSize = 100;
92-
opts.PollIdleSleep = TimeSpan.FromSeconds(10);
93-
opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
94-
opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
95-
//opts.SqlSettings.TransactionIsolationLevel = System.Data.IsolationLevel.RepeatableRead;
96-
//opts.SqlSettings.Dialect = SqlDialect.SqlServer;
97-
});
117+
.AddAspNet();
118+
119+
switch (dbProvider)
120+
{
121+
case DbProvider.SqlServer:
122+
SlimMessageBus.Host.Outbox.Sql.DbContext.MessageBusBuilderExtensions.AddOutboxUsingDbContext<CustomerContext>(mbb, opts =>
123+
{
124+
opts.PollBatchSize = 500;
125+
opts.PollIdleSleep = TimeSpan.FromSeconds(10);
126+
opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
127+
opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
128+
//opts.SqlSettings.TransactionIsolationLevel = System.Data.IsolationLevel.RepeatableRead;
129+
//opts.SqlSettings.Dialect = SqlDialect.SqlServer;
130+
});
131+
132+
break;
133+
134+
case DbProvider.PostgreSql:
135+
SlimMessageBus.Host.Outbox.PostgreSql.DbContext.MessageBusBuilderExtensions.AddOutboxUsingDbContext<CustomerContext>(mbb, opts =>
136+
{
137+
opts.PollBatchSize = 500;
138+
opts.PollIdleSleep = TimeSpan.FromSeconds(10);
139+
opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
140+
opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
141+
//opts.SqlSettings.TransactionIsolationLevel = System.Data.IsolationLevel.RepeatableRead;
142+
//opts.SqlSettings.Dialect = SqlDialect.SqlServer;
143+
});
144+
145+
break;
146+
}
98147
});
99148
```
100149

@@ -119,7 +168,16 @@ public record CreateCustomerCommandHandler(IMessageBus Bus, CustomerContext Cust
119168
}
120169
```
121170

122-
### SQL Connection
171+
### Direct Connection
172+
#### PostgreSQL
173+
174+
> Required: [`SlimMessageBus.Host.Outbox.PostgreSql`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql)
175+
176+
```cs
177+
using SlimMessageBus.Host.Outbox.PostgreSql;
178+
```
179+
180+
#### SQL Server
123181

124182
> Required: [`SlimMessageBus.Host.Outbox.Sql`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql)
125183
@@ -129,7 +187,7 @@ using SlimMessageBus.Host.Outbox.Sql;
129187

130188
Consider the following example:
131189

132-
- `services.AddMessageBusOutboxUsingSql(...)` is used to add the [Outbox.Sql](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) plugin to the container.
190+
- `services.AddOutboxUsingSql(...)` is used to add the [Outbox.Sql](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) plugin to the container.
133191
- `SqlConnection` is registered in the container
134192

135193
```cs
@@ -181,6 +239,20 @@ using SlimMessageBus.Host.Outbox;
181239

182240
When applied on the (child) bus level then all consumers (or handlers) will inherit that option.
183241

242+
#### UsePostgreSqlTransaction
243+
244+
> Required: [`SlimMessageBus.Host.Outbox.PostgreSql`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql) or [`SlimMessageBus.Host.Outbox.PostgreSql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql.DbContext)
245+
246+
```cs
247+
using SlimMessageBus.Host.Outbox.PostgreSql;
248+
```
249+
250+
`.UsePostgreSqlTransaction()` can be used on consumers (or handlers) declaration to force the consumer to start a `PostgreSqlTransaction` prior the message `OnHandle` and to complete that transaction after it. Any exception raised by the consumer would cause the transaction to be rolled back.
251+
252+
When applied on the (child) bus level then all consumers (or handlers) will inherit that option.
253+
254+
`PostgreSqlTransaction`-s are created off the associated `NpgsqlConnection`.
255+
184256
#### UseSqlTransaction
185257

186258
> Required: [`SlimMessageBus.Host.Outbox.Sql`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) or [`SlimMessageBus.Host.Outbox.Sql.DbContext`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext)
@@ -204,7 +276,7 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe
204276
- When a message is sent via a bus or producer marked with `.UseOutbox()` then such message will be inserted into the `Outbox` table.
205277
It is important that message publish happens in the context of an transaction to ensure consistency.
206278

207-
- When the message publication happens in the context of a consumer (or handler) of another message, the `.UseTransactionScope()`, `.UseSqlTransaction()` can be used to start a transaction.
279+
- When the message publication happens in the context of a consumer (or handler) of another message, the `.UseTransactionScope()`, `.UseSqlTransaction()` or `.UseSqlTransaction()` can be used to start a transaction.
208280

209281
- The transaction can be managed by the application, starting it either explicitly using `DbContext.Database.BeginTransactionAsync()` or creating a `TransactionScope()`.
210282

0 commit comments

Comments
 (0)