Skip to content

Commit aacd59e

Browse files
committed
fixing checkout consumer issue: still some issues
1 parent d848ec6 commit aacd59e

File tree

4 files changed

+18
-25
lines changed

4 files changed

+18
-25
lines changed

src/backend/services/checkout-api/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ async function main() {
3333

3434
process.on('SIGINT', async () => {
3535
await otel.shutdown();
36+
await checkoutPublisher.shutdown();
3637
logger.info("Gracefully shutting down from SIGINT (Ctrl-C)");
3738
// some other closing procedures go here
3839
process.exit(0);

src/backend/services/checkout-api/src/messagging/publisher.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export class Publisher {
1111

1212
public async start(): Promise<void> {
1313
try {
14+
1415
await this.producer.connect()
1516
} catch (error) {
1617
logger.error('Error connecting the producer: ', error)
@@ -32,6 +33,10 @@ export class Publisher {
3233
const kafka = new Kafka({
3334
clientId: this.clientId,
3435
brokers: this.brokers,
36+
retry: {
37+
initialRetryTime: 3000,
38+
retries: 15
39+
},
3540
})
3641
return kafka.producer()
3742
}

src/backend/services/order-api/src/main/java/org/jurabek/restaurant/order/api/events/UserCheckoutEventHandler.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,15 @@
22

33
import jakarta.enterprise.context.ApplicationScoped;
44
import jakarta.inject.Inject;
5-
6-
import io.smallrye.common.annotation.Blocking;
7-
8-
import java.util.concurrent.CompletionStage;
5+
import jakarta.transaction.Transactional;
96

107
import org.eclipse.microprofile.faulttolerance.Retry;
11-
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
128
import org.eclipse.microprofile.reactive.messaging.Incoming;
13-
import org.eclipse.microprofile.reactive.messaging.Message;
149
import org.jboss.logging.Logger;
1510
import org.jurabek.restaurant.order.api.services.CheckoutService;
1611

12+
import io.smallrye.common.annotation.Blocking;
13+
1714
@ApplicationScoped
1815
public class UserCheckoutEventHandler {
1916

@@ -27,17 +24,11 @@ public UserCheckoutEventHandler(CheckoutService checkout) {
2724
}
2825

2926
@Incoming("checkout")
30-
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
31-
@Retry(delay = 10, maxRetries = 5)
27+
@Retry(maxRetries = 5)
28+
@Transactional
3229
@Blocking
33-
public CompletionStage<Void> Handle(Message<UserCheckoutEvent> message) {
34-
try {
35-
log.info("received user checkout event: " + message);
36-
checkout.Checkout(message.getPayload());
37-
return message.ack();
38-
} catch (Exception e) {
39-
log.error("Error processing user checkout event: " + message, e);
40-
return message.nack(e);
41-
}
30+
public void Handle(UserCheckoutEvent message) {
31+
log.info("received user checkout event: " + message);
32+
checkout.Checkout(message);
4233
}
4334
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package org.jurabek.restaurant.order.api.services;
22

33
import java.util.UUID;
4+
import java.util.concurrent.CompletionStage;
45

56
import jakarta.enterprise.context.ApplicationScoped;
67
import jakarta.inject.Inject;
7-
import jakarta.transaction.Transactional;
88

99
import org.eclipse.microprofile.reactive.messaging.Channel;
1010
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -13,10 +13,9 @@
1313
import org.jurabek.restaurant.order.api.mappers.OrdersMapper;
1414
import org.jurabek.restaurant.order.api.repositories.OrdersRepository;
1515

16-
1716
@ApplicationScoped
1817
public class CheckoutService {
19-
18+
2019
private final OrdersRepository ordersRepository;
2120
private final OrdersMapper mapper;
2221

@@ -30,22 +29,19 @@ public CheckoutService(OrdersRepository ordersRepository, OrdersMapper mapper) {
3029
this.mapper = mapper;
3130
}
3231

33-
@Transactional
3432
public void Checkout(UserCheckoutEvent checkoutInfo) {
3533
var order = mapper.mapDtoToOrder(checkoutInfo.getCustomerBasket());
3634
order.setTransactionId(checkoutInfo.getTransactionId());
3735
order.setBuyerId(UUID.fromString(checkoutInfo.getCheckOutInfo().getUserId()));
3836
order.setCheckoutId(checkoutInfo.getCheckoutId());
39-
37+
4038
for (var orderItems : order.getOrderItems()) {
4139
orderItems.setOrder(order);
4240
}
4341
ordersRepository.persist(order);
4442

4543
var event = new OrderCompleted(order.getId(), order.getCartId(), order.getBuyerId(), order.getTransactionId(),
4644
order.getOrderedDate());
47-
48-
orderCompletedEventEmitter.send(event);
45+
orderCompletedEventEmitter.send(event).toCompletableFuture().join();
4946
}
50-
5147
}

0 commit comments

Comments
 (0)