rob3000/nestjs-kafka

NestJS + KafkaJS

Integration of KafkaJS with NestJS for building event-driven microservices.

Setup

Import the KafkaModule and add it to the imports array of the module in which you want to use Kafka.

Synchronous Module Initialization

Register the KafkaModule synchronously with the register() method:

@Module({
  imports: [
    KafkaModule.register([
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

Asynchronous Module Initialization

Register the KafkaModule asynchronously with the registerAsync() method:

import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      // The injected ConfigService arrives as the factory argument; there is no `this` here.
      useFactory: async (configService: ConfigService) => {
        const broker = configService.get('broker');
        return [
          {
            name: 'HERO_SERVICE',
            options: {
              client: {
                clientId: 'hero',
                brokers: [broker],
              },
              consumer: {
                groupId: 'hero-consumer'
              }
            }
          }
        ];
      },
      inject: [ConfigService]
    })
  ]
  ...
})
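
A configuration value often holds several brokers as one comma-separated string. The following is a minimal sketch of turning such a value into the brokers array. The helper name parseBrokers, the comma-separated format, and the fallback address are assumptions for illustration and are not part of this module.

```typescript
// Hypothetical helper (not part of nestjs-kafka): turns a comma-separated
// configuration value into the `brokers` array used in the client options.
export function parseBrokers(
  raw: string | undefined,
  fallback = 'localhost:9092',
): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);

  // Fall back to a single default broker when nothing usable was configured.
  return brokers.length > 0 ? brokers : [fallback];
}
```

Inside useFactory this could be used as brokers: parseBrokers(configService.get<string>('broker')).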

The available settings, with links to the corresponding KafkaJS documentation where applicable:

Config                Options
client                https://kafka.js.org/docs/configuration
consumer              https://kafka.js.org/docs/consuming#options
producer              https://kafka.js.org/docs/producing#options
serializer
deserializer
consumeFromBeginning  true/false

Subscribing

Subscribe to a topic to receive messages.

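import { Inject, OnModuleInit } from '@nestjs/common';

export class Consumer implements OnModuleInit {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  // Register this instance for the topic once the module has been initialized.
  onModuleInit(): void {
    this.client.subscribeToResponseOf('hero.kill.dragon', this)
  }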

  @SubscribeTo('hero.kill.dragon')
  async getWorld(data: any, key: any, offset: number, timestamp: number, partition: number, headers: IHeaders): Promise<void> {
    ...
  }

}
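
The consumer above names the topic twice: once in subscribeToResponseOf() with the instance, and once in the @SubscribeTo() decorator on the method. One way to picture why both are needed is a registry that stores the handler function and its owning instance separately. This is a conceptual sketch only. The names registerHandler, bindOwner, and dispatch are hypothetical, and the code does not claim to reproduce how the module is implemented.

```typescript
// Conceptual sketch only: hypothetical names, not the library's code.
type Handler = (this: any, data: unknown) => unknown;

const handlersByTopic = new Map<string, Handler>();
const ownersByTopic = new Map<string, object>();

// Roughly the role a decorator such as @SubscribeTo could play:
// remember which method handles a topic.
export function registerHandler(topic: string, handler: Handler): void {
  handlersByTopic.set(topic, handler);
}

// Roughly the role of subscribeToResponseOf(topic, this):
// remember which instance the method should run against.
export function bindOwner(topic: string, owner: object): void {
  ownersByTopic.set(topic, owner);
}

// Calls the handler with `this` set to its owner.
// Returns false when either the handler or the owner is missing.
export function dispatch(topic: string, data: unknown): boolean {
  const handler = handlersByTopic.get(topic);
  const owner = ownersByTopic.get(topic);
  if (!handler || !owner) return false;
  handler.call(owner, data);
  return true;
}
```

In this picture, a handler registered without an owner is never called, which is why the registration in onModuleInit() matters.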

Producing

Send messages to Kafka.

const TOPIC_NAME = 'hero.kill.dragon';

export class Producer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    const result = await this.client.send({
      topic: TOPIC_NAME,
      messages: [
        {
          key: '1',
          value: message
        }
      ]
    });

    return result;
  }

}
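
When the payload is an object rather than a string, it usually has to be serialized before sending. The sketch below builds the argument for send() from a list of plain objects. The names toRecord and HeroEvent and the choice of JSON encoding are illustrative assumptions. Whether a configured serializer already handles this depends on your setup.

```typescript
// Illustrative types that mirror the shape passed to send() above.
interface OutgoingMessage {
  key: string;
  value: string;
}

interface OutgoingRecord {
  topic: string;
  messages: OutgoingMessage[];
}

// Hypothetical event type, used only for this example.
interface HeroEvent {
  id: number;
  action: string;
}

// Builds one record containing a message per event.
export function toRecord(topic: string, events: HeroEvent[]): OutgoingRecord {
  return {
    topic,
    messages: events.map((event) => ({
      // With Kafka's default partitioner, messages that share a key
      // are written to the same partition.
      key: String(event.id),
      value: JSON.stringify(event),
    })),
  };
}
```

The result could then be passed along as this.client.send(toRecord(TOPIC_NAME, events)).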

Schema Registry Support

By default, messages are converted to JSON objects where possible. If you're using Avro, you can add the SchemaRegistry deserializer to convert the messages. This uses the KafkaJS Schema Registry module.
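
To illustrate what converting to JSON "where possible" can mean, here is a minimal sketch of a tolerant decoder. It is not the module's actual deserializer. The name decodeValue is hypothetical, and the input is assumed to have been decoded to a string already.

```typescript
// Hypothetical helper: parse the payload as JSON when it is valid JSON,
// otherwise return the original text unchanged.
export function decodeValue(raw: string | null): unknown {
  // A message may carry no value at all.
  if (raw === null) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Not valid JSON: hand back the raw text.
    return raw;
  }
}
```

Avro payloads are binary and cannot be handled this way, which is where a schema-aware deserializer comes in.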

In your module.ts:

import { join } from 'path';

@Module({
  imports: [
    KafkaModule.register([
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        },
        deserializer: new KafkaAvroResponseDeserializer({
          host: 'http://localhost:8081'
        }),
        serializer: new KafkaAvroRequestSerializer({
          config: {
            host: 'http://localhost:8081/'
          },
          schemas: [
            {
              topic: 'test.topic',
              key: join(__dirname, 'key-schema.avsc'),
              value: join(__dirname, 'value-schema.avsc')
            }
          ],
        }),
      },
    ]),
  ]
  ...
})

See the e2e tests for an example.

TODO

  • Tests

PRs Welcome ❤️