44using Confluent . Kafka ;
55using Microsoft . Extensions . Logging ;
66using Oteldemo ;
7+ using Microsoft . EntityFrameworkCore ;
8+ using System . Diagnostics ;
79
810namespace Accounting ;
911
12+ internal class DBContext : DbContext
13+ {
14+ public DbSet < OrderEntity > Orders { get ; set ; }
15+ public DbSet < OrderItemEntity > CartItems { get ; set ; }
16+ public DbSet < ShippingEntity > Shipping { get ; set ; }
17+
18+ protected override void OnConfiguring ( DbContextOptionsBuilder optionsBuilder )
19+ {
20+ var connectionString = Environment . GetEnvironmentVariable ( "DB_CONNECTION_STRING" ) ;
21+
22+ optionsBuilder . UseNpgsql ( connectionString ) . UseSnakeCaseNamingConvention ( ) ;
23+ }
24+ }
25+
26+
1027internal class Consumer : IDisposable
1128{
1229 private const string TopicName = "orders" ;
1330
1431 private ILogger _logger ;
1532 private IConsumer < string , byte [ ] > _consumer ;
1633 private bool _isListening ;
34+ private DBContext ? _dbContext ;
35+ private static readonly ActivitySource MyActivitySource = new ( "Accounting.Consumer" ) ;
1736
1837 public Consumer ( ILogger < Consumer > logger )
1938 {
@@ -26,6 +45,7 @@ public Consumer(ILogger<Consumer> logger)
2645 _consumer . Subscribe ( TopicName ) ;
2746
2847 _logger . LogInformation ( $ "Connecting to Kafka: { servers } ") ;
48+ _dbContext = Environment . GetEnvironmentVariable ( "DB_CONNECTION_STRING" ) == null ? null : new DBContext ( ) ;
2949 }
3050
3151 public void StartListening ( )
@@ -38,8 +58,8 @@ public void StartListening()
3858 {
3959 try
4060 {
61+ using var activity = MyActivitySource . StartActivity ( "order-consumed" , ActivityKind . Internal ) ;
4162 var consumeResult = _consumer . Consume ( ) ;
42-
4363 ProcessMessage ( consumeResult . Message ) ;
4464 }
4565 catch ( ConsumeException e )
@@ -61,8 +81,48 @@ private void ProcessMessage(Message<string, byte[]> message)
6181 try
6282 {
6383 var order = OrderResult . Parser . ParseFrom ( message . Value ) ;
64-
6584 Log . OrderReceivedMessage ( _logger , order ) ;
85+
86+ if ( _dbContext == null )
87+ {
88+ return ;
89+ }
90+
91+ var orderEntity = new OrderEntity
92+ {
93+ Id = order . OrderId
94+ } ;
95+ _dbContext . Add ( orderEntity ) ;
96+ foreach ( var item in order . Items )
97+ {
98+ var orderItem = new OrderItemEntity
99+ {
100+ ItemCostCurrencyCode = item . Cost . CurrencyCode ,
101+ ItemCostUnits = item . Cost . Units ,
102+ ItemCostNanos = item . Cost . Nanos ,
103+ ProductId = item . Item . ProductId ,
104+ Quantity = item . Item . Quantity ,
105+ OrderId = order . OrderId
106+ } ;
107+
108+ _dbContext . Add ( orderItem ) ;
109+ }
110+
111+ var shipping = new ShippingEntity
112+ {
113+ ShippingTrackingId = order . ShippingTrackingId ,
114+ ShippingCostCurrencyCode = order . ShippingCost . CurrencyCode ,
115+ ShippingCostUnits = order . ShippingCost . Units ,
116+ ShippingCostNanos = order . ShippingCost . Nanos ,
117+ StreetAddress = order . ShippingAddress . StreetAddress ,
118+ City = order . ShippingAddress . City ,
119+ State = order . ShippingAddress . State ,
120+ Country = order . ShippingAddress . Country ,
121+ ZipCode = order . ShippingAddress . ZipCode ,
122+ OrderId = order . OrderId
123+ } ;
124+ _dbContext . Add ( shipping ) ;
125+ _dbContext . SaveChanges ( ) ;
66126 }
67127 catch ( Exception ex )
68128 {
0 commit comments