Skip to content

Commit f8cd6bd

Browse files
authored
feat: add support for headers to source transformation flows (#3683)
* feat: add support for headers to source transformation flows * chore: add types for source trasform event
1 parent b1d0d08 commit f8cd6bd

File tree

11 files changed

+74
-29
lines changed

11 files changed

+74
-29
lines changed

src/controllers/source.ts

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export class SourceController {
1818
version,
1919
events,
2020
);
21+
2122
const resplist = await integrationService.sourceTransformRoutine(
2223
input,
2324
source,

src/services/source/__tests__/nativeIntegration.test.ts

+7-6
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,23 @@ afterEach(() => {
88
jest.clearAllMocks();
99
});
1010

11+
const headers = {
12+
'x-rudderstack-source': 'test',
13+
};
14+
1115
describe('NativeIntegration Source Service', () => {
1216
test('sourceTransformRoutine - success', async () => {
1317
const sourceType = '__rudder_test__';
1418
const version = 'v0';
1519
const requestMetadata = {};
1620

17-
const event = { message: { a: 'b' } };
21+
const event = { message: { a: 'b' }, headers };
1822
const events = [event, event];
1923

20-
const tevent = { anonymousId: 'test' } as RudderMessage;
24+
const tevent = { anonymousId: 'test', context: { headers } } as RudderMessage;
2125
const tresp = { output: { batch: [tevent] }, statusCode: 200 } as SourceTransformationResponse;
2226

23-
const tresponse = [
24-
{ output: { batch: [{ anonymousId: 'test' }] }, statusCode: 200 },
25-
{ output: { batch: [{ anonymousId: 'test' }] }, statusCode: 200 },
26-
];
27+
const tresponse = [tresp, tresp];
2728

2829
FetchHandler.getSourceHandler = jest.fn().mockImplementationOnce((d, v) => {
2930
expect(d).toEqual(sourceType);

src/services/source/__tests__/postTransformation.test.ts

+19-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import {
55
} from '../../../types/index';
66
import { SourcePostTransformationService } from '../../source/postTransformation';
77

8+
const headers = {
9+
'x-rudderstack-source': 'test',
10+
};
11+
812
describe('Source PostTransformation Service', () => {
913
test('should handleFailureEventsSource', async () => {
1014
const e = new Error('test error');
@@ -26,24 +30,32 @@ describe('Source PostTransformation Service', () => {
2630
output: { batch: [{ anonymousId: 'test' }] },
2731
} as SourceTransformationResponse;
2832

29-
const result = SourcePostTransformationService.handleSuccessEventsSource(event);
33+
const postProcessedEvents = {
34+
outputToSource: {},
35+
output: { batch: [{ anonymousId: 'test', context: { headers } }] },
36+
} as SourceTransformationResponse;
3037

31-
expect(result).toEqual(event);
38+
const result = SourcePostTransformationService.handleSuccessEventsSource(event, { headers });
39+
40+
expect(result).toEqual(postProcessedEvents);
3241
});
3342

3443
test('should return the events as batch in SourceTransformationResponse if it is an array', () => {
44+
const headers = {
45+
'x-rudderstack-source': 'test',
46+
};
3547
const events = [{ anonymousId: 'test' }, { anonymousId: 'test' }] as RudderMessage[];
48+
const postProcessedEvents = events.map((event) => ({ ...event, context: { headers } }));
49+
const result = SourcePostTransformationService.handleSuccessEventsSource(events, { headers });
3650

37-
const result = SourcePostTransformationService.handleSuccessEventsSource(events);
38-
39-
expect(result).toEqual({ output: { batch: events } });
51+
expect(result).toEqual({ output: { batch: postProcessedEvents } });
4052
});
4153

4254
test('should return the event as batch in SourceTransformationResponse if it is a single object', () => {
4355
const event = { anonymousId: 'test' } as RudderMessage;
4456

45-
const result = SourcePostTransformationService.handleSuccessEventsSource(event);
57+
const result = SourcePostTransformationService.handleSuccessEventsSource(event, { headers });
4658

47-
expect(result).toEqual({ output: { batch: [event] } });
59+
expect(result).toEqual({ output: { batch: [{ ...event, context: { headers } }] } });
4860
});
4961
});

src/services/source/nativeIntegration.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
ErrorDetailer,
55
MetaTransferObject,
66
RudderMessage,
7+
SourceTransformationEvent,
78
SourceTransformationResponse,
89
} from '../../types/index';
910
import stats from '../../util/stats';
@@ -27,7 +28,7 @@ export class NativeIntegrationSourceService implements SourceService {
2728
}
2829

2930
public async sourceTransformRoutine(
30-
sourceEvents: NonNullable<unknown>[],
31+
sourceEvents: NonNullable<SourceTransformationEvent>[],
3132
sourceType: string,
3233
version: string,
3334
// eslint-disable-next-line @typescript-eslint/no-unused-vars
@@ -38,9 +39,12 @@ export class NativeIntegrationSourceService implements SourceService {
3839
const respList: SourceTransformationResponse[] = await Promise.all<FixMe>(
3940
sourceEvents.map(async (sourceEvent) => {
4041
try {
42+
const newSourceEvent = sourceEvent;
43+
const { headers } = newSourceEvent;
44+
delete newSourceEvent.headers;
4145
const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse =
42-
await sourceHandler.process(sourceEvent);
43-
return SourcePostTransformationService.handleSuccessEventsSource(respEvents);
46+
await sourceHandler.process(newSourceEvent);
47+
return SourcePostTransformationService.handleSuccessEventsSource(respEvents, { headers });
4448
} catch (error: FixMe) {
4549
stats.increment('source_transform_errors', {
4650
source: sourceType,

src/services/source/postTransformation.ts

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { MetaTransferObject, RudderMessage, SourceTransformationResponse } from '../../types/index';
2+
import { CommonUtils } from '../../util/common';
23
import { CatchErr } from '../../util/types';
34
import { generateErrorObject } from '../../v0/util';
45
import { ErrorReportingService } from '../errorReporting';
@@ -20,16 +21,25 @@ export class SourcePostTransformationService {
2021

2122
public static handleSuccessEventsSource(
2223
events: RudderMessage | RudderMessage[] | SourceTransformationResponse,
24+
context: { headers?: Record<string, string> },
2325
): SourceTransformationResponse {
2426
// We send response back to the source
2527
// through outputToSource. This is not sent to gateway
2628
// We will not return array for events not meant for gateway
27-
if (Object.prototype.hasOwnProperty.call(events, 'outputToSource')) {
28-
return events as SourceTransformationResponse;
29+
let sourceTransformationResponse = events as SourceTransformationResponse;
30+
if (!Object.prototype.hasOwnProperty.call(events, 'outputToSource')) {
31+
const eventsBatch = CommonUtils.toArray(events);
32+
sourceTransformationResponse = {
33+
output: { batch: eventsBatch },
34+
} as SourceTransformationResponse;
2935
}
30-
if (Array.isArray(events)) {
31-
return { output: { batch: events } } as SourceTransformationResponse;
36+
37+
if (sourceTransformationResponse.output) {
38+
sourceTransformationResponse.output.batch.forEach((event) => {
39+
const newEvent = event as RudderMessage;
40+
newEvent.context = { ...event.context, ...context };
41+
});
3242
}
33-
return { output: { batch: [events] } } as SourceTransformationResponse;
43+
return sourceTransformationResponse;
3444
}
3545
}

src/types/index.ts

+7
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,12 @@ type RouterTransformationResponse = {
185185
statTags?: object;
186186
};
187187

188+
type SourceTransformationEvent = {
189+
headers?: Record<string, string>;
190+
query_params?: Record<string, string>;
191+
[key: string]: any;
192+
};
193+
188194
type SourceTransformationOutput = {
189195
batch: RudderMessage[];
190196
};
@@ -360,6 +366,7 @@ export {
360366
RouterTransformationRequestData,
361367
RouterTransformationResponse,
362368
RudderMessage,
369+
SourceTransformationEvent,
363370
SourceTransformationResponse,
364371
UserDeletionRequest,
365372
UserDeletionResponse,

src/v0/util/index.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1168,7 +1168,7 @@ const getDestinationExternalIDInfoForRetl = (message, destination) => {
11681168
if (externalIdArray) {
11691169
externalIdArray.forEach((extIdObj) => {
11701170
const { type, id } = extIdObj;
1171-
if (type && type.includes(`${destination}-`)) {
1171+
if (type?.includes(`${destination}-`)) {
11721172
destinationExternalId = id;
11731173
objectType = type.replace(`${destination}-`, '');
11741174
identifierType = extIdObj.identifierType;
@@ -1195,7 +1195,7 @@ const getDestinationExternalIDObjectForRetl = (message, destination) => {
11951195
// some stops the execution when the element is found
11961196
externalIdArray.some((extIdObj) => {
11971197
const { type } = extIdObj;
1198-
if (type && type.includes(`${destination}-`)) {
1198+
if (type?.includes(`${destination}-`)) {
11991199
obj = extIdObj;
12001200
return true;
12011201
}

test/apitests/data_scenarios/source/v1/successful.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
"fulfillment_id": "1234567890",
3030
"status": "pending"
3131
}
32-
}
32+
},
33+
"context": {}
3334
}
3435
]
3536
}

test/apitests/service.api.test.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,8 @@ describe('Api tests with a mock source/destination', () => {
359359
.send(getData());
360360

361361
const expected = [
362-
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
363-
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
362+
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
363+
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
364364
];
365365

366366
expect(response.status).toEqual(200);
@@ -398,8 +398,8 @@ describe('Api tests with a mock source/destination', () => {
398398
.send(getData());
399399

400400
const expected = [
401-
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
402-
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
401+
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
402+
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
403403
];
404404

405405
expect(response.status).toEqual(200);

test/integrations/sources/pipedream/data.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ export const data = [
199199
{
200200
userId: '1',
201201
originalTimestamp: '2020-09-28T19:53:31.900Z',
202+
context: {},
202203
traits: {
203204
firstName: 'John',
204205
lastName: 'doe',
@@ -336,7 +337,11 @@ export const data = [
336337
status: 200,
337338
body: [
338339
{
339-
output: { batch: [{ type: 'alias', previousId: '[email protected]', userId: '12345' }] },
340+
output: {
341+
batch: [
342+
{ type: 'alias', previousId: '[email protected]', userId: '12345', context: {} },
343+
],
344+
},
340345
},
341346
],
342347
},

test/integrations/sources/segment/data.ts

+4
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export const data: SrcTestCaseData[] = [
124124
elapsedTime: null,
125125
session_id: '**************_***************',
126126
},
127+
context: {},
127128
hostname: '************.us.auth0.com',
128129
user_id: 'auth0|************************',
129130
user_name: '[email protected]',
@@ -145,6 +146,7 @@ export const data: SrcTestCaseData[] = [
145146
{
146147
date: '2020-07-10T07:43:09.620Z',
147148
type: 'seacft',
149+
context: {},
148150
description: '',
149151
connection_id: '',
150152
client_id: '********************************',
@@ -176,6 +178,7 @@ export const data: SrcTestCaseData[] = [
176178
client_id: '********************************',
177179
client_name: 'My App',
178180
ip: '47.15.6.58',
181+
context: {},
179182
user_agent: 'Chrome Mobile 69.0.3497 / Android 0.0.0',
180183
details: {
181184
prompts: [],
@@ -207,6 +210,7 @@ export const data: SrcTestCaseData[] = [
207210
client_id: '********************************',
208211
client_name: 'My App',
209212
ip: '47.15.6.58',
213+
context: {},
210214
user_agent: 'Chrome Mobile 69.0.3497 / Android 0.0.0',
211215
details: {
212216
prompts: [],

0 commit comments

Comments
 (0)