Skip to content

Commit

Permalink
Added client options for schema registry
Browse files Browse the repository at this point in the history
  • Loading branch information
rob3000 committed Jun 4, 2021
1 parent 712f6c6 commit 6ac0b8b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/deserializer/avro-response.deserializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Logger } from '@nestjs/common/services/logger.service';
import { KafkaResponse } from "../interfaces";
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
import { SchemaRegistryAPIClientArgs } from "@kafkajs/confluent-schema-registry/dist/api"
import { SchemaRegistryAPIClientOptions } from "@kafkajs/confluent-schema-registry/dist/@types";
import { KafkaResponseDeserializer } from "./kafka-response.deserializer";

export class KafkaAvroResponseDeserializer
Expand All @@ -12,8 +13,8 @@ export class KafkaAvroResponseDeserializer
protected logger = new Logger(KafkaAvroResponseDeserializer.name);
protected fallback: KafkaResponseDeserializer;

constructor(config: SchemaRegistryAPIClientArgs) {
this.registry = new SchemaRegistry(config);
constructor(config: SchemaRegistryAPIClientArgs, options?: SchemaRegistryAPIClientOptions) {
this.registry = new SchemaRegistry(config, options);
this.fallback = new KafkaResponseDeserializer()
}

Expand Down
5 changes: 4 additions & 1 deletion src/serializer/avro-request.serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { Serializer } from "@nestjs/microservices";
import { Logger } from '@nestjs/common/services/logger.service';
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
import { SchemaRegistryAPIClientArgs } from "@kafkajs/confluent-schema-registry/dist/api"
import { SchemaRegistryAPIClientOptions } from "@kafkajs/confluent-schema-registry/dist/@types";

import { KafkaMessageSend, KafkaMessageObject } from "../interfaces";

type KafkaAvroRequestSerializerSchema = {
Expand All @@ -15,6 +17,7 @@ type KafkaAvroRequestSerializerSchema = {
export type KafkaAvroRequestSerializerConfig = {
schemas: KafkaAvroRequestSerializerSchema[],
config: SchemaRegistryAPIClientArgs;
options: SchemaRegistryAPIClientOptions;
schemaSeparator?: string;
schemaFetchIntervalSeconds?: number;
}
Expand All @@ -37,7 +40,7 @@ export class KafkaAvroRequestSerializer
private lastSchemaFetchInterval: Map<string, number> = new Map();

constructor(options: KafkaAvroRequestSerializerConfig) {
this.registry = new SchemaRegistry(options.config);
this.registry = new SchemaRegistry(options.config, options.options);
this.config = {
schemaFetchIntervalSeconds: 3600,
...options
Expand Down
20 changes: 15 additions & 5 deletions test/e2e/app/config.app.async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,22 @@ import { TOPIC_NAME, TestConsumer } from './test.controller';
groupId: 'test-e2e-consumer',
allowAutoTopicCreation: true,
},
deserializer: new KafkaAvroResponseDeserializer({
host: testConfigService.getAvroHost(),
}),
serializer: new KafkaAvroRequestSerializer({
deserializer: {
type: KafkaAvroResponseDeserializer,
config: {
host: testConfigService.getAvroHost(),
},
options: {

}
},
serializer: {
type: KafkaAvroRequestSerializer,
config: {
host: testConfigService.getAvroHost(),
},
options: {

},
schemas: [
{
Expand All @@ -39,7 +49,7 @@ import { TOPIC_NAME, TestConsumer } from './test.controller';
value: TOPIC_NAME,
},
],
}),
}
},
},
];
Expand Down

0 comments on commit 6ac0b8b

Please sign in to comment.