Skip to content

Commit a248510

Browse files
authored
Refactor record size logic (#108)
* Abstract the record size logic * Fix the Kafka dry run * Make the recordSizePayload column nullable
1 parent 0dbb72e commit a248510

File tree

7 files changed

+43
-33
lines changed

7 files changed

+43
-33
lines changed

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ jobs:
5252
run: docker exec datagen datagen -s /tests/products.sql -f postgres -n 3
5353

5454
- name: Produce data to Postgres with multiple tables
55-
run: docker exec datagen datagen -s /tests/schema2.sql -f postgres -n 3
55+
run: docker exec datagen datagen -s /tests/schema2.sql -f postgres -n 3 -rs 1000
5656

5757
- name: Docker Compose Down
5858
run: docker compose down -v

src/kafkaDataGenerator.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import alert from 'cli-alerts';
2-
import recordSize from './utils/recordSize.js';
32
import { KafkaProducer } from './kafka/producer.js';
43
import { generateMegaRecord } from './schemas/generateMegaRecord.js';
54
import { OutputFormat } from './formats/outputFormat.js';
@@ -20,11 +19,6 @@ export default async function kafkaDataGenerator({
2019
initialSchema: string;
2120
}): Promise<void> {
2221

23-
let payload: string;
24-
if (global.recordSize) {
25-
payload = await recordSize();
26-
}
27-
2822
let producer: KafkaProducer | null = null;
2923
if (global.dryRun !== true) {
3024
let outputFormat: OutputFormat;
@@ -55,16 +49,13 @@ export default async function kafkaDataGenerator({
5549
key = record[megaRecord[topic].key];
5650
}
5751

58-
if (global.recordSize) {
59-
record.recordSizePayload = payload;
60-
}
61-
6252
if (global.dryRun) {
6353
alert({
6454
type: `success`,
6555
name: `Dry run: Skipping record production...`,
6656
msg: `\n Topic: ${topic} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
6757
});
58+
continue;
6859
}
6960

7061
await producer?.send(key, record, topic);

src/postgres/createTables.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,21 @@ export default async function createTables(schema: any, initialSchemaPath: strin
5454
const queries = initialSchema.split(';');
5555

5656
for (const query of queries) {
57+
let extendedQuery = query.trim();
58+
// Add ; to the end of the query if it's missing
59+
if (!extendedQuery.endsWith(';')) {
60+
extendedQuery += ';';
61+
}
62+
// If the global option is enabled, add the recordSizePayload column to the table creation query
63+
if (global.recordSize && extendedQuery.toLowerCase().startsWith('create table')) {
64+
extendedQuery = extendedQuery.replace(/\);/g, ', recordSizePayload TEXT NULL);');
65+
}
66+
5767
try {
58-
if (query.trim()) {
59-
const correctedSql = query.replace(/`/g, '"').replace(/COMMENT '.*'/g, '').replace(/datetime/g, 'timestamp');
68+
if (extendedQuery) {
69+
const correctedSql = extendedQuery.replace(/`/g, '"').replace(/COMMENT '.*'/g, '').replace(/datetime/g, 'timestamp');
6070
await client.query(correctedSql);
71+
console.log(correctedSql);
6172
}
6273
} catch (error) {
6374
alert({

src/postgresDataGenerator.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ export default async function postgresDataGenerator({
1717
iterations: number;
1818
initialSchema: string;
1919
}): Promise<void> {
20-
2120
// Database client setup
2221
let client = null;
2322
if (global.dryRun) {
@@ -49,38 +48,40 @@ export default async function postgresDataGenerator({
4948
name: `Creating tables...`,
5049
msg: ``
5150
});
52-
client && await createTables(schema, initialSchema);
51+
client && (await createTables(schema, initialSchema));
5352
}
5453
}
5554

5655
for (const table in megaRecord) {
5756
for await (const record of megaRecord[table].records) {
58-
console.log(`\n Table: ${table} \n Record: ${JSON.stringify(record)}`);
57+
console.log(
58+
`\n Table: ${table} \n Record: ${JSON.stringify(record)}`
59+
);
5960

6061
let key = null;
6162
if (record[megaRecord[table].key]) {
6263
key = record[megaRecord[table].key];
6364
}
6465

65-
if (global.recordSize) {
66-
record.recordSizePayload = payload;
67-
}
68-
6966
if (global.dryRun) {
7067
alert({
7168
type: `success`,
7269
name: `Dry run: Skipping record production...`,
73-
msg: `\n Table: ${table} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
70+
msg: `\n Table: ${table} \n Record key: ${key} \n Payload: ${JSON.stringify(
71+
record
72+
)}`
7473
});
7574
}
7675

7776
// Insert record into Postgres
7877
if (!global.dryRun) {
7978
try {
8079
const values = Object.values(record);
81-
const placeholders = values.map((_, index) => `$${index + 1}`).join(', ');
80+
const placeholders = values
81+
.map((_, index) => `$${index + 1}`)
82+
.join(', ');
8283
const query = `INSERT INTO ${table} VALUES (${placeholders})`;
83-
client && await client.query(query, values);
84+
client && (await client.query(query, values));
8485
} catch (err) {
8586
console.error(err);
8687
}
@@ -91,5 +92,5 @@ export default async function postgresDataGenerator({
9192
await sleep(global.wait);
9293
}
9394

94-
client && await client.end();
95+
client && (await client.end());
9596
}

src/schemas/generateMegaRecord.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { faker } from '@faker-js/faker';
22
import alert from 'cli-alerts';
3+
import recordSize from '../utils/recordSize.js';
34

45
export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}) {
56
// helper function to generate a record from json schema with faker data
@@ -137,5 +138,15 @@ export async function generateMegaRecord(schema: any) {
137138
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
138139
}
139140
}
141+
142+
if (global.recordSize) {
143+
for (const topic in megaRecord) {
144+
let payload: string = await recordSize();
145+
for (let record of megaRecord[topic].records) {
146+
record["recordSizePayload"] = payload;
147+
}
148+
}
149+
}
150+
140151
return megaRecord;
141152
}

src/webhookDataGenerator.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import alert from 'cli-alerts';
22
import { generateMegaRecord } from './schemas/generateMegaRecord.js';
33
import { OutputFormat } from './formats/outputFormat.js';
44
import sleep from './utils/sleep.js';
5-
import recordSize from './utils/recordSize.js';
65
import asyncGenerator from './utils/asyncGenerator.js';
76
import webhookConfig from './webhook/webhookConfig.js';
87

@@ -36,11 +35,6 @@ export default async function webhookDataGenerator({
3635
client = await webhookConfig();
3736
}
3837

39-
let payload: string;
40-
if (global.recordSize) {
41-
payload = await recordSize();
42-
}
43-
4438
for await (const iteration of asyncGenerator(iterations)) {
4539
global.iterationIndex = iteration;
4640
const megaRecord = await generateMegaRecord(schema);
@@ -58,9 +52,6 @@ export default async function webhookDataGenerator({
5852
const handler = async (megaRecord: any, iteration: number) => {
5953
for (const endpoint in megaRecord) {
6054
for await (const record of megaRecord[endpoint].records) {
61-
if (global.recordSize) {
62-
record.recordSizePayload = payload;
63-
}
6455

6556
if (global.dryRun) {
6657
alert({

tests/datagen.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ describe('Test record size', () => {
7777
const output = datagen(`-s ${schema} -n 2 -rs 100`);
7878
expect(output).toContain('recordSizePayload');
7979
});
80+
test('should contain the recordSizePayload if record size is set with Postgres destinations', () => {
81+
const schema = './tests/products.sql';
82+
const output = datagen(`-s ${schema} -f postgres -n 2 -rs 100`);
83+
expect(output).toContain('recordSizePayload');
84+
});
8085
});
8186

8287
describe('Test sql output', () => {

0 commit comments

Comments (0)