Skip to content

Commit c47488d

Browse files
shrouti1507SankeerthSai Sankeerth
authored
fix: adding transformer proxy for iterable (#3878)
* fix: adding transformer proxy for iterable * fix: adding component test cases * fix: adding factory pattern * fix: code redesigning * fix: decoupling networkHandler and specific strategy * fix: simplifying logic * fix: convert to registry pattern * fix: converting to typescript * fix: removing unnecessary comments * fix: adding data delivery test cases for all endpoints * chore: improve iterable network handler (#3918) * chore: improve iterable network handler * chore: add comment in principal strategy class * chore: rename from PrincipalStrategy to BaseStrategy * chore: update expect-error comment --------- Co-authored-by: Sai Sankeerth <[email protected]> * fix: review comments addressed * fix: small refactoring * fix: update for supporting disallowed events * fix: code review suggestion Co-authored-by: Sankeerth <[email protected]> * fix: fixing test cases * fix: review comment addressed * fix: adding type definitions * fix: separating handle error functions * fix: migrating processor and router test cases * fix: review comment addressed --------- Co-authored-by: Sankeerth <[email protected]> Co-authored-by: Sai Sankeerth <[email protected]>
1 parent 30377b8 commit c47488d

File tree

19 files changed

+4762
-982
lines changed

19 files changed

+4762
-982
lines changed

src/v0/destinations/iterable/config.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,39 @@ const constructEndpoint = (dataCenter, category) => {
7676
return `${baseUrl}${category.endpoint}`;
7777
};
7878

79+
const BULK_ENDPOINTS = ['/api/users/bulkUpdate', '/api/events/trackBulk'];
80+
7981
const IDENTIFY_MAX_BATCH_SIZE = 1000;
8082
const IDENTIFY_MAX_BODY_SIZE_IN_BYTES = 4000000;
8183

8284
const TRACK_MAX_BATCH_SIZE = 8000;
8385

86+
const ITERABLE_RESPONSE_USER_ID_PATHS = [
87+
'invalidUserIds',
88+
'failedUpdates.invalidUserIds',
89+
'failedUpdates.notFoundUserIds',
90+
'failedUpdates.forgottenUserIds',
91+
'failedUpdates.conflictUserIds',
92+
'failedUpdates.invalidDataUserIds',
93+
];
94+
95+
const ITERABLE_RESPONSE_EMAIL_PATHS = [
96+
'invalidEmails',
97+
'failedUpdates.invalidEmails',
98+
'failedUpdates.notFoundEmails',
99+
'failedUpdates.forgottenEmails',
100+
'failedUpdates.conflictEmails',
101+
'failedUpdates.invalidDataEmails',
102+
];
103+
84104
module.exports = {
85105
mappingConfig,
86106
ConfigCategory,
87107
constructEndpoint,
88108
TRACK_MAX_BATCH_SIZE,
89109
IDENTIFY_MAX_BATCH_SIZE,
90110
IDENTIFY_MAX_BODY_SIZE_IN_BYTES,
111+
ITERABLE_RESPONSE_USER_ID_PATHS,
112+
ITERABLE_RESPONSE_EMAIL_PATHS,
113+
BULK_ENDPOINTS,
91114
};

src/v0/destinations/iterable/util.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ const batchUpdateUserEvents = (updateUserEvents, registerDeviceOrBrowserTokenEve
483483

484484
/**
485485
* Processes chunks of catalog events, extracts the necessary data, and prepares batched requests for further processing
486+
* ref : https://api.iterable.com/api/docs#catalogs_bulkUpdateCatalogItems
486487
* @param {*} catalogEventsChunks
487488
* @returns
488489
*/
@@ -600,12 +601,12 @@ const batchTrackEvents = (trackEvents) => {
600601
*/
601602
const prepareBatchRequests = (filteredEvents) => {
602603
const {
603-
trackEvents,
604-
catalogEvents,
605-
errorRespList,
606-
updateUserEvents,
607-
eventResponseList,
608-
registerDeviceOrBrowserTokenEvents,
604+
trackEvents, // track
605+
catalogEvents, // identify
606+
errorRespList, // track
607+
updateUserEvents, // identify
608+
eventResponseList, // track
609+
registerDeviceOrBrowserTokenEvents, // identify
609610
} = filteredEvents;
610611

611612
const updateUserBatchedResponseList =

src/v0/destinations/iterable/util.test.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const {
88
registerDeviceTokenEventPayloadBuilder,
99
registerBrowserTokenEventPayloadBuilder,
1010
} = require('./util');
11-
1211
const { ConfigCategory } = require('./config');
1312

1413
const getTestMessage = () => {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { prepareProxyRequest, proxyRequest } from '../../../adapters/network';
2+
import { processAxiosResponse } from '../../../adapters/utils/networkUtils';
3+
import { BULK_ENDPOINTS } from '../../../v0/destinations/iterable/config';
4+
import { GenericStrategy } from './strategies/generic';
5+
import { TrackIdentifyStrategy } from './strategies/track-identify';
6+
import { GenericProxyHandlerInput } from './types';
7+
8+
const strategyRegistry: { [key: string]: any } = {
9+
[TrackIdentifyStrategy.name]: new TrackIdentifyStrategy(),
10+
[GenericStrategy.name]: new GenericStrategy(),
11+
};
12+
13+
const getResponseStrategy = (endpoint: string) => {
14+
if (BULK_ENDPOINTS.some((path) => endpoint.includes(path))) {
15+
return strategyRegistry[TrackIdentifyStrategy.name];
16+
}
17+
return strategyRegistry[GenericStrategy.name];
18+
};
19+
20+
const responseHandler = (responseParams: GenericProxyHandlerInput) => {
21+
const { destinationRequest } = responseParams;
22+
const strategy = getResponseStrategy(destinationRequest.endpoint);
23+
return strategy.handleResponse(responseParams);
24+
};
25+
26+
function networkHandler(this: any) {
27+
this.prepareProxy = prepareProxyRequest;
28+
this.proxy = proxyRequest;
29+
this.processAxiosResponse = processAxiosResponse;
30+
this.responseHandler = responseHandler;
31+
}
32+
33+
export { networkHandler };
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { isHttpStatusSuccess } from '../../../../v0/util';
2+
import { GenericProxyHandlerInput } from '../types';
3+
4+
// Base strategy is the base class for all strategies in Iterable destination
5+
abstract class BaseStrategy {
6+
handleResponse(responseParams: GenericProxyHandlerInput): void {
7+
const { destinationResponse } = responseParams;
8+
const { status } = destinationResponse;
9+
10+
if (!isHttpStatusSuccess(status)) {
11+
return this.handleError(responseParams);
12+
}
13+
14+
return this.handleSuccess(responseParams);
15+
}
16+
17+
abstract handleError(responseParams: GenericProxyHandlerInput): void;
18+
19+
abstract handleSuccess(responseParams: any): void;
20+
}
21+
22+
export { BaseStrategy };
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { BaseStrategy } from './base';
2+
import {
3+
GenericProxyHandlerInput,
4+
IterableBulkApiResponse,
5+
IterableSuccessResponse,
6+
} from '../types';
7+
import { ProxyMetdata } from '../../../../types';
8+
import { TransformerProxyError } from '../../../../v0/util/errorTypes';
9+
import { TAG_NAMES } from '../../../../v0/util/tags';
10+
import { getDynamicErrorType } from '../../../../adapters/utils/networkUtils';
11+
12+
class GenericStrategy extends BaseStrategy {
13+
handleSuccess(responseParams: {
14+
destinationResponse: IterableBulkApiResponse;
15+
rudderJobMetadata: ProxyMetdata[];
16+
}): IterableSuccessResponse {
17+
const { destinationResponse, rudderJobMetadata } = responseParams;
18+
const { status } = destinationResponse;
19+
20+
const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
21+
statusCode: status,
22+
metadata,
23+
error: 'success',
24+
}));
25+
26+
return {
27+
status,
28+
message: '[ITERABLE Response Handler] - Request Processed Successfully',
29+
destinationResponse,
30+
response: responseWithIndividualEvents,
31+
};
32+
}
33+
34+
handleError(responseParams: GenericProxyHandlerInput): void {
35+
const { destinationResponse, rudderJobMetadata } = responseParams;
36+
const { response, status } = destinationResponse;
37+
const responseMessage = response.params || response.msg || response.message;
38+
const errorMessage = JSON.stringify(responseMessage) || 'unknown error format';
39+
40+
const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
41+
statusCode: status,
42+
metadata,
43+
error: errorMessage,
44+
}));
45+
46+
throw new TransformerProxyError(
47+
`ITERABLE: Error transformer proxy during ITERABLE response transformation. ${errorMessage}`,
48+
status,
49+
{ [TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status) },
50+
destinationResponse,
51+
'',
52+
responseWithIndividualEvents,
53+
);
54+
}
55+
}
56+
57+
export { GenericStrategy };
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { BaseStrategy } from './base';
2+
import { GenericProxyHandlerInput, IterableBulkProxyInput } from '../types';
3+
import { checkIfEventIsAbortableAndExtractErrorMessage } from '../utils';
4+
import { DeliveryJobState, DeliveryV1Response } from '../../../../types';
5+
import { TransformerProxyError } from '../../../../v0/util/errorTypes';
6+
import { getDynamicErrorType } from '../../../../adapters/utils/networkUtils';
7+
import { TAG_NAMES } from '../../../../v0/util/tags';
8+
9+
class TrackIdentifyStrategy extends BaseStrategy {
10+
handleSuccess(responseParams: IterableBulkProxyInput): DeliveryV1Response {
11+
const { destinationResponse, rudderJobMetadata, destinationRequest } = responseParams;
12+
const { status } = destinationResponse;
13+
const responseWithIndividualEvents: DeliveryJobState[] = [];
14+
15+
const { events, users } = destinationRequest?.body.JSON || {};
16+
const finalData = events || users;
17+
18+
if (finalData) {
19+
finalData.forEach((event, idx) => {
20+
const parsedOutput = {
21+
statusCode: 200,
22+
metadata: rudderJobMetadata[idx],
23+
error: 'success',
24+
};
25+
26+
const { isAbortable, errorMsg } = checkIfEventIsAbortableAndExtractErrorMessage(
27+
event,
28+
destinationResponse,
29+
);
30+
if (isAbortable) {
31+
parsedOutput.statusCode = 400;
32+
parsedOutput.error = errorMsg;
33+
}
34+
responseWithIndividualEvents.push(parsedOutput);
35+
});
36+
}
37+
38+
return {
39+
status,
40+
message: '[ITERABLE Response Handler] - Request Processed Successfully',
41+
destinationResponse,
42+
response: responseWithIndividualEvents,
43+
};
44+
}
45+
46+
handleError(responseParams: GenericProxyHandlerInput): void {
47+
const { destinationResponse, rudderJobMetadata } = responseParams;
48+
const { response, status } = destinationResponse;
49+
const responseMessage = response.params || response.msg || response.message;
50+
const errorMessage = JSON.stringify(responseMessage) || 'unknown error format';
51+
52+
const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
53+
statusCode: status,
54+
metadata,
55+
error: errorMessage,
56+
}));
57+
58+
throw new TransformerProxyError(
59+
`ITERABLE: Error transformer proxy during ITERABLE response transformation. ${errorMessage}`,
60+
status,
61+
{ [TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status) },
62+
destinationResponse,
63+
'',
64+
responseWithIndividualEvents,
65+
);
66+
}
67+
}
68+
69+
export { TrackIdentifyStrategy };
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { ProxyMetdata, ProxyV1Request } from '../../../types';
2+
3+
type FailedUpdates = {
4+
invalidEmails?: string[];
5+
invalidUserIds?: string[];
6+
notFoundEmails?: string[];
7+
notFoundUserIds?: string[];
8+
invalidDataEmails?: string[];
9+
invalidDataUserIds?: string[];
10+
conflictEmails?: string[];
11+
conflictUserIds?: string[];
12+
forgottenEmails?: string[];
13+
forgottenUserIds?: string[];
14+
};
15+
16+
export type GeneralApiResponse = {
17+
msg?: string;
18+
code?: string;
19+
params?: Record<string, unknown>;
20+
successCount?: number;
21+
failCount?: number;
22+
invalidEmails?: string[];
23+
invalidUserIds?: string[];
24+
filteredOutFields?: string[];
25+
createdFields?: string[];
26+
disallowedEventNames?: string[];
27+
failedUpdates?: FailedUpdates;
28+
};
29+
30+
export type IterableBulkApiResponse = {
31+
status: number;
32+
response: GeneralApiResponse;
33+
};
34+
35+
type IterableBulkRequestBody = {
36+
events?: any[];
37+
users?: any[];
38+
};
39+
40+
export type IterableBulkProxyInput = {
41+
destinationResponse: IterableBulkApiResponse;
42+
rudderJobMetadata: ProxyMetdata[];
43+
destType: string;
44+
destinationRequest?: {
45+
body: {
46+
JSON: IterableBulkRequestBody;
47+
};
48+
};
49+
};
50+
51+
export type GenericProxyHandlerInput = {
52+
destinationResponse: any;
53+
rudderJobMetadata: ProxyMetdata[];
54+
destType: string;
55+
destinationRequest: ProxyV1Request;
56+
};
57+
58+
export type Response = {
59+
statusCode: number;
60+
metadata: any;
61+
error: string;
62+
};
63+
64+
export type IterableSuccessResponse = {
65+
status: number;
66+
message: string;
67+
destinationResponse: IterableBulkApiResponse;
68+
response: Response[];
69+
};

0 commit comments

Comments
 (0)