Commit 15df561
Added avro serializer. Added E2E test to test avro serializer and deserializer.
rob3000 committed Oct 28, 2020
1 parent e7845b5 commit 15df561
Showing 14 changed files with 234 additions and 58 deletions.
16 changes: 16 additions & 0 deletions .babelrc
@@ -0,0 +1,16 @@
{
  "presets": [
    [
      "@babel/preset-env", {
        "targets": {
          "node": "current"
        },
        "debug": true
      }
    ],
    "@babel/preset-typescript"
  ],
  "plugins": [
    ["@babel/plugin-proposal-decorators", { "legacy": true }]
  ]
}
4 changes: 3 additions & 1 deletion .npmignore
@@ -2,5 +2,7 @@ src/
tsconfig.json
.github/
.gitignore
.prettierrc
kafka/
.babelrc
test/
4 changes: 4 additions & 0 deletions .prettierrc
@@ -0,0 +1,4 @@
{
  "singleQuote": true,
  "trailingComma": "all"
}
14 changes: 9 additions & 5 deletions src/deserializer/avro-response.deserializer.ts
@@ -3,34 +3,38 @@ import { Logger } from '@nestjs/common/services/logger.service';
import { KafkaResponse } from "../interfaces";
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
import { SchemaRegistryAPIClientArgs } from "@kafkajs/confluent-schema-registry/dist/api";
import { KafkaResponseDeserializer } from "./kafka-response.deserializer";

export class KafkaAvroResponseDeserializer
  implements Deserializer<any, Promise<KafkaResponse>> {

  protected registry: SchemaRegistry;
  protected logger = new Logger(KafkaAvroResponseDeserializer.name);
  protected fallback: KafkaResponseDeserializer;

  constructor(config: SchemaRegistryAPIClientArgs) {
    this.registry = new SchemaRegistry(config);
    this.fallback = new KafkaResponseDeserializer();
  }

  async deserialize(message: any, options?: Record<string, any>): Promise<KafkaResponse> {
    const { value, key, timestamp, offset } = message;
    const decodeResponse = {
      response: value,
      key,
      timestamp,
      offset,
    };

    try {
      decodeResponse.key = await this.registry.decode(message.key);
      decodeResponse.response = await this.registry.decode(message.value);
    } catch (e) {
      this.logger.error(e);
      // Fall back to the normal Kafka deserializer.
      const msg = this.fallback.deserialize(message);
      Object.assign(decodeResponse, msg);
    }

    return decodeResponse;
  }
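For context, a minimal sketch of using the new deserializer on its own; the registry host and the raw message shape are illustrative assumptions, not part of this commit:

```typescript
import { KafkaAvroResponseDeserializer } from "./deserializer/avro-response.deserializer";

// Assumed local schema-registry host, matching the E2E setup below.
const deserializer = new KafkaAvroResponseDeserializer({ host: "http://localhost:8081" });

// A raw kafkajs message carries Avro-encoded Buffers for key and value;
// decode() resolves them via the schema registry. If decoding throws
// (e.g. for a plain JSON message), the catch block delegates to the
// standard KafkaResponseDeserializer instead.
async function handle(rawMessage: any): Promise<void> {
  const { key, response, timestamp, offset } = await deserializer.deserialize(rawMessage, { topic: "test.topic" });
  console.log(key, response, timestamp, offset);
}
```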
2 changes: 1 addition & 1 deletion src/deserializer/kafka-response.deserializer.ts
@@ -18,7 +18,7 @@ export class KafkaResponseDeserializer
}

    return {
      key: id,
      response,
      timestamp,
      offset
1 change: 1 addition & 0 deletions src/index.ts
@@ -6,3 +6,4 @@ export * from "./deserializer/kafka-response.deserializer";
export * from "./serializer/kafka-request.serializer";

export * from "./deserializer/avro-response.deserializer";
export * from "./serializer/avro-request.serializer";
14 changes: 12 additions & 2 deletions src/interfaces.ts
@@ -1,9 +1,9 @@
import { Deserializer, Serializer } from "@nestjs/microservices";
import { ConsumerConfig, KafkaConfig, ProducerConfig, ProducerRecord, Message } from "kafkajs";

export interface KafkaResponse<T = any> {
  response: T;
  key: string;
  timestamp: string;
  offset: number;
}
@@ -20,3 +20,13 @@ export interface KafkaModuleOption
    seek?: Record<string, string>
  }
}

export interface KafkaMessageObject extends Message {
  value: any | Buffer | string | null;
  key: any;
}

export interface KafkaMessageSend extends Omit<ProducerRecord, 'topic'> {
  messages: KafkaMessageObject[];
  topic?: string;
}
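To make the new types concrete, a sketch of a packet that satisfies KafkaMessageSend; the topic name and field values are illustrative only:

```typescript
import { KafkaMessageSend } from "./interfaces";

// Keys and values stay plain objects here; the Avro serializer below
// is what turns them into encoded Buffers before producing.
const packet: KafkaMessageSend = {
  topic: "test.topic",
  messages: [
    { key: { id: 1 }, value: { id: 1, firstName: "Hello", lastName: "World!" } },
  ],
};
```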
16 changes: 7 additions & 9 deletions src/kafka.service.ts
@@ -1,11 +1,11 @@
import { Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { Consumer, Kafka, Producer, RecordMetadata } from "kafkajs";
import { Deserializer, Serializer } from "@nestjs/microservices";
import { Logger } from "@nestjs/common/services/logger.service";
import { KafkaLogger } from "@nestjs/microservices/helpers/kafka-logger";
import { KafkaResponseDeserializer } from "./deserializer/kafka-response.deserializer";
import { KafkaRequestSerializer } from "./serializer/kafka-request.serializer";
import { KafkaModuleOption, KafkaMessageSend } from "./interfaces";

import {
  SUBSCRIBER_MAP,
@@ -59,7 +59,6 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
    SUBSCRIBER_MAP.forEach((functionRef, topic) => {
      this.subscribe(topic);
    });
    this.bindAllTopicToConsumer();
  }

@@ -94,17 +93,17 @@
*
* @param message
*/
  async send(message: KafkaMessageSend): Promise<RecordMetadata[]> {
    if (!this.producer) {
      this.logger.error('There is no producer, unable to send message.');
      return;
    }

    const serializedPacket = await this.serializer.serialize(message);

    // @todo - rather than have a producerRecord,
    // most of this can be done when we create the controller.
    return this.producer.send(serializedPacket);
  }

/**
@@ -155,10 +154,9 @@
    const objectRef = SUBSCRIBER_OBJECT_MAP.get(topic);
    const callback = SUBSCRIBER_MAP.get(topic);

    try {
      const { timestamp, response, offset, key } = await this.deserializer.deserialize(message, { topic });
      await callback.apply(objectRef, [response, key, offset, timestamp, partition]);
    } catch (e) {
      this.logger.error(`Error for message ${topic}: ${e}`);
    }
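A sketch of how application code might call the reworked send(); the service class and injection style here are assumptions, not part of this commit:

```typescript
import { Injectable } from "@nestjs/common";
import { KafkaService } from "./kafka.service";

@Injectable()
export class ExampleProducer {
  constructor(private readonly kafka: KafkaService) {}

  async publish(): Promise<void> {
    // The whole KafkaMessageSend packet is handed to the configured
    // serializer, which encodes each message's key and value before
    // the underlying kafkajs producer sends it.
    await this.kafka.send({
      topic: "test.topic",
      messages: [{ key: { id: 1 }, value: { id: 1, firstName: "Hello", lastName: "World!" } }],
    });
  }
}
```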
78 changes: 78 additions & 0 deletions src/serializer/avro-request.serializer.ts
@@ -0,0 +1,78 @@
import { Serializer } from "@nestjs/microservices";
import { Logger } from '@nestjs/common/services/logger.service';
import { SchemaRegistry, readAVSC } from "@kafkajs/confluent-schema-registry";
import { SchemaRegistryAPIClientArgs } from "@kafkajs/confluent-schema-registry/dist/api";
import { KafkaMessageSend, KafkaMessageObject } from "../interfaces";

type KafkaAvroRequestSerializerSchema = {
  topic: string;
  key?: string;
  value: string;
};

export type KafkaAvroRequestSerializerConfig = {
  schemas: KafkaAvroRequestSerializerSchema[];
  config: SchemaRegistryAPIClientArgs;
  schemaSeparator?: string;
};

export class KafkaAvroRequestSerializer
  implements Serializer<KafkaMessageSend, Promise<KafkaMessageSend>> {

  protected registry: SchemaRegistry;
  protected logger = new Logger(KafkaAvroRequestSerializer.name);
  protected schemas = new Map();

  constructor(options: KafkaAvroRequestSerializerConfig) {
    this.registry = new SchemaRegistry(options.config);

    // Preload the local AVSC files for each configured topic. The key
    // schema is optional; the value schema is required.
    options.schemas.forEach((obj) => {
      const keySchema = obj.key ? readAVSC(obj.key) : null;
      const valueSchema = readAVSC(obj.value);

      this.schemas.set(obj.topic, {
        key: keySchema,
        value: valueSchema,
      });
    });
  }

  async serialize(value: KafkaMessageSend): Promise<KafkaMessageSend> {
    const outgoingMessage = value;

    try {
      // Subjects are assumed to follow the <topic>-Key / <topic>-Value
      // naming convention.
      // @todo - need to work out a way to better get the schema based on topic.
      const keyId = await this.registry.getLatestSchemaId(value.topic + '-Key');
      const valueId = await this.registry.getLatestSchemaId(value.topic + '-Value');

      const messages: Promise<KafkaMessageObject>[] = value.messages.map(async (origMessage) => {
        const encodedValue = await this.registry.encode(valueId, origMessage.value);
        const encodedKey = await this.registry.encode(keyId, origMessage.key);
        return {
          ...origMessage,
          value: encodedValue,
          key: encodedKey,
        };
      });

      outgoingMessage.messages = await Promise.all(messages);
    } catch (e) {
      this.logger.error(e);
    }

    return outgoingMessage;
  }

}
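A minimal standalone sketch of exercising the serializer, assuming the test.topic-Key and test.topic-Value subjects are already registered and the AVSC paths exist:

```typescript
import { join } from "path";
import { KafkaAvroRequestSerializer } from "./serializer/avro-request.serializer";

const serializer = new KafkaAvroRequestSerializer({
  config: { host: "http://localhost:8081" },
  schemas: [
    {
      topic: "test.topic",
      // Illustrative paths; point these at real schema files.
      key: join(__dirname, "key-schema.avsc"),
      value: join(__dirname, "value-schema.avsc"),
    },
  ],
});

// serialize() resolves to the same packet with every key and value
// replaced by an Avro-encoded Buffer, ready for producer.send().
serializer
  .serialize({ topic: "test.topic", messages: [{ key: { id: 1 }, value: { id: 1 } }] })
  .then((packet) => console.log(Buffer.isBuffer(packet.messages[0].value))); // true
```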
83 changes: 57 additions & 26 deletions test/app.e2e-spec.ts
@@ -1,10 +1,45 @@
import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { join } from 'path';
import AppModule from './e2e/app/config.app';
import { TestConsumer, TOPIC_NAME } from './e2e/app/test.controller';
import { SchemaRegistry, readAVSC } from "@kafkajs/confluent-schema-registry";

describe('AppModule (e2e)', () => {
  const messages = [
    {
      key: {
        id: 1,
      },
      value: {
        id: 1,
        metadataId: 2,
        objectId: 3,
        firstName: 'Hello',
        lastName: 'World!',
        __table: 'test-table',
        __deleted: null,
      },
    },
    {
      key: {
        id: 2,
      },
      value: {
        id: 2,
        metadataId: 3,
        objectId: 4,
        firstName: 'Foo',
        lastName: 'Bar',
        __table: 'test-table',
        __deleted: null,
      },
    },
  ];

  let app: INestMicroservice;
  let controller: TestConsumer;
  let registry: SchemaRegistry;

  beforeAll(async () => {
    const moduleFixture: TestingModule = await Test.createTestingModule({
@@ -14,43 +49,39 @@ describe('AppModule (e2e)', () => {
    app = moduleFixture.createNestMicroservice({});
    app.enableShutdownHooks();
    await app.listenAsync();

    controller = await app.resolve(TestConsumer);
    registry = new SchemaRegistry({ host: 'http://localhost:8081/' });
  });

  afterAll(async () => {
    await app.close();
  });

  it("should give kafka some time", done => {
    setTimeout(done, 4000);
  });

  it('We can send schemas to schema registry', async () => {
    // The other tests require the schemas to already exist in the
    // schema registry; they are not uploaded through the NestJS
    // application itself.
    const valuePath = join(__dirname, 'e2e', 'app', 'value-schema.avsc');
    const keyPath = join(__dirname, 'e2e', 'app', 'key-schema.avsc');
    const valueSchema = readAVSC(valuePath);
    const keySchema = readAVSC(keyPath);

    await registry.register(valueSchema, { separator: '-' });
    await registry.register(keySchema, { separator: '-' });
  });

  it('We can SEND and ACCEPT AVRO messages', async (done) => {
    await controller.sendMessage({ messages });
    expect(controller.messages.length).toBe(messages.length);

    expect(controller.messages[0]).toEqual(messages[0].value);
    expect(controller.messages[1]).toEqual(messages[1].value);

    done();
  });
});
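The spec assumes key-schema.avsc and value-schema.avsc already exist under test/e2e/app. A sketch of what the value schema might look like, inferred from the message fixtures above; the namespace/name split (which yields the test.topic-Value subject under separator '-') and the nullable __deleted field are assumptions:

```json
{
  "type": "record",
  "name": "Value",
  "namespace": "test.topic",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "metadataId", "type": "int" },
    { "name": "objectId", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "__table", "type": "string" },
    { "name": "__deleted", "type": ["null", "string"], "default": null }
  ]
}
```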
18 changes: 16 additions & 2 deletions test/e2e/app/config.app.ts
@@ -1,6 +1,8 @@
import { join } from 'path';
import { Module } from '@nestjs/common';
import { KafkaModule, KafkaAvroResponseDeserializer, KafkaAvroRequestSerializer } from "../../../src";
import { TOPIC_NAME, TestConsumer } from "./test.controller";

@Module({
imports: [
@@ -22,6 +24,18 @@
        deserializer: new KafkaAvroResponseDeserializer({
          host: 'http://localhost:8081/'
        }),
        serializer: new KafkaAvroRequestSerializer({
          config: {
            host: 'http://localhost:8081/'
          },
          schemas: [
            {
              topic: TOPIC_NAME,
              // key: join(__dirname, 'key-schema.avsc'),
              value: join(__dirname, 'value-schema.avsc')
            }
          ],
        }),
        consumeFromBeginning: true
      }
    },