An example of a basic Kafka - Oracle Advanced Queuing (AQ) adapter.
Kafka topics used in the example:
- "to-be-enqueued"
A topic that receives JSON strings to be enqueued into the Oracle queue.
- "dequeued"
A topic where dequeued objects (converted to JSON strings) are published.
Order of execution:
- A JSON string is sent to the "to-be-enqueued" Kafka topic
- The JSON string is deserialized into a new object
- The new object is enqueued into the Oracle queue
- The object is dequeued from the Oracle queue
- The dequeued object is serialized into a JSON string
- The JSON string is sent to the "dequeued" Kafka topic
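The steps above can be sketched with in-memory stand-ins. This is only an illustration of the data flow, not the adapter's code: the class name `AdapterFlowSketch`, its fields, and both methods are hypothetical, the three `BlockingQueue` instances merely take the place of the two Kafka topics and the Oracle queue, and the (de)serializer functions are placeholders for whatever JSON mapping the app uses.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** In-memory sketch of the adapter flow; no real Kafka or Oracle AQ is involved. */
public class AdapterFlowSketch<T> {
    // Stand-in for the "to-be-enqueued" Kafka topic.
    public final BlockingQueue<String> toBeEnqueued = new LinkedBlockingQueue<>();
    // Stand-in for the Oracle queue that holds typed objects.
    public final BlockingQueue<T> oracleQueue = new LinkedBlockingQueue<>();
    // Stand-in for the "dequeued" Kafka topic.
    public final BlockingQueue<String> dequeued = new LinkedBlockingQueue<>();

    private final Function<String, T> deserializer; // JSON string -> object
    private final Function<T, String> serializer;   // object -> JSON string

    public AdapterFlowSketch(Function<String, T> deserializer, Function<T, String> serializer) {
        this.deserializer = deserializer;
        this.serializer = serializer;
    }

    /** Takes one JSON string from the input topic, deserializes it, and enqueues the object. */
    public void enqueueNext() throws InterruptedException {
        String json = toBeEnqueued.take();
        oracleQueue.put(deserializer.apply(json));
    }

    /** Dequeues one object, serializes it, and sends the JSON string to the output topic. */
    public void dequeueNext() throws InterruptedException {
        T object = oracleQueue.take();
        dequeued.put(serializer.apply(object));
    }

    public static void main(String[] args) throws InterruptedException {
        // An Integer payload keeps the demo free of JSON libraries; the real payload is a Song.
        AdapterFlowSketch<Integer> flow = new AdapterFlowSketch<Integer>(
                json -> Integer.parseInt(json.trim()),
                number -> Integer.toString(number));
        flow.toBeEnqueued.put("165");
        flow.enqueueNext();
        flow.dequeueNext();
        System.out.println(flow.dequeued.take());
    }
}
```

In the real app the two halves would run on separate threads, so `enqueueNext()` and `dequeueNext()` correspond loosely to the enqueuing and dequeuing sides.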
The object type stored in the Oracle queue:
CREATE TYPE c##user.extended_song AS OBJECT(
title VARCHAR2(80),
duration INTEGER,
file_path VARCHAR2(80),
description CLOB,
added_at TIMESTAMP(2)
);
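A Java class can be mapped to this object type through the standard `java.sql.SQLData` interface. The sketch below shows one way such a mapping might look; it is not the project's `ExtendedSong` class. The class name `ExtendedSongSketch`, the upper-case type name string, and the choice of Java field types are assumptions. Attributes are read and written in the same order as they are declared in the `CREATE TYPE` statement.

```java
import java.sql.Clob;
import java.sql.SQLData;
import java.sql.SQLException;
import java.sql.SQLInput;
import java.sql.SQLOutput;
import java.sql.Timestamp;

/** Hypothetical mapping of c##user.extended_song to a Java class via SQLData. */
public class ExtendedSongSketch implements SQLData {
    // Assumption: the type name is registered in upper case in the JDBC type map.
    public static final String SQL_TYPE_NAME = "C##USER.EXTENDED_SONG";

    private String title;       // title VARCHAR2(80)
    private int duration;       // duration INTEGER
    private String filePath;    // file_path VARCHAR2(80)
    private Clob description;   // description CLOB
    private Timestamp addedAt;  // added_at TIMESTAMP(2)

    /** A no-argument constructor lets the JDBC driver instantiate the class itself. */
    public ExtendedSongSketch() { }

    public ExtendedSongSketch(String title, int duration, String filePath,
                              Clob description, Timestamp addedAt) {
        this.title = title;
        this.duration = duration;
        this.filePath = filePath;
        this.description = description;
        this.addedAt = addedAt;
    }

    @Override
    public String getSQLTypeName() {
        return SQL_TYPE_NAME;
    }

    /** Reads the attributes in declaration order. */
    @Override
    public void readSQL(SQLInput stream, String typeName) throws SQLException {
        title = stream.readString();
        duration = stream.readInt(); // readInt() returns 0 for SQL NULL
        filePath = stream.readString();
        description = stream.readClob();
        addedAt = stream.readTimestamp();
    }

    /** Writes the attributes in the same declaration order. */
    @Override
    public void writeSQL(SQLOutput stream) throws SQLException {
        stream.writeString(title);
        stream.writeInt(duration);
        stream.writeString(filePath);
        stream.writeClob(description);
        stream.writeTimestamp(addedAt);
    }

    public String getTitle() { return title; }
    public int getDuration() { return duration; }
    public String getFilePath() { return filePath; }
    public Clob getDescription() { return description; }
    public Timestamp getAddedAt() { return addedAt; }
}
```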
[Figure: a visual representation of the execution order]
Source code structure:
src/
├─ main/
│ ├─ java/
│ │ ├─ database/
│ │ │ ├─ QueueManager.java
│ │ │ ├─ QueueSetup.java
│ │ ├─ json/
│ │ │ ├─ ClobFieldDeserializer.java
│ │ │ ├─ ClobFieldSerializer.java
│ │ │ ├─ JsonValidator.java
│ │ ├─ kafka/
│ │ │ ├─ KafkaManager.java
│ │ ├─ main/
│ │ │ ├─ Main.java
│ │ ├─ model/
│ │ │ ├─ ExtendedSong.java
│ ├─ resources/
│ │ ├─ ExtendedSongJsonScheme.json
│ │ ├─ jdbc.properties
│ │ ├─ kafka.properties
│ │ ├─ log4j.properties
- database/QueueManager
- readJdbcProperties() - reads the jdbc.properties resource file and sets the JDBC variables
- establishDatabaseConnection() - obtains a JDBC connection instance
- createAqSession() - creates a new AQ session instance
- getConnection() - getter for the JDBC connection instance
- getAqSession() - getter for the AQ session instance
- setQueue() - sets the Oracle queue variable
- enqueue() - enqueues an object
- dequeue() - dequeues an object
- queueIsEmpty() - checks whether the queue table is empty
- database/QueueSetup
- createQueue()
- startQueue()
- stopQueue()
- json/ClobFieldDeserializer - CLOB field deserialization (JSON -> POJO)
- deserialize()
- json/ClobFieldSerializer - CLOB field serialization (POJO -> JSON)
- serialize()
- json/JsonValidator
- validate() - validates a JSON string that represents a Song object
- kafka/KafkaManager
- readKafkaProperties() - reads kafka.properties and sets the Kafka variables
- getConsumer() - returns the Kafka Consumer instance
- getProducer() - returns the Kafka Producer instance
- startStreaming() - starts the Producer while() loop
- startListening() - starts the Consumer while() loop
- main/Main
- main() - entry point of the app (starts the Enqueuer and Dequeuer threads)
- model/ExtendedSong - the class that represents the Oracle object type (Song) by implementing the SQLData interface
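Both readJdbcProperties() and readKafkaProperties() read a resource file and set variables from it. A minimal sketch of that pattern with `java.util.Properties` follows. The class name `JdbcPropertiesSketch`, its helper methods, the keys `jdbc.url` and `jdbc.user`, and the sample URL are all assumptions made for illustration; the actual keys in jdbc.properties may differ.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper showing how a .properties resource might be read. */
public class JdbcPropertiesSketch {

    /** Loads a properties file from the classpath, for example "jdbc.properties". */
    public static Properties loadFromClasspath(String resourceName) throws IOException {
        ClassLoader loader = JdbcPropertiesSketch.class.getClassLoader();
        try (InputStream in = loader.getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("Resource not found: " + resourceName);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        }
    }

    /** Parses properties from text; handy for trying the format without a file. */
    public static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** Returns a required value, failing fast when the key is missing or empty. */
    public static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // The keys and values below are illustrative assumptions, not the project's real settings.
        Properties props = parse(
                "jdbc.url=jdbc:oracle:thin:@localhost:1521/XE\n"
              + "jdbc.user=c##user\n");
        System.out.println(require(props, "jdbc.url"));
        System.out.println(require(props, "jdbc.user"));
    }
}
```

Failing fast on a missing key surfaces configuration mistakes at startup, before any connection attempt is made.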
Usage:
- Start ZooKeeper
- Start the Kafka server
- Edit the .properties files (JDBC and Kafka access credentials)
- Run the app (main.Main)
- Send a new Song as a JSON string to the "to-be-enqueued" topic:
{"title":"Haruka Kanata", "duration":165, "filePath": "/etc/music/haruka_kanata.mp3", "description": "Haruka Kanata song description", "addedAt": "2021-11-01 00:33:05.777"}
- The string will be deserialized into a Song POJO
- The Song POJO will be enqueued and then dequeued
- The dequeued POJO will be serialized and sent to the "dequeued" topic
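The addedAt value in the sample message uses the `yyyy-MM-dd HH:mm:ss.SSS` form, which `java.sql.Timestamp.valueOf` accepts directly. The sketch below shows how such a string might be converted to and from a `Timestamp`; the class name `AddedAtSketch` and its two methods are hypothetical, and the app's own JSON mapping may handle this field differently.

```java
import java.sql.Timestamp;

/** Hypothetical helper for converting the "addedAt" JSON field. */
public class AddedAtSketch {

    /** Parses text such as "2021-11-01 00:33:05.777" into a java.sql.Timestamp. */
    public static Timestamp parseAddedAt(String text) {
        // Timestamp.valueOf throws IllegalArgumentException for text in any other format.
        return Timestamp.valueOf(text.trim());
    }

    /** Formats a Timestamp back into the same textual form. */
    public static String formatAddedAt(Timestamp timestamp) {
        return timestamp.toString();
    }

    public static void main(String[] args) {
        Timestamp addedAt = parseAddedAt("2021-11-01 00:33:05.777");
        System.out.println(formatAddedAt(addedAt));
    }
}
```

Because added_at is declared as TIMESTAMP(2), the database would be expected to keep only two fractional-second digits, so the value read back after a round trip may not end in .777.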