Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: adding transformer proxy for iterable #3878

Merged
merged 25 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
096d008
fix: adding transformer proxy for iterable
shrouti1507 Nov 14, 2024
2a10f70
Merge branch 'develop' into fix.iterable-networkhandler
shrouti1507 Nov 14, 2024
2b34182
fix: adding component test cases
shrouti1507 Nov 14, 2024
8742244
fix: adding factory pattern
shrouti1507 Nov 18, 2024
25a5ced
fix: code redesigning
shrouti1507 Nov 20, 2024
c87c5ce
fix: decoupling networkHandler and specific strategy
shrouti1507 Nov 20, 2024
1058ec1
fix: simplifying logic
shrouti1507 Nov 20, 2024
f11d7eb
Merge branch 'develop' into fix.iterable-networkhandler
shrouti1507 Dec 3, 2024
909cad0
fix: convert to registry pattern
shrouti1507 Dec 3, 2024
2337c7b
fix: converting to typescript
shrouti1507 Dec 3, 2024
9eb31bf
fix: removing unnecessary comments
shrouti1507 Dec 3, 2024
97d531f
fix: adding data delivery test cases for all endpoints
shrouti1507 Dec 6, 2024
3789652
chore: improve iterable network handler (#3918)
sanpj2292 Dec 7, 2024
aae7b3d
Merge branch 'develop' into fix.iterable-networkhandler
shrouti1507 Dec 12, 2024
9ee2847
fix: review comments addressed
shrouti1507 Dec 12, 2024
a5ba58c
fix: small refactoring
shrouti1507 Dec 12, 2024
d00f31f
fix: update for supporting disallowed events
shrouti1507 Dec 12, 2024
707dfe8
fix: code review suggestion
shrouti1507 Dec 16, 2024
8e5d6ab
fix: fixing test cases
shrouti1507 Dec 16, 2024
9ce4575
fix: review comment addressed
shrouti1507 Dec 16, 2024
de1bb5c
fix: adding type definitions
shrouti1507 Jan 3, 2025
85b1166
fix: separating handle error functions
shrouti1507 Jan 3, 2025
933473a
Merge branch 'develop' into fix.iterable-networkhandler
shrouti1507 Jan 3, 2025
ebfeffc
fix: migrating processor and router test cases
shrouti1507 Jan 6, 2025
4cc52d2
fix: review comment addressed
shrouti1507 Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/v0/destinations/iterable/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,39 @@ const constructEndpoint = (dataCenter, category) => {
return `${baseUrl}${category.endpoint}`;
};

const BULK_ENDPOINTS = ['/api/users/bulkUpdate', '/api/events/trackBulk'];

const IDENTIFY_MAX_BATCH_SIZE = 1000;
const IDENTIFY_MAX_BODY_SIZE_IN_BYTES = 4000000;

const TRACK_MAX_BATCH_SIZE = 8000;

const ITERABLE_RESPONSE_USER_ID_PATHS = [
'invalidUserIds',
'failedUpdates.invalidUserIds',
'failedUpdates.notFoundUserIds',
'failedUpdates.forgottenUserIds',
'failedUpdates.conflictUserIds',
'failedUpdates.invalidDataUserIds',
];

const ITERABLE_RESPONSE_EMAIL_PATHS = [
'invalidEmails',
'failedUpdates.invalidEmails',
'failedUpdates.notFoundEmails',
'failedUpdates.forgottenEmails',
'failedUpdates.conflictEmails',
'failedUpdates.invalidDataEmails',
];

module.exports = {
mappingConfig,
ConfigCategory,
constructEndpoint,
TRACK_MAX_BATCH_SIZE,
IDENTIFY_MAX_BATCH_SIZE,
IDENTIFY_MAX_BODY_SIZE_IN_BYTES,
ITERABLE_RESPONSE_USER_ID_PATHS,
ITERABLE_RESPONSE_EMAIL_PATHS,
BULK_ENDPOINTS,
};
13 changes: 7 additions & 6 deletions src/v0/destinations/iterable/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ const batchUpdateUserEvents = (updateUserEvents, registerDeviceOrBrowserTokenEve

/**
* Processes chunks of catalog events, extracts the necessary data, and prepares batched requests for further processing
* ref : https://api.iterable.com/api/docs#catalogs_bulkUpdateCatalogItems
* @param {*} catalogEventsChunks
* @returns
*/
Expand Down Expand Up @@ -600,12 +601,12 @@ const batchTrackEvents = (trackEvents) => {
*/
const prepareBatchRequests = (filteredEvents) => {
const {
trackEvents,
catalogEvents,
errorRespList,
updateUserEvents,
eventResponseList,
registerDeviceOrBrowserTokenEvents,
trackEvents, // track
catalogEvents, // identify
errorRespList, // track
updateUserEvents, // identify
eventResponseList, // track
registerDeviceOrBrowserTokenEvents, // identify
} = filteredEvents;

const updateUserBatchedResponseList =
Expand Down
1 change: 0 additions & 1 deletion src/v0/destinations/iterable/util.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const {
registerDeviceTokenEventPayloadBuilder,
registerBrowserTokenEventPayloadBuilder,
} = require('./util');

const { ConfigCategory } = require('./config');

const getTestMessage = () => {
Expand Down
33 changes: 33 additions & 0 deletions src/v1/destinations/iterable/networkHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { prepareProxyRequest, proxyRequest } from '../../../adapters/network';
import { processAxiosResponse } from '../../../adapters/utils/networkUtils';
import { BULK_ENDPOINTS } from '../../../v0/destinations/iterable/config';
import { GenericStrategy } from './strategies/generic';
import { TrackIdentifyStrategy } from './strategies/track-identify';
import { GenericProxyHandlerInput } from './types';

shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
const strategyRegistry: { [key: string]: any } = {
[TrackIdentifyStrategy.name]: new TrackIdentifyStrategy(),
[GenericStrategy.name]: new GenericStrategy(),
};

const getResponseStrategy = (endpoint: string) => {
if (BULK_ENDPOINTS.some((path) => endpoint.includes(path))) {
return strategyRegistry[TrackIdentifyStrategy.name];
}
return strategyRegistry[GenericStrategy.name];
};

const responseHandler = (responseParams: GenericProxyHandlerInput) => {
const { destinationRequest } = responseParams;
const strategy = getResponseStrategy(destinationRequest.endpoint);
return strategy.handleResponse(responseParams);
};

function networkHandler(this: any) {
this.prepareProxy = prepareProxyRequest;
this.proxy = proxyRequest;
this.processAxiosResponse = processAxiosResponse;
this.responseHandler = responseHandler;
}

export { networkHandler };
22 changes: 22 additions & 0 deletions src/v1/destinations/iterable/strategies/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { isHttpStatusSuccess } from '../../../../v0/util';
import { GenericProxyHandlerInput } from '../types';

// Base strategy is the base class for all strategies in Iterable destination
abstract class BaseStrategy {
handleResponse(responseParams: GenericProxyHandlerInput): void {
const { destinationResponse } = responseParams;
const { status } = destinationResponse;

if (!isHttpStatusSuccess(status)) {
return this.handleError(responseParams);
}

return this.handleSuccess(responseParams);
}

abstract handleError(responseParams: GenericProxyHandlerInput): void;

abstract handleSuccess(responseParams: any): void;
}

export { BaseStrategy };
57 changes: 57 additions & 0 deletions src/v1/destinations/iterable/strategies/generic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { BaseStrategy } from './base';
import {
GenericProxyHandlerInput,
IterableBulkApiResponse,
IterableSuccessResponse,
} from '../types';
import { ProxyMetdata } from '../../../../types';
import { TransformerProxyError } from '../../../../v0/util/errorTypes';
import { TAG_NAMES } from '../../../../v0/util/tags';
import { getDynamicErrorType } from '../../../../adapters/utils/networkUtils';

class GenericStrategy extends BaseStrategy {
handleSuccess(responseParams: {
destinationResponse: IterableBulkApiResponse;
rudderJobMetadata: ProxyMetdata[];
}): IterableSuccessResponse {
const { destinationResponse, rudderJobMetadata } = responseParams;
const { status } = destinationResponse;

const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
statusCode: status,
metadata,
error: 'success',
}));

return {
status,
message: '[ITERABLE Response Handler] - Request Processed Successfully',
destinationResponse,
response: responseWithIndividualEvents,
};
}

handleError(responseParams: GenericProxyHandlerInput): void {
const { destinationResponse, rudderJobMetadata } = responseParams;
const { response, status } = destinationResponse;
const responseMessage = response.params || response.msg || response.message;
const errorMessage = JSON.stringify(responseMessage) || 'unknown error format';

const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
statusCode: status,
metadata,
error: errorMessage,
}));

throw new TransformerProxyError(
`ITERABLE: Error transformer proxy during ITERABLE response transformation. ${errorMessage}`,
status,
{ [TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status) },
destinationResponse,
'',
responseWithIndividualEvents,
);
}
}

export { GenericStrategy };
69 changes: 69 additions & 0 deletions src/v1/destinations/iterable/strategies/track-identify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { BaseStrategy } from './base';
import { GenericProxyHandlerInput, IterableBulkProxyInput } from '../types';
import { checkIfEventIsAbortableAndExtractErrorMessage } from '../utils';
import { DeliveryJobState, DeliveryV1Response } from '../../../../types';
import { TransformerProxyError } from '../../../../v0/util/errorTypes';
import { getDynamicErrorType } from '../../../../adapters/utils/networkUtils';
import { TAG_NAMES } from '../../../../v0/util/tags';

class TrackIdentifyStrategy extends BaseStrategy {
handleSuccess(responseParams: IterableBulkProxyInput): DeliveryV1Response {
const { destinationResponse, rudderJobMetadata, destinationRequest } = responseParams;
const { status } = destinationResponse;
const responseWithIndividualEvents: DeliveryJobState[] = [];

const { events, users } = destinationRequest?.body.JSON || {};
const finalData = events || users;

if (finalData) {
finalData.forEach((event, idx) => {
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
const parsedOutput = {
statusCode: 200,
metadata: rudderJobMetadata[idx],
error: 'success',
};

const { isAbortable, errorMsg } = checkIfEventIsAbortableAndExtractErrorMessage(
event,
destinationResponse,
);
if (isAbortable) {
parsedOutput.statusCode = 400;
parsedOutput.error = errorMsg;
}
responseWithIndividualEvents.push(parsedOutput);
});
}

return {
status,
message: '[ITERABLE Response Handler] - Request Processed Successfully',
destinationResponse,
response: responseWithIndividualEvents,
};
}

handleError(responseParams: GenericProxyHandlerInput): void {
const { destinationResponse, rudderJobMetadata } = responseParams;
const { response, status } = destinationResponse;
const responseMessage = response.params || response.msg || response.message;
const errorMessage = JSON.stringify(responseMessage) || 'unknown error format';

const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
statusCode: status,
metadata,
error: errorMessage,
}));

throw new TransformerProxyError(
`ITERABLE: Error transformer proxy during ITERABLE response transformation. ${errorMessage}`,
status,
{ [TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status) },
destinationResponse,
'',
responseWithIndividualEvents,
);
}
}

export { TrackIdentifyStrategy };
69 changes: 69 additions & 0 deletions src/v1/destinations/iterable/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { ProxyMetdata, ProxyV1Request } from '../../../types';

type FailedUpdates = {
invalidEmails?: string[];
invalidUserIds?: string[];
notFoundEmails?: string[];
notFoundUserIds?: string[];
invalidDataEmails?: string[];
invalidDataUserIds?: string[];
conflictEmails?: string[];
conflictUserIds?: string[];
forgottenEmails?: string[];
forgottenUserIds?: string[];
};

export type GeneralApiResponse = {
msg?: string;
code?: string;
params?: Record<string, unknown>;
successCount?: number;
failCount?: number;
invalidEmails?: string[];
invalidUserIds?: string[];
filteredOutFields?: string[];
createdFields?: string[];
disallowedEventNames?: string[];
failedUpdates?: FailedUpdates;
};

export type IterableBulkApiResponse = {
status: number;
response: GeneralApiResponse;
};

type IterableBulkRequestBody = {
events?: any[];
users?: any[];
};

export type IterableBulkProxyInput = {
destinationResponse: IterableBulkApiResponse;
rudderJobMetadata: ProxyMetdata[];
destType: string;
destinationRequest?: {
body: {
JSON: IterableBulkRequestBody;
};
};
};

export type GenericProxyHandlerInput = {
destinationResponse: any;
rudderJobMetadata: ProxyMetdata[];
destType: string;
destinationRequest: ProxyV1Request;
};

export type Response = {
statusCode: number;
metadata: any;
error: string;
};

export type IterableSuccessResponse = {
status: number;
message: string;
destinationResponse: IterableBulkApiResponse;
response: Response[];
};
Loading
Loading