Skip to content
This repository was archived by the owner on Jun 13, 2023. It is now read-only.

Commit ec434b6

Browse files
author
Itay Katz
authored
feat(batch processing): traces queue + batch sending + byte size limit (#373)
* feat(batch processing): trace queue + events emiter + batch * feat(batch processing): batch config + example + fixed sendTrace * feat(batch processing): batch byte size limit * feat(batch processing): init queue * feat(batch processing): log fixes + release on process exit * feat(batch processing): queue bytes size limit + config
1 parent c961ac9 commit ec434b6

File tree

10 files changed

+550
-11
lines changed

10 files changed

+550
-11
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,7 @@ issues/
55
src/resource_utils/sql_utils.js
66
**/.DS_Store
77
.vscode
8-
.env
8+
.env
9+
.pytest_cache
10+
*.cpuprofile
11+
tenna_releases

examples/batch_example.js

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
const epsagon = require('../src/index');
2+
const http = require('http');
3+
const { Console } = require('console');
4+
5+
6+
epsagon.init({
7+
token: process.env.EPSAGON_TOKEN,
8+
appName: 'batch-test',
9+
metadataOnly: false,
10+
sendBatch: true,
11+
batchSize: 5000,
12+
maxBatchSizeBytes: 5000000,
13+
maxTraceWait: 5000 // not in use
14+
});
15+
16+
epsagon.init({
17+
token: process.env.EPSAGON_TOKEN,
18+
appName: 'batch-test',
19+
metadataOnly: false,
20+
sendBatch: true,
21+
batchSize: 5000,
22+
});
23+
24+
function doRequest(options) {
25+
return new Promise ((resolve, reject) => {
26+
let req = http.request(options);
27+
28+
req.on('response', res => {
29+
resolve(res);
30+
});
31+
32+
req.on('error', err => {
33+
resolve(err);
34+
});
35+
});
36+
}
37+
38+
39+
async function testAsyncFunction() {
40+
const options = {
41+
host: 'localhost',
42+
method: 'GET',
43+
};
44+
doRequest(options)
45+
console.log("logging something")
46+
}
47+
48+
49+
const wrappedAsyncTestFunction = epsagon.nodeWrapper(testAsyncFunction);
50+
51+
async function main (){
52+
await wrappedAsyncTestFunction()
53+
await wrappedAsyncTestFunction()
54+
55+
await wrappedAsyncTestFunction()
56+
57+
await wrappedAsyncTestFunction()
58+
59+
await wrappedAsyncTestFunction()
60+
await wrappedAsyncTestFunction()
61+
await wrappedAsyncTestFunction()
62+
await wrappedAsyncTestFunction()
63+
await wrappedAsyncTestFunction()
64+
await wrappedAsyncTestFunction()
65+
await wrappedAsyncTestFunction()
66+
await wrappedAsyncTestFunction()
67+
68+
await Promise.all([
69+
wrappedAsyncTestFunction(),
70+
wrappedAsyncTestFunction()]
71+
72+
)
73+
}
74+
75+
76+
77+
78+
main()

src/config.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ const config = {
4444
decodeHTTP: (process.env.EPSAGON_DECODE_HTTP || 'TRUE').toUpperCase() === 'TRUE',
4545
disableHttpResponseBodyCapture: (process.env.EPSAGON_DISABLE_HTTP_RESPONSE || '').toUpperCase() === 'TRUE',
4646
loggingTracingEnabled: (process.env.EPSAGON_LOGGING_TRACING_ENABLED || (!utils.isLambdaEnv).toString()).toUpperCase() === 'TRUE',
47+
sendBatch: (process.env.EPSAGON_SEND_BATCH || 'FALSE').toUpperCase() === 'TRUE',
48+
batchSize: (Number(process.env.EPSAGON_BATCH_SIZE) || consts.DEFAULT_BATCH_SIZE),
49+
maxTraceWait: (Number(process.env.EPSAGON_MAX_TRACE_WAIT) ||
50+
consts.MAX_TRACE_WAIT), // miliseconds
51+
maxBatchSizeBytes: consts.BATCH_SIZE_BYTES_HARD_LIMIT,
52+
maxQueueSizeBytes: consts.QUEUE_SIZE_BYTES_HARD_LIMIT,
53+
54+
4755
/**
4856
* get isEpsagonPatchDisabled
4957
* @return {boolean} True if DISABLE_EPSAGON or DISABLE_EPSAGON_PATCH are set to TRUE, false
@@ -177,6 +185,33 @@ module.exports.setConfig = function setConfig(configData) {
177185
config.sendTimeout = Number(configData.sendTimeout);
178186
}
179187

188+
if (typeof configData.sendBatch === 'boolean') {
189+
config.sendBatch = configData.sendBatch;
190+
}
191+
192+
if (Number(configData.batchSize)) {
193+
config.batchSize = Number(configData.batchSize);
194+
}
195+
if (Number(configData.maxTraceWait)) {
196+
config.maxTraceWait = Number(configData.maxTraceWait);
197+
}
198+
if (Number(configData.maxBatchSizeBytes)) {
199+
if (Number(configData.maxBatchSizeBytes) > consts.QUEUE_SIZE_BYTES_HARD_LIMIT) {
200+
utils.debugLog(`User configured maxBatchSizeBytes exceeded batch size hard limit of ${consts.BATCH_SIZE_BYTES_HARD_LIMIT} Bytes`);
201+
} else {
202+
config.maxBatchSizeBytes = Number(configData.maxBatchSizeBytes);
203+
}
204+
}
205+
206+
if (Number(configData.maxQueueSizeBytes)) {
207+
if (Number(configData.maxQueueSizeBytes) > consts.QUEUE_SIZE_BYTES_HARD_LIMIT) {
208+
utils.debugLog(`User configured maxQueueSizeBytes exceeded queue size hard limit of ${consts.QUEUE_SIZE_BYTES_HARD_LIMIT} Bytes`);
209+
} else {
210+
config.maxQueueSizeBytes = Number(configData.maxQueueSizeBytes);
211+
}
212+
}
213+
214+
180215
if (configData.labels) {
181216
config.labels = utils.flatten([...configData.labels].reduce((labels, label) => {
182217
const [key, value] = label;

src/consts.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ module.exports.MAX_TRACE_SIZE_BYTES = 64 * 1024;
3131

3232
module.exports.DEFAULT_SAMPLE_RATE = 1;
3333

34+
module.exports.DEFAULT_BATCH_SIZE = 5;
35+
36+
module.exports.MAX_TRACE_WAIT = 5000; // miliseconds
37+
38+
module.exports.BATCH_SIZE_BYTES_HARD_LIMIT = 10 * 64 * 1024; // 650KB
39+
40+
module.exports.QUEUE_SIZE_BYTES_HARD_LIMIT = 10 * 1024 * 1024; // 10MB
41+
42+
3443
// Key name to inject epsagon correlation ID
3544
module.exports.EPSAGON_HEADER = 'epsagon-trace-id';
3645

src/index.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ declare module 'epsagon' {
1616
decodeHTTP?: boolean
1717
disableHttpResponseBodyCapture?: boolean
1818
loggingTracingEnabled?: boolean
19+
sendBatch?: boolean
20+
maxTraceWait?: number
21+
batchSize?: number
22+
maxBatchSizeBytes?: number
1923
}): void
2024
export function label(key: string, value: string): void
2125
export function setError(error: Error): void

src/trace_queue.js

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/**
2+
* @fileoverview The traces queue, cunsume traces and sends in batches
3+
*/
4+
const EventEmitter = require('events');
5+
const axios = require('axios');
6+
const https = require('https');
7+
const http = require('http');
8+
const utils = require('../src/utils.js');
9+
const config = require('./config.js');
10+
11+
12+
/**
13+
* Session for the post requests to the collector
14+
*/
15+
const session = axios.create({
16+
headers: { Authorization: `Bearer ${config.getConfig().token}` },
17+
timeout: config.getConfig().sendTimeout,
18+
httpAgent: new http.Agent({ keepAlive: true }),
19+
httpsAgent: new https.Agent({ keepAlive: true }),
20+
});
21+
22+
/**
23+
* Post given batch to epsagon's infrastructure.
24+
* @param {*} batchObject The batch data to send.
25+
* @returns {Promise} a promise that is resolved after the batch is posted.
26+
*/
27+
async function postBatch(batchObject) {
28+
utils.debugLog(`[QUEUE] Posting batch to ${config.getConfig().traceCollectorURL}...`);
29+
const cancelTokenSource = axios.CancelToken.source();
30+
const handle = setTimeout(() => {
31+
cancelTokenSource.cancel('Timeout sending batch!');
32+
}, config.getConfig().sendTimeout);
33+
34+
return session.post(
35+
config.getConfig().traceCollectorURL,
36+
batchObject,
37+
{
38+
cancelToken: cancelTokenSource.token,
39+
}
40+
).then((res) => {
41+
clearTimeout(handle);
42+
utils.debugLog('[QUEUE] Batch posted!');
43+
return res;
44+
}).catch((err) => {
45+
clearTimeout(handle);
46+
if (err.config && err.config.data) {
47+
utils.debugLog(`[QUEUE] Error sending trace. Batch size: ${err.config.data.length}`);
48+
} else {
49+
utils.debugLog(`[QUEUE] Error sending trace. Error: ${err}`);
50+
}
51+
utils.debugLog(`[QUEUE] ${err ? err.stack : err}`);
52+
return err;
53+
});
54+
}
55+
56+
/**
57+
* The trace queue class
58+
* @param {function} batchSender function to send batch traces
59+
*/
60+
class TraceQueue extends EventEmitter.EventEmitter {
61+
/**
62+
* EventEmitter class
63+
*/
64+
constructor() {
65+
super();
66+
this.batchSender = postBatch;
67+
this.initQueue();
68+
}
69+
70+
/**
71+
* Update the queue config
72+
*/
73+
updateConfig() {
74+
this.maxBatchSizeBytes = config.getConfig().maxBatchSizeBytes;
75+
this.batchSize = config.getConfig().batchSize;
76+
this.maxQueueSizeBytes = config.getConfig().maxQueueSizeBytes;
77+
}
78+
79+
/**
80+
* Init queue event listners
81+
*/
82+
initQueue() {
83+
this.updateConfig();
84+
this.removeAllListeners();
85+
this.flush();
86+
this.on('traceQueued', function traceQueued() {
87+
if (this.byteSizeLimitReached()) {
88+
utils.debugLog(`[QUEUE] Queue Byte size reached ${this.currentByteSize} Bytes, releasing batch...`);
89+
this.emit('releaseRequest', Math.max(this.currentSize - 1, 1));
90+
} else if (this.batchSizeReached()) {
91+
utils.debugLog(`[QUEUE] Queue size reached ${this.currentSize}, releasing batch... `);
92+
this.emit('releaseRequest');
93+
}
94+
return this;
95+
});
96+
97+
this.on('releaseRequest', function releaseRequest(count = this.batchSize) {
98+
try {
99+
const batch = this.queue.splice(0, count);
100+
utils.debugLog('[QUEUE] Releasing batch...');
101+
this.subtractFromCurrentByteSize(batch);
102+
this.emit('batchReleased', batch);
103+
} catch (err) {
104+
utils.debugLog('[QUEUE] Failed releasing batch!');
105+
utils.debugLog(`[QUEUE] ${err}`);
106+
}
107+
return this;
108+
});
109+
110+
this.on('batchReleased', async function batchReleased(batch) {
111+
utils.debugLog('[QUEUE] Sending batch...');
112+
const batchJSON = batch.map(trace => trace.traceJSON);
113+
this.batchSender(batchJSON);
114+
});
115+
process.on('exit', function releaseAndClearQueue() {
116+
this.emit('releaseRequest');
117+
this.removeAllListeners();
118+
});
119+
}
120+
121+
/**
122+
* Push trace to queue, emit event, and check if queue max queue length reached,
123+
* if it does, send batch.
124+
* @param {object} traceJson Trace JSON
125+
* @returns {TraceQueue} This trace queue
126+
*/
127+
push(traceJson) {
128+
try {
129+
if (this.currentByteSize >= this.maxQueueSizeBytes) {
130+
utils.debugLog(`[QUEUE] Discardig trace, queue size reached max size of ${this.currentByteSize} Bytes`);
131+
return this;
132+
}
133+
const timestamp = Date.now();
134+
const json = traceJson;
135+
const string = JSON.stringify(json);
136+
const byteLength = string.length;
137+
// eslint-disable-next-line object-curly-newline
138+
const trace = { json, string, byteLength, timestamp };
139+
this.queue.push(trace);
140+
this.addToCurrentByteSize([trace]);
141+
utils.debugLog(`[QUEUE] Trace size ${byteLength} Bytes pushed to queue`);
142+
utils.debugLog(`[QUEUE] Queue size: ${this.currentSize} traces, total size of ${this.currentByteSize} Bytes`);
143+
this.emit('traceQueued', trace);
144+
} catch (err) {
145+
utils.debugLog(`[QUEUE] Failed pushing trace to queue: ${err}`);
146+
}
147+
return this;
148+
}
149+
150+
/**
151+
* add given trace byte size to total byte size
152+
* @param {Array} traces Trace object array
153+
*/
154+
addToCurrentByteSize(traces) {
155+
traces.forEach((trace) => {
156+
this.currentByteSize += trace.byteLength;
157+
});
158+
}
159+
160+
/**
161+
* subtract given trace byte size to total byte size
162+
* @param {Array} traces Trace object array
163+
*/
164+
subtractFromCurrentByteSize(traces) {
165+
traces.forEach((trace) => {
166+
this.currentByteSize -= trace.byteLength;
167+
this.currentByteSize = Math.max(this.currentByteSize, 0);
168+
});
169+
}
170+
171+
/**
172+
* Queue size getter
173+
* @returns {Number} Queue length
174+
*/
175+
get currentSize() {
176+
return this.queue.length;
177+
}
178+
179+
180+
/**
181+
* Checks if queue size reached batch size
182+
* @returns {Boolean} Indicator for if current queue size is larger than batch size definition
183+
*/
184+
batchSizeReached() {
185+
return this.currentSize >= this.batchSize;
186+
}
187+
188+
/**
189+
* Checks if queue byte size reached its limit
190+
* @returns {Boolean} Indicator for if current queue byte size is larger than byte size definition
191+
*/
192+
byteSizeLimitReached() {
193+
return this.currentByteSize >= this.maxBatchSizeBytes;
194+
}
195+
196+
/**
197+
* Flush queue
198+
*/
199+
flush() {
200+
this.queue = [];
201+
this.currentByteSize = 0;
202+
}
203+
}
204+
205+
const traceQueue = new TraceQueue();
206+
207+
module.exports.getInstance = () => traceQueue;

0 commit comments

Comments
 (0)