Skip to content

Commit

Permalink
Adjusting E2E tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rob3000 committed Nov 6, 2020
1 parent 5af64fd commit 9fda0ae
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 34 deletions.
26 changes: 17 additions & 9 deletions kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,36 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
ports:
- "2181:2181"
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_TICK_TIME: '2000'

broker:
image: wurstmeister/kafka:2.11-1.1.1
image: confluentinc/cp-kafka:5.4.2
hostname: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
- "9101:9101"
- 29092:29092
- 9092:9092
- 9101:9101
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_HOST_NAME: "broker"
# KAFKA_ADVERTISED_HOST_NAME: 'broker'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: "PLAINTEXT://:29092,PLAINTEXT_HOST://:9092"
KAFKA_LISTENERS: 'PLAINTEXT://:29092,PLAINTEXT_HOST://:9092'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CREATE_TOPICS: "test-topic:1:1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

schema-registry:
image: confluentinc/cp-schema-registry:5.5.1
Expand Down
1 change: 1 addition & 0 deletions src/deserializer/avro-response.deserializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export class KafkaAvroResponseDeserializer
const msg = this.fallback.deserialize(message);
Object.assign(decodeResponse, msg);
}

return decodeResponse;
}

Expand Down
2 changes: 1 addition & 1 deletion src/kafka.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {

// @todo - rather than have a producerRecord,
// most of this can be done when we create the controller.
return this.producer.send(serializedPacket);
return await this.producer.send(serializedPacket);
}

/**
Expand Down
31 changes: 18 additions & 13 deletions src/serializer/avro-request.serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,21 @@ export class KafkaAvroRequestSerializer
this.registry = new SchemaRegistry(options.config);
// this.separator = options.schemaSeparator || '-';
let keySchema = null;
options.schemas.forEach((obj) => {
if (obj.key) {
keySchema = readAVSC(obj.key);
options.schemas.forEach((schema: KafkaAvroRequestSerializerSchema) => {
if (schema.key) {
keySchema = readAVSC(schema.key);
}
// test.topic-Value
const valueSchema = readAVSC(obj.value);
// const valueSubject = valueSchema.namespace + this.separator + valueSchema.name;


const valueSchema = readAVSC(schema.value);

const schemaObject = {
key: keySchema,
value: valueSchema,
keySuffix: obj.keySuffix??'Key',
valueSuffix: obj.valueSuffix??'Value',
keySuffix: schema.keySuffix ?? 'key',
valueSuffix: schema.valueSuffix ?? 'value',
}

this.schemas.set(obj.topic, schemaObject);
this.schemas.set(schema.topic, schemaObject);
});

}
Expand All @@ -62,10 +61,15 @@ export class KafkaAvroRequestSerializer
const keyId = await this.registry.getLatestSchemaId(`${value.topic}-${schema.keySuffix}`)
const valueId = await this.registry.getLatestSchemaId(`${value.topic}-${schema.valueSuffix}`)


const messages: Promise<KafkaMessageObject>[] = value.messages.map(async(origMessage) => {
const encodedValue = await this.registry.encode(valueId, origMessage.value)
const encodedKey = await this.registry.encode(keyId, origMessage.key)

let encodedKey = origMessage.key;
const encodedValue = await this.registry.encode(valueId, origMessage.value);

if (keyId) {
encodedKey = await this.registry.encode(keyId, origMessage.key);
}

return {
...origMessage,
value: encodedValue,
Expand All @@ -78,6 +82,7 @@ export class KafkaAvroRequestSerializer
} catch (e) {
this.logger.error(e);
}

return outgoingMessage;
}

Expand Down
17 changes: 8 additions & 9 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { join } from 'path';
import AppModule from './e2e/app/config.app';
import { TestConsumer, TOPIC_NAME } from './e2e/app/test.controller';
import { TestConsumer } from './e2e/app/test.controller';
import { SchemaRegistry, readAVSC } from "@kafkajs/confluent-schema-registry";

describe('Setup for E2E', () => {
Expand Down Expand Up @@ -70,7 +70,7 @@ describe('AppModule (e2e)', () => {
await app.listenAsync();

controller = await app.resolve(TestConsumer);

controller.messages = [];
});

afterAll(async() => {
Expand All @@ -81,13 +81,12 @@ describe('AppModule (e2e)', () => {
setTimeout(done, 4000);
});

it('We can SEND and ACCEPT AVRO messages', async (done) => {
await controller.sendMessage({ messages })
expect(controller.messages.length).toBe(messages.length);

expect(controller.messages[0]).toEqual(messages[0].value);
expect(controller.messages[1]).toEqual(messages[1].value);
it('We can SEND and ACCEPT AVRO messages', async () => {
return controller.sendMessage({ messages }).then(() => {
expect(controller.messages.length).toBe(messages.length);

done();
expect(controller.messages[0]).toEqual(messages[0].value);
expect(controller.messages[1]).toEqual(messages[1].value);
});
});
});
1 change: 1 addition & 0 deletions test/e2e/app/config.app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { TOPIC_NAME, TestConsumer } from "./test.controller";
},
consumer: {
groupId: 'test-e2e-consumer',
allowAutoTopicCreation: true,
},
deserializer: new KafkaAvroResponseDeserializer({
host: 'http://localhost:8081/'
Expand Down
3 changes: 1 addition & 2 deletions test/e2e/app/test.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Payload } from "@nestjs/microservices";
import { SubscribeTo, KafkaService } from '../../../src';
import { KafkaMessageSend } from '../../../src/interfaces';

export const TOPIC_NAME = 'test-topic';
export const TOPIC_NAME = 'test.topic';

export class TestConsumer {

Expand All @@ -30,7 +30,6 @@ export class TestConsumer {
...event,
topic: TOPIC_NAME
}

return await this.client.send(a);
}
}

0 comments on commit 9fda0ae

Please sign in to comment.