Skip to content

Commit

Permalink
#143: Handling order completed event (#144)
Browse files Browse the repository at this point in the history
* Order completed event handling

- order-api: sending OrderCompleted event using quarkus EventEmitter
- cart-api: consuming OrderCompleted using IBM/Sarama and update cart status
- cart-api: filter out completed carts, return 404 when cart is completed
- web-app: revalidating cart cache, recreate new cart afert checkout
  • Loading branch information
jurabek authored Jan 29, 2024
1 parent bf4203b commit c43a6fc
Show file tree
Hide file tree
Showing 32 changed files with 483 additions and 652 deletions.
4 changes: 3 additions & 1 deletion src/backend/docker/docker-compose.kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ services:
- kafka
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:29092 1 30 && \
kafka-topics --create --topic checkout --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092'"
kafka-topics --create --topic checkout --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092 && \
kafka-topics --create --topic orders --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092
'"

kafka-ui:
image: provectuslabs/kafka-ui
Expand Down
2 changes: 1 addition & 1 deletion src/backend/docker/docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
environment:
- REDIS_HOST=redis:6379
- KAFKA_BROKER=kafka:29092
- CHECKOUT_TOPIC=checkout
- ORDERS_TOPIC=orders
- IDENTITY_URL=http://identity-api
- AUTH_URL=http://localhost:8080/identity
- BASE_PATH=/shoppingcart
Expand Down
39 changes: 22 additions & 17 deletions src/backend/load-tests/checkout.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,36 @@ export const options = {
vus: 300,
duration: "30s",
thresholds: {
'http_req_duration': ['p(99)<300'],
http_req_duration: ["p(99)<300"],
},
};

const baseUrl = "http://localhost:8080";
const catalogItems = baseUrl + "/cart/api/v1/items/all";

export default function () {
const customer_id = 'dfdee7b6-04d3-4d77-89a9-6542a4f2f31a'
const baseUrl = "http://localhost:8080";
const customer_id = "dfdee7b6-04d3-4d77-89a9-6542a4f2f31a";
const foods = http.get(baseUrl + "/catalog/items/all").json();
const food = foods[Math.floor(Math.random() * foods.length)];

http.post(baseUrl + '/basket/api/v1/items', JSON.stringify({
customer_id,
items: [
{
food_id: food.id,
food_name: food.name,
old_unit_price: 0,
picture: food.image,
quantity: Math.floor(Math.random() * 20),
unit_price: food.price,
}
]
}))
http.post(
catalogItems,
JSON.stringify({
customer_id,
items: [
{
food_id: food.id,
food_name: food.name,
old_unit_price: 0,
picture: food.image,
quantity: Math.floor(Math.random() * 20),
unit_price: food.price,
},
],
})
);

sleep(1);

http.post(baseUrl + '')
http.post(baseUrl + "");
}
36 changes: 23 additions & 13 deletions src/backend/services/cart-api/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (
"syscall"
"time"

"github.com/IBM/sarama"
"github.com/dnwe/otelsarama"
"github.com/jurabek/cart-api/cmd/config"
"github.com/jurabek/cart-api/internal/database"
"github.com/jurabek/cart-api/internal/docs"
"github.com/jurabek/cart-api/internal/events"
grpcsvc "github.com/jurabek/cart-api/internal/grpc"
"github.com/jurabek/cart-api/internal/handlers"
"github.com/jurabek/cart-api/internal/middlewares"
pbv1 "github.com/jurabek/cart-api/pb/v1"
"github.com/jurabek/cart-api/pkg/reciever"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -87,18 +91,27 @@ func main() {
if err != nil {
fmt.Print(err)
}
cartRepository := repositories.NewCartRepository(redisClient)

// p, err := sarama.NewSyncProducer([]string{cfg.KafkaBroker}, nil)
// if err != nil {
// log.Fatal().Err(err).Msg("new producer failed!")
// }
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

// tracedProducer := otelsarama.WrapSyncProducer(nil, p)
// defer tracedProducer.Close()
kafkaConsumer, error := sarama.NewConsumer([]string{cfg.KafkaBroker}, config)
if error != nil {
log.Fatal().Err(error).Msg("new consumer failed!")
}
kafkaConsumer = otelsarama.WrapConsumer(kafkaConsumer)

cartRepository := repositories.NewCartRepository(redisClient)
cartHandler := handlers.NewCartHandler(cartRepository)
msgReciever := reciever.NewMessageReciever(kafkaConsumer, cfg.OrdersTopic)
go func() {
recieveErr := msgReciever.Recieve(events.NewOrderCompletedEventHandler(cartRepository))
log.Error().Err(recieveErr).Msg("Error recieving messages")
}()

go grpcServer(grpcsvc.NewCartGrpcService(cartRepository))

cartHandler := handlers.NewCartHandler(cartRepository)
apiV1 := router.Group(basePath + "/api/v1")
{
cart := apiV1.Group("/cart")
Expand All @@ -107,7 +120,7 @@ func main() {
cart.GET(":id", handlers.ErrorHandler(cartHandler.Get))
cart.DELETE(":id", handlers.ErrorHandler(cartHandler.Delete))
cart.PUT(":id", handlers.ErrorHandler(cartHandler.Update))
cart.POST(":id/item", handlers.ErrorHandler(cartHandler.AddItem)) // adds item or increments quantity by CartID
cart.POST(":id/item", handlers.ErrorHandler(cartHandler.AddItem)) // adds item or increments quantity by CartID
cart.PUT(":id/item/:itemID", handlers.ErrorHandler(cartHandler.UpdateItem)) // updates line item item_id is ignored
cart.DELETE(":id/item/:itemID", handlers.ErrorHandler(cartHandler.DeleteItem))
}
Expand All @@ -122,8 +135,6 @@ func main() {
func(c *ginSwagger.Config) {
c.URL = basePath + "/swagger/doc.json"
}))

go grpcServer(grpcsvc.NewCartGrpcService(cartRepository))
_ = router.Run()
}

Expand All @@ -134,8 +145,7 @@ func grpcServer(svc pbv1.CartServiceServer) {
}

server := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
reflection.Register(server)

Expand Down
12 changes: 6 additions & 6 deletions src/backend/services/cart-api/cmd/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

// Configuration injects all environment variables into object
type Configuration struct {
ServerPort string
RedisHost string
KafkaBroker string
CheckoutTopic string
ServerPort string
RedisHost string
KafkaBroker string
OrdersTopic string
}

// Init initializes environment variables into config
Expand All @@ -24,8 +24,8 @@ func Init() *Configuration {
cfg.KafkaBroker = kafkaBroker
}

if checkoutTopic, ok := os.LookupEnv("CHECKOUT_TOPIC"); ok {
cfg.CheckoutTopic = checkoutTopic
if ordersTopic, ok := os.LookupEnv("ORDERS_TOPIC"); ok {
cfg.OrdersTopic = ordersTopic
}

return &cfg
Expand Down
49 changes: 25 additions & 24 deletions src/backend/services/cart-api/go.mod
Original file line number Diff line number Diff line change
@@ -1,63 +1,64 @@
module github.com/jurabek/cart-api

require (
github.com/Shopify/sarama v1.38.1
github.com/gin-gonic/gin v1.9.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.3.1
github.com/redis/go-redis/extra/redisotel/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.5
github.com/rs/zerolog v1.31.0
github.com/stretchr/testify v1.8.4
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.2
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.40.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.40.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.47.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0
go.opentelemetry.io/otel/sdk v1.22.0
)

require (
github.com/IBM/sarama v1.42.1
github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4
github.com/redis/go-redis/extra/redisotel/v9 v9.0.5
)

require (
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.2 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/arch v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
)
Expand Down
Loading

0 comments on commit c43a6fc

Please sign in to comment.