From f77d2ab4125a1a44bba95bebbee25bb4fac032da Mon Sep 17 00:00:00 2001 From: Anant Jain <62471433+anantjain45823@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:27:20 +0530 Subject: [PATCH] feat: onboard X(Twiiter) Audience (#3696) * feat: onboard X(Twiiter) Audience * chore: add test cases * chore: add router test cases * chore: small fixes * fix: common mock value for twitter ads and audience * chore: config fallback * fix: root level batching stringification --- src/features.json | 1 + src/v0/destinations/x_audience/config.js | 5 + src/v0/destinations/x_audience/transform.js | 76 ++++++ src/v0/destinations/x_audience/utils.js | 241 ++++++++++++++++++ .../destinations/x_audience/common.ts | 27 ++ .../destinations/x_audience/processor/data.ts | 236 +++++++++++++++++ .../destinations/x_audience/router/data.ts | 235 +++++++++++++++++ 7 files changed, 821 insertions(+) create mode 100644 src/v0/destinations/x_audience/config.js create mode 100644 src/v0/destinations/x_audience/transform.js create mode 100644 src/v0/destinations/x_audience/utils.js create mode 100644 test/integrations/destinations/x_audience/common.ts create mode 100644 test/integrations/destinations/x_audience/processor/data.ts create mode 100644 test/integrations/destinations/x_audience/router/data.ts diff --git a/src/features.json b/src/features.json index 77ab5cb2437..503df3eccfc 100644 --- a/src/features.json +++ b/src/features.json @@ -77,6 +77,7 @@ "CLICKSEND": true, "ZOHO": true, "CORDIAL": true, + "X_AUDIENCE": true, "BLOOMREACH_CATALOG": true, "SMARTLY": true, "WEBHOOK_V2": true diff --git a/src/v0/destinations/x_audience/config.js b/src/v0/destinations/x_audience/config.js new file mode 100644 index 00000000000..37b35c00b70 --- /dev/null +++ b/src/v0/destinations/x_audience/config.js @@ -0,0 +1,5 @@ +const BASE_URL = + 'https://ads-api.twitter.com/12/accounts/:account_id/custom_audiences/:audience_id/users'; +const MAX_PAYLOAD_SIZE_IN_BYTES = 4000000; +const MAX_OPERATIONS = 2500; +module.exports = { BASE_URL, MAX_PAYLOAD_SIZE_IN_BYTES, MAX_OPERATIONS }; diff --git a/src/v0/destinations/x_audience/transform.js b/src/v0/destinations/x_audience/transform.js new file mode 100644 index 00000000000..69f76507e56 --- /dev/null +++ b/src/v0/destinations/x_audience/transform.js @@ -0,0 +1,76 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +const { + removeUndefinedAndNullAndEmptyValues, + InstrumentationError, + ConfigurationError, +} = require('@rudderstack/integrations-lib'); +const { handleRtTfSingleEventError } = require('../../util'); +const { batchEvents, buildResponseWithJSON, getUserDetails } = require('./utils'); +/** + * This function returns audience object in the form of destination API + * @param {*} message + * @param {*} destination + * @param {*} metadata + */ +const processRecordEvent = (message, config) => { + const { fields, action, type } = message; + if (type !== 'record') { + throw new InstrumentationError(`[X AUDIENCE]: ${type} is not supported`); + } + const { accountId, audienceId } = { config }; + if (accountId) { + throw new ConfigurationError('[X AUDIENCE]: Account Id not found'); + } + if (audienceId) { + throw new ConfigurationError('[X AUDIENCE]: Audience Id not found'); + } + const { effective_at, expires_at } = fields; + const users = [getUserDetails(fields, config)]; + + return { + operation_type: action !== 'delete' ? 'Update' : 'Delete', + params: removeUndefinedAndNullAndEmptyValues({ + effective_at, + expires_at, + users, + }), + }; +}; +const process = (event) => { + const { message, destination, metadata } = event; + const Config = destination.Config || destination.config; + + const payload = [processRecordEvent(message, Config)]; + return buildResponseWithJSON(payload, Config, metadata); +}; +const processRouterDest = async (inputs, reqMetadata) => { + const responseList = []; // list containing single track event payload + const errorRespList = []; // list of error + const { destination } = inputs[0]; + const Config = destination.Config || destination.config; + inputs.map(async (event) => { + try { + if (event.message.statusCode) { + // already transformed event + responseList.push(event); + } else { + // if not transformed + responseList.push({ + message: processRecordEvent(event.message, Config), + metadata: event.metadata, + destination, + }); + } + } catch (error) { + const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata); + errorRespList.push(errRespEvent); + } + }); + let batchedResponseList = []; + if (responseList.length > 0) { + batchedResponseList = batchEvents(responseList, destination); + } + return [...batchedResponseList, ...errorRespList]; +}; + +module.exports = { process, processRouterDest, buildResponseWithJSON }; diff --git a/src/v0/destinations/x_audience/utils.js b/src/v0/destinations/x_audience/utils.js new file mode 100644 index 00000000000..62c825efac8 --- /dev/null +++ b/src/v0/destinations/x_audience/utils.js @@ -0,0 +1,241 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +const sha256 = require('sha256'); +const lodash = require('lodash'); +const jsonSize = require('json-size'); +const { OAuthSecretError } = require('@rudderstack/integrations-lib'); +const { + defaultRequestConfig, + getSuccessRespEvents, + removeUndefinedAndNullAndEmptyValues, +} = require('../../util'); +const { MAX_PAYLOAD_SIZE_IN_BYTES, BASE_URL, MAX_OPERATIONS } = require('./config'); +const { getAuthHeaderForRequest } = require('../twitter_ads/util'); +const { JSON_MIME_TYPE } = require('../../util/constant'); + +const getOAuthFields = ({ secret }) => { + if (!secret) { + throw new OAuthSecretError('[X Audience]:: OAuth - access keys not found'); + } + const oAuthObject = { + consumerKey: secret.consumerKey, + consumerSecret: secret.consumerSecret, + accessToken: secret.accessToken, + accessTokenSecret: secret.accessTokenSecret, + }; + return oAuthObject; +}; + +// Docs: https://developer.x.com/en/docs/x-ads-api/audiences/api-reference/custom-audience-user +const buildResponseWithJSON = (payload, config, metadata) => { + const response = defaultRequestConfig(); + const accountId = Object.values(JSON.parse(config.accountId))[0]; + response.endpoint = BASE_URL.replace(':account_id', accountId).replace( + ':audience_id', + config.audienceId, + ); + response.body.JSON_ARRAY = { batch: JSON.stringify(payload) }; + // required to be in accordance with oauth package + const request = { + url: response.endpoint, + method: response.method, + body: response.body.JSON, + }; + + const oAuthObject = getOAuthFields(metadata); + const authHeader = getAuthHeaderForRequest(request, oAuthObject).Authorization; + response.headers = { + Authorization: authHeader, + 'Content-Type': JSON_MIME_TYPE, + }; + return response; +}; + +/** + * This fucntion groups the response list based upoin 3 fields that are + * 1. operation_type + * 2. effective_at + * 3. expires_at + * @param {*} respList + * @returns object + */ +const groupResponsesUsingOperationAndTime = (respList) => { + const eventGroups = lodash.groupBy(respList, (item) => [ + item.message.operation_type, + item.message.params.effective_at, + item.message.params.expires_at, + ]); + return eventGroups; +}; +/** + * This function groups the operation object list based upon max sized or batch size allowed + * and returns the final batched request + * @param {*} operationObjectList + */ +const getFinalResponseList = (operationObjectList, destination) => { + const respList = []; + let currentMetadataList = []; + let currentBatchedRequest = []; + let metadataWithSecret; // used for authentication purposes + operationObjectList.forEach((operationObject) => { + const { payload, metadataList } = operationObject; + metadataWithSecret = { secret: metadataList[0].secret }; + if ( + currentBatchedRequest.length >= MAX_OPERATIONS || + jsonSize([...currentBatchedRequest, payload]) > MAX_PAYLOAD_SIZE_IN_BYTES + ) { + respList.push( + getSuccessRespEvents( + buildResponseWithJSON( + currentBatchedRequest, + destination.Config || destination.config, + metadataWithSecret, + ), + currentMetadataList, + destination, + true, + ), + ); + currentBatchedRequest = [payload]; + currentMetadataList = metadataList; + } else { + currentBatchedRequest.push(payload); + currentMetadataList.push(...metadataList); + } + }); + // pushing the remainder operation payloads as well + respList.push( + getSuccessRespEvents( + buildResponseWithJSON( + currentBatchedRequest, + destination.Config || destination.config, + metadataWithSecret, + ), + currentMetadataList, + destination, + true, + ), + ); + return respList; +}; + +/** + * This function takes in object containing key as the grouped parameter + * and values as list of all concerned payloads ( having the same key ). + * Then it makes the list of operationObject based upon + * operation and effective and expires time and json size of payload of one object + * @param {*} eventGroups + * @returns + */ +const getOperationObjectList = (eventGroups) => { + const operationList = []; + Object.keys(eventGroups).forEach((group) => { + const { operation_type, params } = eventGroups[group][0].message; + const { effective_at, expires_at } = params; + let currentUserList = []; + let currentMetadata = []; + eventGroups[group].forEach((event) => { + const newUsers = event.message.params.users; + // calculating size before appending the user and metadata list + if (jsonSize([...currentUserList, ...newUsers]) < MAX_PAYLOAD_SIZE_IN_BYTES) { + currentUserList.push(...event.message.params.users); + currentMetadata.push(event.metadata); + } else { + operationList.push({ + payload: { + operation_type, + params: removeUndefinedAndNullAndEmptyValues({ + effective_at, + expires_at, + users: currentUserList, + }), + }, + metadataList: currentMetadata, + }); + currentUserList = event.message.params.users; + currentMetadata = event.metadata; + } + }); + // all the remaining user and metadata list used in one list + operationList.push({ + payload: { + operation_type, + params: removeUndefinedAndNullAndEmptyValues({ + effective_at, + expires_at, + users: currentUserList, + }), + }, + metadataList: currentMetadata, + }); + }); + return operationList; +}; + +/** + * Input: [{ + message: { + operation_type: 'Delete', + params: { + effective_at, + expires_at, + users, + }, + }, + metadata, + destination, + }] + * @param {*} responseList + */ +const batchEvents = (responseList, destination) => { + const eventGroups = groupResponsesUsingOperationAndTime(responseList); + const operationObjectList = getOperationObjectList(eventGroups); + /* at this point we will a list of json payloads in the following format + operationObjectList = [ + { + payload:{ + operation_type: 'Delete', + params: { + effective_at, + expires_at, + users, + }, + metadata: + [ + {jobId:1}, {jobId:2} + ] + } + ] + */ + return getFinalResponseList(operationObjectList, destination); +}; + +const getUserDetails = (fields, config) => { + const { enableHash } = config; + const { email, phone_number, handle, device_id, twitter_id, partner_user_id } = fields; + const user = {}; + if (email) { + const emailList = email.split(','); + user.email = enableHash ? emailList.map(sha256) : emailList; + } + if (phone_number) { + const phone_numberList = phone_number.split(','); + user.phone_number = enableHash ? phone_numberList.map(sha256) : phone_numberList; + } + if (handle) { + const handleList = handle.split(','); + user.handle = enableHash ? handleList.map(sha256) : handleList; + } + if (device_id) { + const device_idList = device_id.split(','); + user.device_id = enableHash ? device_idList.map(sha256) : device_idList; + } + if (twitter_id) { + const twitter_idList = twitter_id.split(','); + user.twitter_id = enableHash ? twitter_idList.map(sha256) : twitter_idList; + } + if (partner_user_id) { + user.partner_user_id = partner_user_id.split(','); + } + return removeUndefinedAndNullAndEmptyValues(user); +}; +module.exports = { getOAuthFields, batchEvents, getUserDetails, buildResponseWithJSON }; diff --git a/test/integrations/destinations/x_audience/common.ts b/test/integrations/destinations/x_audience/common.ts new file mode 100644 index 00000000000..547e04ba848 --- /dev/null +++ b/test/integrations/destinations/x_audience/common.ts @@ -0,0 +1,27 @@ +export const authHeaderConstant = + 'OAuth oauth_consumer_key="qwe", oauth_nonce="V1kMh028kZLLhfeYozuL0B45Pcx6LvuW", oauth_signature="Di4cuoGv4PnCMMEeqfWTcqhvdwc%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1685603652", oauth_token="dummyAccessToken", oauth_version="1.0"'; +export const destination = { + Config: { + accountId: '{"Dummy Name":"1234"}', + audienceId: 'dummyId', + }, + ID: 'xpixel-1234', +}; + +export const generateMetadata = (jobId: number, userId?: string): any => { + return { + jobId, + attemptNum: 1, + userId: userId || 'default-userId', + sourceId: 'default-sourceId', + destinationId: 'default-destinationId', + workspaceId: 'default-workspaceId', + secret: { + consumerKey: 'validConsumerKey', + consumerSecret: 'validConsumerSecret', + accessToken: 'validAccessToken', + accessTokenSecret: 'validAccessTokenSecret', + }, + dontBatch: false, + }; +}; diff --git a/test/integrations/destinations/x_audience/processor/data.ts b/test/integrations/destinations/x_audience/processor/data.ts new file mode 100644 index 00000000000..deada7ab58c --- /dev/null +++ b/test/integrations/destinations/x_audience/processor/data.ts @@ -0,0 +1,236 @@ +import { destination, authHeaderConstant, generateMetadata } from '../common'; + +const fields = { + email: 'abc@xyz.com,a+1@xyz.com', + phone_number: '98765433232,21323', + handle: '@abc,@xyz', + twitter_id: 'tid1,tid2', + partner_user_id: 'puid1,puid2', +}; + +export const data = [ + { + name: 'x_audience', + description: 'All traits are present with hash enbaled for the audience with insert operation', + successCriteria: 'It should be passed with 200 Ok with all traits mapped after hashing', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + destination: { ...destination, Config: { ...destination.Config, enableHash: true } }, + message: { + type: 'record', + action: 'insert', + fields: { + ...fields, + device_id: 'did123,did456', + effective_at: '2024-05-15T00:00:00Z', + expires_at: '2025-05-15T00:00:00Z', + }, + context: {}, + recordId: '1', + }, + metadata: generateMetadata(1), + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + output: { + version: '1', + type: 'REST', + method: 'POST', + endpoint: + 'https://ads-api.twitter.com/12/accounts/1234/custom_audiences/dummyId/users', + headers: { + Authorization: authHeaderConstant, + 'Content-Type': 'application/json', + }, + params: {}, + body: { + JSON: {}, + JSON_ARRAY: { + batch: JSON.stringify([ + { + operation_type: 'Update', + params: { + effective_at: '2024-05-15T00:00:00Z', + expires_at: '2025-05-15T00:00:00Z', + users: [ + { + email: [ + 'ee278943de84e5d6243578ee1a1057bcce0e50daad9755f45dfa64b60b13bc5d', + '27a1b87036e9b0f43235026e7cb1493f1838b6fe41965ea04486d82e499f8401', + ], + phone_number: [ + '76742d946d9f6d0c844da5648e461896227782ccf1cd0db64573f39dbd92e05f', + '69c3bf36e0476c08a883fd6a995f67fc6d362c865549312fb5170737945fd073', + ], + handle: [ + '771c7b0ff2eff313009a81739307c3f7cde375acd7902b11061266a899a375f6', + '7bde3c2d41eab9043df37c9adf4f5f7591c632340d1cabc894e438e881fdd5f6', + ], + device_id: [ + '85a598fd6c8834f2d4da3d6886bb53d0032021e137307ec91d3f0da78e9bfa5b', + '936444046bea8b5d9de6bcae59b6f196ea4bb59945bc93e84bc9533dbf3e01c0', + ], + twitter_id: [ + 'a70d41727df61f21ce0ec81cca51c58f516b6151275d9293d7437bf15fa22e0d', + 'e39994d056999d79ff5a35b02cf2af946fc14bd7bd1b799b58619796584af02f', + ], + partner_user_id: ['puid1', 'puid2'], + }, + ], + }, + }, + ]), + }, + XML: {}, + FORM: {}, + }, + files: {}, + userId: '', + }, + metadata: generateMetadata(1), + statusCode: 200, + }, + ], + }, + }, + }, + { + name: 'x_audience', + description: 'All traits are present with hash disabled for the audience with delete operation', + successCriteria: 'It should be passed with 200 Ok with all traits mapped without hashing', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + destination, + message: { + type: 'record', + action: 'delete', + fields, + channel: 'sources', + context: {}, + recordId: '1', + }, + metadata: generateMetadata(1), + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + output: { + version: '1', + type: 'REST', + method: 'POST', + endpoint: + 'https://ads-api.twitter.com/12/accounts/1234/custom_audiences/dummyId/users', + headers: { + Authorization: authHeaderConstant, + 'Content-Type': 'application/json', + }, + params: {}, + body: { + JSON: {}, + JSON_ARRAY: { + batch: JSON.stringify([ + { + operation_type: 'Delete', + params: { + users: [ + { + email: ['abc@xyz.com', 'a+1@xyz.com'], + phone_number: ['98765433232', '21323'], + handle: ['@abc', '@xyz'], + twitter_id: ['tid1', 'tid2'], + partner_user_id: ['puid1', 'puid2'], + }, + ], + }, + }, + ]), + }, + XML: {}, + FORM: {}, + }, + files: {}, + userId: '', + }, + metadata: generateMetadata(1), + statusCode: 200, + }, + ], + }, + }, + }, + { + name: 'x_audience', + description: 'Type Validation case', + successCriteria: 'It should be passed with 200 Ok giving validation error', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + destination, + message: { + type: 'identify', + context: {}, + recordId: '1', + }, + metadata: generateMetadata(1), + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + metadata: generateMetadata(1), + statusCode: 400, + error: '[X AUDIENCE]: identify is not supported', + statTags: { + errorCategory: 'dataValidation', + destinationId: 'default-destinationId', + errorType: 'instrumentation', + destType: 'X_AUDIENCE', + module: 'destination', + implementation: 'native', + workspaceId: 'default-workspaceId', + feature: 'processor', + }, + }, + ], + }, + }, + }, +].map((tc) => ({ + ...tc, + mockFns: (_) => { + jest.mock('../../../../../src/v0/destinations/twitter_ads/util', () => ({ + getAuthHeaderForRequest: (_a, _b) => { + return { Authorization: authHeaderConstant }; + }, + })); + }, +})); diff --git a/test/integrations/destinations/x_audience/router/data.ts b/test/integrations/destinations/x_audience/router/data.ts new file mode 100644 index 00000000000..228a5ecd894 --- /dev/null +++ b/test/integrations/destinations/x_audience/router/data.ts @@ -0,0 +1,235 @@ +import { destination, authHeaderConstant, generateMetadata } from '../common'; + +export const data = [ + { + name: 'x_audience', + id: 'router-test-1', + description: + 'case with 2 record with no effective and expire at date with insert operations, 4 insert with 2 each having same effective and expire at and one delete and one failure event', + feature: 'router', + module: 'destination', + version: 'v0', + input: { + request: { + body: { + input: [ + { + destination, + message: { + type: 'record', + action: 'delete', + fields: { email: 'email1@abc.com' }, + channel: 'sources', + context: {}, + recordId: '1', + }, + metadata: generateMetadata(1), + }, + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { email: 'email2@abc.com' }, + channel: 'sources', + context: {}, + recordId: '2', + }, + metadata: generateMetadata(2), + }, + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { email: 'email3@abc.com' }, + channel: 'sources', + context: {}, + recordId: '3', + }, + metadata: generateMetadata(3), + }, + { + destination, + message: { + type: 'record', + action: 'update', + fields: { + email: 'email4@abc.com', + expires_at: 'some date', + effective_at: 'some effective date', + }, + channel: 'sources', + context: {}, + recordId: '4', + }, + metadata: generateMetadata(4), + }, + { + destination, + message: { + type: 'record', + action: 'update', + fields: { + email: 'email5@abc.com', + expires_at: 'some date', + effective_at: 'some effective date', + }, + channel: 'sources', + context: {}, + recordId: '5', + }, + metadata: generateMetadata(5), + }, + { + destination, + message: { + type: 'identify', + context: {}, + recordId: '1', + }, + metadata: generateMetadata(6), + }, + ], + destType: 'x_audience', + }, + method: 'POST', + }, + }, + output: { + response: { + status: 200, + body: { + output: [ + { + batched: true, + batchedRequest: { + version: '1', + type: 'REST', + method: 'POST', + endpoint: + 'https://ads-api.twitter.com/12/accounts/1234/custom_audiences/dummyId/users', + headers: { + Authorization: authHeaderConstant, + 'Content-Type': 'application/json', + }, + params: {}, + body: { + JSON: {}, + JSON_ARRAY: { + batch: JSON.stringify([ + { + operation_type: 'Delete', + params: { + users: [ + { + email: ['email1@abc.com'], + }, + ], + }, + }, + { + operation_type: 'Update', + params: { + users: [ + { + email: ['email2@abc.com'], + }, + { + email: ['email3@abc.com'], + }, + ], + }, + }, + ]), + }, + XML: {}, + FORM: {}, + }, + files: {}, + }, + destination, + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + statusCode: 200, + }, + { + batched: true, + batchedRequest: { + version: '1', + type: 'REST', + method: 'POST', + endpoint: + 'https://ads-api.twitter.com/12/accounts/1234/custom_audiences/dummyId/users', + headers: { + Authorization: authHeaderConstant, + 'Content-Type': 'application/json', + }, + params: {}, + body: { + JSON: {}, + JSON_ARRAY: { + batch: JSON.stringify([ + { + operation_type: 'Update', + params: { + effective_at: 'some effective date', + expires_at: 'some date', + users: [ + { + email: ['email4@abc.com'], + }, + { + email: ['email5@abc.com'], + }, + ], + }, + }, + ]), + }, + XML: {}, + FORM: {}, + }, + files: {}, + }, + destination, + metadata: [generateMetadata(4), generateMetadata(5)], + statusCode: 200, + }, + { + metadata: [generateMetadata(6)], + destination, + batched: false, + statusCode: 400, + error: '[X AUDIENCE]: identify is not supported', + statTags: { + errorCategory: 'dataValidation', + destinationId: 'default-destinationId', + errorType: 'instrumentation', + destType: 'X_AUDIENCE', + module: 'destination', + implementation: 'native', + workspaceId: 'default-workspaceId', + feature: 'router', + }, + }, + ], + }, + }, + }, + }, +].map((tc) => ({ + ...tc, + mockFns: (_) => { + jest.mock('../../../../../src/v0/destinations/twitter_ads/util', () => ({ + getAuthHeaderForRequest: (_a, _b) => { + return { Authorization: authHeaderConstant }; + }, + })); + jest.mock('../../../../../src/v0/destinations/x_audience/config', () => ({ + BASE_URL: + 'https://ads-api.twitter.com/12/accounts/:account_id/custom_audiences/:audience_id/users', + MAX_PAYLOAD_SIZE_IN_BYTES: 40000, + MAX_OPERATIONS: 2, + })); + }, +}));