Skip to content

Commit

Permalink
Updated avro serializer, Adding E2E tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rob3000 committed Nov 5, 2020
1 parent ce91d87 commit 5ea3ceb
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 43 deletions.
25 changes: 25 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module.exports = {
parser: '@typescript-eslint/parser',
parserOptions: {
project: 'tsconfig.json',
sourceType: 'module',
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
'prettier/@typescript-eslint',
],
root: true,
env: {
node: true,
jest: true,
},
rules: {
'@typescript-eslint/interface-name-prefix': 'off',
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'off',
},
};
19 changes: 17 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@ name: Node.js CI
on: [push]

jobs:
build:
lint:
runs-on: ubuntu-latest

strategy:
matrix:
node-version: [12.x]

steps:
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- run: npm ci
- run: npm run lint
build:
runs-on: ubuntu-latest

strategy:
Expand All @@ -17,10 +31,11 @@ jobs:
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- run: npm install
- run: npm ci
- name: Start Docker containers for Zookeeper, Kafka and Schema Registry
run: npm run kafka:start
- run: sleep 120
- run: npm run test:e2e
env:
CI: true
- run: npm run kafka:stop
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,31 @@ In your `module.ts`:
},
deserializer: new KafkaAvroResponseDeserializer({
host: 'http://localhost:8081'
})
}),
serializer: new KafkaAvroRequestSerializer({
config: {
host: 'http://localhost:8081/'
},
schemas: [
{
topic: 'test.topic,
key: join(__dirname, 'key-schema.avsc'),
value: join(__dirname, 'value-schema.avsc')
}
],
}),
},
]),
]
...
})
```
See the [e2e test](https://github.com/rob3000/nestjs-kafka/tree/master/test/e2e/app) for example.
## TODO
* Tests
* Add Avro schema to outgoing messages.
* Get avro schema for key, as we are only getting the value.
PRs Welcome :heart:
17 changes: 7 additions & 10 deletions kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-kafka:5.5.1
image: wurstmeister/kafka:2.11-1.1.1
depends_on:
- zookeeper
ports:
Expand All @@ -20,13 +17,13 @@ services:
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_HOST_NAME: "broker"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_LISTENERS: "PLAINTEXT://:29092,PLAINTEXT_HOST://:9092"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CREATE_TOPICS: "test-topic:1:1"

schema-registry:
image: confluentinc/cp-schema-registry:5.5.1
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"test": "jest",
"lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
"kafka:start": "docker-compose -f ./kafka/docker-compose.yml up -d",
"kafka:stop": "docker-compose -f ./kafka/docker-compose.yml down",
"kafka:stop": "docker-compose -f ./kafka/docker-compose.yml down -v",
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
Expand Down
10 changes: 5 additions & 5 deletions src/serializer/avro-request.serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ export class KafkaAvroRequestSerializer
const valueSchema = readAVSC(obj.value);
// const valueSubject = valueSchema.namespace + this.separator + valueSchema.name;

const a = {
const schemaObject = {
key: keySchema,
value: valueSchema
}

this.schemas.set(obj.topic, a);
this.schemas.set(obj.topic, schemaObject);
});

}
Expand All @@ -51,11 +51,11 @@ export class KafkaAvroRequestSerializer
const outgoingMessage = value;

try {
const schemas = this.schemas.get(value.topic);
//const schemas = this.schemas.get(value.topic);

// @todo - need to work out a way to better get the schema based on topic.
const keyId = await this.registry.getLatestSchemaId(value.topic + '-Key')
const valueId = await this.registry.getLatestSchemaId(value.topic + '-Value')
const keyId = await this.registry.getLatestSchemaId(value.topic + '-key')
const valueId = await this.registry.getLatestSchemaId(value.topic + '-value')

This comment has been minimized.

Copy link
@matheusisquierdo

matheusisquierdo Nov 5, 2020

Contributor

Nice!


const messages: Promise<KafkaMessageObject>[] = value.messages.map(async(origMessage) => {
const encodedValue = await this.registry.encode(valueId, origMessage.value)
Expand Down
36 changes: 21 additions & 15 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,26 @@ import AppModule from './e2e/app/config.app';
import { TestConsumer, TOPIC_NAME } from './e2e/app/test.controller';
import { SchemaRegistry, readAVSC } from "@kafkajs/confluent-schema-registry";

describe('Setup for E2E', () => {
it('We can send schemas to schema registry', async() => {
const registry = new SchemaRegistry({ host: 'http://localhost:8081/' })

// For our other tests we require the schema to already exist
// in schema registry and dont allow uploaded through the nestJS
// application.
const valuePath = join(__dirname, 'e2e', 'app', 'value-schema.avsc');
const keyPath = join(__dirname, 'e2e', 'app', 'key-schema.avsc');
const valueSchema = readAVSC(valuePath);
const keySchema = readAVSC(keyPath);

const valueSchemaResult = await registry.register(valueSchema, { separator: '-' });
const keySchemaResult = await registry.register(keySchema, { separator: '-' });

expect(valueSchemaResult).toEqual({id: 1})
expect(keySchemaResult).toEqual({id: 2})
})
})

describe('AppModule (e2e)', () => {
const messages = [
{
Expand Down Expand Up @@ -39,7 +59,6 @@ describe('AppModule (e2e)', () => {

let app: INestMicroservice;
let controller: TestConsumer;
let registry: SchemaRegistry;

beforeAll(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
Expand All @@ -51,7 +70,7 @@ describe('AppModule (e2e)', () => {
await app.listenAsync();

controller = await app.resolve(TestConsumer);
registry = new SchemaRegistry({ host: 'http://localhost:8081/' })

});

afterAll(async() => {
Expand All @@ -62,19 +81,6 @@ describe('AppModule (e2e)', () => {
setTimeout(done, 4000);
});

it('We can send schemas to schema registry', async() => {
// For our other tests we require the schema to already exist
// in schema registry and dont allow uploaded through the nestJS
// application.
const valuePath = join(__dirname, 'e2e', 'app', 'value-schema.avsc');
const keyPath = join(__dirname, 'e2e', 'app', 'key-schema.avsc');
const valueSchema = readAVSC(valuePath);
const keySchema = readAVSC(keyPath);

await registry.register(valueSchema, { separator: '-' })
await registry.register(keySchema, { separator: '-' })
})

it('We can SEND and ACCEPT AVRO messages', async (done) => {
await controller.sendMessage({ messages })
expect(controller.messages.length).toBe(messages.length);
Expand Down
7 changes: 3 additions & 4 deletions test/e2e/app/config.app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Module } from '@nestjs/common';
import { KafkaModule, KafkaAvroResponseDeserializer, KafkaAvroRequestSerializer } from "../../../src";
import { TOPIC_NAME, TestConsumer } from "./test.controller";


@Module({
imports: [
KafkaModule.register([
Expand All @@ -14,8 +13,8 @@ import { TOPIC_NAME, TestConsumer } from "./test.controller";
clientId: 'test-e2e',
brokers: ['localhost:9092'],
retry: {
retries: 0,
initialRetryTime: 1,
retries: 2,
initialRetryTime: 30,
},
},
consumer: {
Expand All @@ -31,7 +30,7 @@ import { TOPIC_NAME, TestConsumer } from "./test.controller";
schemas: [
{
topic: TOPIC_NAME,
// key: join(__dirname, 'key-schema.avsc'),
key: join(__dirname, 'key-schema.avsc'),
value: join(__dirname, 'value-schema.avsc')
}
],
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/app/key-schema.avsc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "Key",
"name": "key",
"namespace": "test.topic",
"type": "record",
"fields": [
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/app/test.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Payload } from "@nestjs/microservices";
import { SubscribeTo, KafkaService } from '../../../src';
import { KafkaMessageSend } from '../../../src/interfaces';

export const TOPIC_NAME = 'test.topic';
export const TOPIC_NAME = 'test-topic';

export class TestConsumer {

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/app/value-schema.avsc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "Value",
"name": "value",
"namespace": "test.topic",
"type": "record",
"fields": [
Expand Down

0 comments on commit 5ea3ceb

Please sign in to comment.