Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Readme #8

Merged
merged 2 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions kafka-observer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ The architecture of this sample consists of an **observer** and a **consumer**,

See the following diagram:

![Architecture Diagram](images/kafkapoc.jpg)
![Architecture Diagram](images/Kafka-Observer-Consumer-Acrchitecture.jpg)

### Observer

The **observer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). At runtime, it dynamically creates a new consumer group based on provided Kafka Broker addresses and configurations (_including Kafka Topics_), persistently waiting for incoming messages to be claimed from a Kafka Broker.
The **observer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). At runtime, based on provided Kafka Broker addresses and configurations (_including Kafka Topics_), it will constantly check for any messages in topic offsets and compare with the consumer group offsets of that topic.

When a new message is claimed from a specific Kafka Topic, the **observer** wakes-up the corresponding **consumer** Job, by submitting a JobRun. The decision on which **consumer** Job to wake-up depends on the Topic the **consumer** Job is using. This wake-up mechanism allows **consumer** Jobs to only run when needed, optimizing resource consumption in a serverless fashion.
The way it works is, the observer monitors every partition of a topic, and if any new message arrives in a particular partition, it triggers corresponding job runs with the number equal to the number of partitions the new messages have arrived in.

For example, consider I have 4 partitions in a topic called Topic X. When new events arrive in partition 0 and partition 1 of the topic, then the observer will know and it will trigger only 2 pods of the Consumer JobRun. The pods will then be able to consume the messages.

The decision on which **consumer** Job to wake-up depends on the Topic the **consumer** Job is using. This wake-up mechanism allows **consumer** Jobs to only run when needed, optimizing resource consumption in a serverless fashion.


### Consumer

The **consumer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). Unlike the observer, the **consumer** runs only in response to incoming messages within the desired Kafka Topics. Once running, it will gracefully shutdown within one minute, if none further messages are claimed.
The **consumer** is a Code Engine Job operating in [daemon mode](https://cloud.ibm.com/docs/codeengine?topic=codeengine-job-daemon). Unlike the observer, the **consumer** runs only in response to incoming messages within the desired Kafka Topics. Once running, it will gracefully shutdown within configuarable idle timeout, if none further messages are claimed.

In this sample, we provided a native Kafka client implementation written in Go. Code Engine users can opt-in for other native clients using different runtimes, such as Java, when implementing their **consumer** logic.

Expand Down Expand Up @@ -57,27 +61,49 @@ The mapping is defined via the following [kafkadata](resources/kafkadata) file,
payments:
partitions: 4
jobs:
- payments-consumer
- name: payments-consumer
consumer_group: payments-consumer-group
- name: foobar-consumer
consumer_group: foobar-consumer-group
shipping:
partitions: 3
jobs:
- shipping-consumer
- name: shipping-consumer
consumer_group: shipping-consumer-group
```

The above is explained as follows:
- `.payments` and `.shipping` correspond to the existing Topics of interest within the same Kafka instance.
- `.payments.partitions` and `.shipping.partitions` correspond to the partition size of the Kafka Topics.
- `.payments.jobs` and `.shipping.jobs` correspond to the CodeEngine **consumer** Jobs that want to consume messages from the related Kafka Topic.
- `.jobs` will be a list. Each value will have the name of the job and the consumer group it belongs to.

As an example, if you have one single Topic `foobar` with `2` partitions and you want CE Job `foobar-consumer` to consume from it, this is how the `kafkadata` file will look:

```yaml
foobar:
partitions: 2
jobs:
- foobar-consumer
- name: foobar-consumer
consumer_group: foobar-consumer-group
```

You can also set the same consumer group for multiple topics like this:

```yaml
payments:
partitions: 2
jobs:
- name: foobar-consumer
consumer_group: foobar-consumer-group
shipping:
partitions: 3
jobs:
- name: foobar-consumer
consumer_group: foobar-consumer-group
```


## Running the Sample

1. Login to IBM Cloud
Expand Down
20 changes: 16 additions & 4 deletions kafka-observer/cmd/observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"

Expand Down Expand Up @@ -46,6 +47,17 @@ func main() {
log.Panicf("%s is not set", cmd.CE_PROJECT_ID)
}

tickerTime := cmd.DEFAULT_OBSERVER_TICKER

// Handle idle timeout init
if observerTicker, exists := os.LookupEnv(cmd.OBSERVER_TICKER); exists {
tickerValue, err := strconv.Atoi(observerTicker)
if err != nil {
log.Panicf("error parsing %s duration: %v", cmd.OBSERVER_TICKER, err)
}
tickerTime = tickerValue
}

observer, err := NewObserver(config)
if err != nil {
log.Panic(err)
Expand Down Expand Up @@ -87,7 +99,7 @@ func main() {

for topicName, topic := range observer.Topics {
go func(topic string) {
ticker := time.Tick(500 * time.Millisecond) // should be configurable
ticker := time.Tick(time.Duration(tickerTime) * time.Millisecond) // should be configurable
for range ticker {
offset, err := observer.GetTopicPartitionsOffset(topic)
if err != nil {
Expand Down Expand Up @@ -118,12 +130,12 @@ func main() {
go func(name string, consumerGroups []ConsumerGroup) {
for i, consumerGroup := range consumerGroups {
go func(topicName string, cg ConsumerGroup, index int) {
ticker := time.Tick(500 * time.Millisecond) // should be configurable
ticker := time.Tick(time.Duration(tickerTime) * time.Millisecond) // should be configurable
for range ticker {
cgOffset, err := observer.GetConsumerGroupTopicPartitionsOffset(topicName, cg.Name)
if err != nil {
errCh <- err
break
continue
}

if observer.IsConsumerGroupTopicOffsetModified(topicName, cg.Name, cgOffset) || len(observer.Topics[topicName].ConsumerGroups[index].PartitionsOffset) == 0 {
Expand Down Expand Up @@ -151,7 +163,7 @@ func main() {
go func(t string, jobName string, podsCount int) {
// Lock is required, as there well be cases where
// multiple goroutines attempt to create jobruns for
// the same job. This avoids creating unneeded pods.
// the same job. This prevents the creation of unnecessary pods.
unlock := jobInvoker.JobMutexes.Lock(jobName)
defer unlock()
if err := jobInvoker.InvokeJobs(int64(podsCount), jobName); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions kafka-observer/cmd/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,12 @@ func (o *Observer) IsAnyConsumerGroupOffsetModified(topic string) JobRunInfo {
for _, c := range topicObject.ConsumerGroups {
count := 0
for partition, offset := range c.PartitionsOffset {
if topicObject.PartitionsOffset[partition] > offset {
if topicObject.PartitionsOffset[partition] > offset && topicObject.PartitionsOffset[partition]!=0 {
log.Printf("topicOffset: %v/%v, consumerGroupOffset(%v): %v/%v", partition, topicObject.PartitionsOffset[partition], c.Name, partition, offset)
count++
}
jobRunInfo.JobRunToCreate[c.JobName] = count
}

}

return jobRunInfo
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed kafka-observer/images/kafkapoc.jpg
Binary file not shown.
3 changes: 3 additions & 0 deletions kafka-observer/internal/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const (

IDLE_TIMEOUT = "IDLE_TIMEOUT"
DEFAULT_IDLE_TIMEOUT = 60

OBSERVER_TICKER = "OBSERVER_TICKER" // in milliseconds
DEFAULT_OBSERVER_TICKER = 500
)

// KafkaAuth holds required data
Expand Down