- MongoDB Atlas account with a free-tier cluster and the sample data set loaded.
- Confluent Cloud account with a Kafka cluster.
- Atlas Stream Processing instance.
To enable automatic code formatting before commits, configure Git to use the project's hooks directory:

```shell
git config core.hooksPath .githooks
```

- Install the Quarkus CLI.
- Set up a new Quarkus project using the Quarkus CLI:

```shell
quarkus create app ifg:mug-movies --extension='quarkus-mongodb-rest-data-panache,quarkus-rest,quarkus-rest-jackson' --no-code
```

- Set up the MongoDB connection in `src/main/resources/application.properties`:
```properties
%dev.quarkus.mongodb.connection-string=mongodb+srv://mugcluster0.8zlaiur.mongodb.net
quarkus.mongodb.database=sample_mflix
%dev.quarkus.mongodb.tls=true
%dev.quarkus.mongodb.tls-configuration-name=mongo
%dev.quarkus.mongodb.credentials.auth-mechanism=MONGODB-X509
%dev.quarkus.tls.mongo.key-store.pem.key-certs.key=.tls/mongo.pem
%dev.quarkus.tls.mongo.key-store.pem.key-certs.cert=.tls/mongo.pem
```

Note: The `mongo-key.pem` and `mongo-cert.pem` files can be obtained from your Atlas UI and should be placed in the `.tls` directory at the root of your project (the configuration above expects them combined into a single `mongo.pem`).
- Create a new entity class `Movie.java` in `src/main/java/ifg/mug/movies/entity` and define the fields you want to expose:
```java
@MongoEntity(collection = "movies")
public class Movie extends PanacheMongoEntity {
    // Define your fields here
}
```

- Create a resource interface `MovieResource.java` extending `PanacheMongoEntityResource` in `src/main/java/ifg/mug/movies/rest`:
```java
public interface MovieResource extends PanacheMongoEntityResource<Movie, ObjectId> {
}
```

- Run the application in dev mode:

```shell
./mvnw quarkus:dev
```

- Test the API.
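Before testing, give the `Movie` entity a few concrete fields so the API returns useful JSON. A minimal sketch, assuming a few of the standard `sample_mflix` field names (`title`, `year`, `plot`); with Panache's active-record pattern the fields stay public:

```java
package ifg.mug.movies.entity;

import io.quarkus.mongodb.panache.PanacheMongoEntity;
import io.quarkus.mongodb.panache.common.MongoEntity;

@MongoEntity(collection = "movies")
public class Movie extends PanacheMongoEntity {
    // Field names mirror the sample_mflix documents; add or remove as needed.
    public String title;
    public Integer year;
    public String plot;
}
```

Fields not declared here are simply not serialized, so you can start small and grow the entity as the API needs it.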
- Install oha, an HTTP load generator (the name comes from the Japanese おはよう, "good morning").
- Run the load test:

```shell
oha -z 10s -c 20 -q 100 --latency-correction --disable-keepalive 'http://localhost:8080/movie/573a1393f29313caabcddbed'
```

- Add the `quarkus-cache` extension to your project:
```shell
./mvnw quarkus:add-extension -Dextensions="quarkus-cache"
```

- Introduce caching in your `MovieResource.java`:
```java
@GET
@Path("/{id}/cached")
@Produces("application/json")
@CacheResult(cacheName = "movieCache")
default Movie findByIdCached(@PathParam("id") String id) {
    Log.debugf("Find movie by id: %s", id);
    return Movie.findById(new ObjectId(id));
}
```

- Repeat the load test with caching enabled:
```shell
oha -z 10s -c 20 -q 100 --latency-correction --disable-keepalive 'http://localhost:8080/movie/573a1393f29313caabcddbed/cached'
```

- Set up Atlas Stream Processing to listen to the `movies` collection and produce change events to a Kafka topic.
- Add the `quarkus-kafka-client` and `messaging-kafka` extensions to your project:
```shell
./mvnw quarkus:add-extension -Dextensions="quarkus-kafka-client,messaging-kafka"
```

- Configure the Kafka connection in `src/main/resources/application.properties`:
```properties
%dev.kafka.bootstrap.servers=${ENV_KAFKA_BOOTSTRAP_SERVERS}
kafka.group.id=mug-movies
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
%dev.kafka.security.protocol=SASL_SSL
%dev.kafka.sasl.mechanism=PLAIN
%dev.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='${ENV_KAFKA_CLUSTER_API_KEY}' password='${ENV_KAFKA_CLUSTER_API_SECRET}';
mp.messaging.incoming.mug-movies-cdc.topic=mug-movies-cdc
mp.messaging.incoming.mug-movies-cdc.connector=smallrye-kafka
mp.messaging.incoming.mug-movies-cdc.offset.auto.reset=earliest
```

Note: Create a `.env` file in the root of your project with the following content (Quarkus picks it up automatically in dev mode):

```properties
ENV_KAFKA_BOOTSTRAP_SERVERS=YOUR_URL_HERE.aws.confluent.cloud:9092
ENV_KAFKA_CLUSTER_API_KEY=INSERT_YOUR_KAFKA_CLUSTER_API_KEY_HERE
ENV_KAFKA_CLUSTER_API_SECRET=INSERT_YOUR_KAFKA_CLUSTER_API_SECRET_HERE
```

- Create a Kafka consumer `MovieCdcConsumer.java`:
```java
@Incoming("mug-movies-cdc")
public void consume(ConsumerRecord<String, String> record) {
    Log.debugf("Received record: %s", record);
    invalidateCache(record.key());
}

@CacheInvalidate(cacheName = "movieCache")
public void invalidateCache(String id) {
    Log.debugf("Cache invalidated for movie with id: %s", id);
}
```

- Set the Kafka consumer logging level to DEBUG in
`src/main/resources/application.properties`:

```properties
quarkus.log.category."ifg.mug.movies.kafka".level=DEBUG
```

- Verify that the cache is invalidated when a movie is updated in the MongoDB collection. You can do this by updating a movie in the MongoDB Atlas UI and checking the logs for cache invalidation messages.
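The moving parts above can be reduced to a plain-Java sketch (no Quarkus; names are illustrative): `@CacheResult` behaves like a per-id map populated on the first lookup, and the CDC consumer's `@CacheInvalidate` removes the entry so the next request goes back to MongoDB:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of the caching + invalidation flow (no Quarkus).
public class CacheFlowSketch {
    static final Map<String, String> movieCache = new ConcurrentHashMap<>();
    static int dbLookups = 0; // counts simulated MongoDB reads

    // Stands in for the @CacheResult-annotated findByIdCached().
    static String findByIdCached(String id) {
        return movieCache.computeIfAbsent(id, k -> {
            dbLookups++;                 // cache miss: hit the "database"
            return "movie-" + k;         // pretend this is the document
        });
    }

    // Stands in for the @CacheInvalidate-annotated consumer method,
    // called with the key of an incoming CDC record.
    static void onCdcEvent(String key) {
        movieCache.remove(key);
    }

    public static void main(String[] args) {
        findByIdCached("573a1393");      // miss -> 1 DB lookup
        findByIdCached("573a1393");      // hit  -> served from cache
        onCdcEvent("573a1393");          // CDC event invalidates the entry
        findByIdCached("573a1393");      // miss again -> 2nd DB lookup
        System.out.println(dbLookups);   // prints 2
    }
}
```

In the real application Quarkus manages this map for you (Caffeine under the hood) and derives the cache key from the annotated method's parameter.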
- Well done! You have successfully implemented a simple CRUD REST API with MongoDB, measured its performance, introduced caching, and set up cache invalidation using Kafka.