Skip to content

Commit 66afe74

Browse files
author
Max Gruenfelder
committed
tx handling and runner
1 parent 9aecf6d commit 66afe74

13 files changed

+11482
-61
lines changed

db/EventQueue.cds

+1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ entity EventQueue: cuid {
2121
payload: LargeString;
2222
attempts: Integer default 0 not null;
2323
lastAttemptTimestamp: Timestamp;
24+
createdAt: Timestamp @cds.on.insert : $now;
2425
}

jest.config.js

+1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
module.exports = {
44
// A list of paths to modules that run some code to configure or set up the testing framework before each test
55
setupFilesAfterEnv: ["./jest.setupAfterEnv.js"],
6+
testTimeout: 600000,
67
};

package-lock.json

+11,321
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"description": "event queue for cds",
55
"main": "src/index.js",
66
"files": [
7-
"src"
7+
"src",
8+
"db"
89
],
910
"scripts": {
1011
"test": "jest",

src/EventQueueProcessorBase.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ class EventQueueProcessorBase {
596596
).toISOString(),
597597
") )"
598598
)
599-
.orderBy("ID")
599+
.orderBy("createdAt", "ID")
600600
);
601601
this.logger.debug("no entries available for processing", {
602602
additionalMessageProperties: {

src/index.js

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
// TODO: how to deal with tx chaining - maybe inherit _ from context
44
// TODO: how to deal with fatal logs
5-
// TODO: think about switching to cds.env from own config class
6-
// TODO: add createdAt to persistence to proper order/sort event queue entries
7-
// FIXME: Performance measurement executed\n{ name: undefined, milliseconds: 2776 }
8-
// TODO: control concurrency for runner files
95
// TODO: modifyQueueEntry|checkEventAndGeneratePayload should not produce an unexpected error
6+
7+
// FEATURES
8+
// TODO: think about switching to cds.env from own config class
109
// TODO: multiInstance without redis??
1110
// TODO: runAutomatically: true flag in config
11+
// TODO: control concurrency for runner files
12+
// TODO: add createdAt to persistence to proper order/sort event queue entries
1213

1314
// TODO: for test
1415
// --> deeper look into the functions e.g. getQueueEntriesAndSetToInProgress

src/runner.js

+30-21
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,22 @@
11
"use strict";
22

3+
const uuid = require("uuid");
4+
35
const eventQueueConfig = require("./config");
46
const cdsHelper = require("./shared/cdsHelper");
57
const { eventQueueRunner } = require("./processEventQueue");
68
const { Logger } = require("./shared/logger");
7-
const uuid = require("uuid");
89
const distributedLock = require("./shared/distributedLock");
910

1011
const COMPONENT_NAME = "eventQueue/runner";
1112
const EVENT_QUEUE_RUN_ID = "EVENT_QUEUE_RUN_ID";
1213
const OFFSET_FIRST_RUN = 10 * 1000;
1314

14-
const singleInstanceAndTenant = () =>
15-
_scheduleFunction(_singleInstanceAndTenant);
16-
17-
const _singleInstanceAndTenant = async () => {
18-
await _executeRunForTenant();
19-
};
15+
const singleInstanceAndTenant = () => _scheduleFunction(_executeRunForTenant);
2016

2117
const singleInstanceAndMultiTenancy = () =>
2218
_scheduleFunction(_singleInstanceAndMultiTenancy);
2319

24-
const _singleInstanceAndMultiTenancy = async () => {
25-
try {
26-
const tenantIds = await cdsHelper.getAllTenantIds();
27-
for (const tenantId of tenantIds) {
28-
await _executeRunForTenant(tenantId);
29-
}
30-
} catch (err) {
31-
Logger(cds.context, COMPONENT_NAME).error(
32-
"Couldn't fetch tenant ids for event queue processing! Next try after defined interval.",
33-
{ error: err }
34-
);
35-
}
36-
};
37-
3820
const multiInstanceAndTenancy = () =>
3921
_scheduleFunction(_multiInstanceAndTenancy);
4022

@@ -77,6 +59,32 @@ const _multiInstanceAndTenancy = async () => {
7759
}
7860
};
7961

62+
const _singleInstanceAndMultiTenancy = async () => {
63+
try {
64+
const configInstance = eventQueueConfig.getConfigInstance();
65+
const tenantIds = await cdsHelper.getAllTenantIds();
66+
for (const tenantId of tenantIds) {
67+
const tenantContext = new cds.EventContext({ tenant: tenantId });
68+
const couldAcquireLock = await distributedLock.acquireLock(
69+
tenantContext,
70+
EVENT_QUEUE_RUN_ID,
71+
{
72+
expiryTime: configInstance.betweenRuns * 0.9,
73+
}
74+
);
75+
if (!couldAcquireLock) {
76+
continue;
77+
}
78+
await _executeRunForTenant(tenantId);
79+
}
80+
} catch (err) {
81+
Logger(cds.context, COMPONENT_NAME).error(
82+
"Couldn't fetch tenant ids for event queue processing! Next try after defined interval.",
83+
{ error: err }
84+
);
85+
}
86+
};
87+
8088
const _multiInstanceAndSingleTenancy = async () => {
8189
await _executeRunForTenant();
8290
};
@@ -139,5 +147,6 @@ module.exports = {
139147
multiInstanceAndSingleTenancy,
140148
_: {
141149
_multiInstanceAndTenancy,
150+
_singleInstanceAndMultiTenancy,
142151
},
143152
};

src/shared/PerformanceTracer.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class PerformanceTracer {
6565

6666
this.__logger.info("Performance measurement executed", {
6767
additionalMessageProperties: {
68-
name: this._fnName,
68+
name: this.__name,
6969
milliseconds: executionTime,
7070
},
7171
customFields,

src/shared/cdsHelper.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async function executeInNewTransaction(
4141
http: context.http,
4242
},
4343
async (tx) => {
44-
tx.context._ = tx.context._ ?? {};
44+
tx.context._ = context._ ?? {};
4545
return fn(tx, ...parameters);
4646
}
4747
);

src/shared/distributedLock.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
const redisWrapper = require("@sap/btp-feature-toggles/src/redisWrapper");
44

55
const config = require("../config");
6-
const { executeInNewTransaction } = require("./cdsHelper");
6+
const cdsHelper = require("./cdsHelper");
77

88
const acquireLock = async (
99
context,
@@ -81,7 +81,7 @@ const _checkLockExistsRedis = async (context, fullKey) => {
8181

8282
const _checkLockExistsDb = async (context, fullKey) => {
8383
let result;
84-
await executeInNewTransaction(
84+
await cdsHelper.executeInNewTransaction(
8585
context,
8686
"distributedLock-checkExists",
8787
async (tx) => {
@@ -99,7 +99,7 @@ const _releaseLockRedis = async (context, fullKey) => {
9999
};
100100

101101
const _releaseLockDb = async (context, fullKey) => {
102-
await executeInNewTransaction(
102+
await cdsHelper.executeInNewTransaction(
103103
context,
104104
"distributedLock-release",
105105
async (tx) => {
@@ -110,7 +110,7 @@ const _releaseLockDb = async (context, fullKey) => {
110110

111111
const _acquireLockDB = async (context, fullKey, expiryTime, value = true) => {
112112
let result;
113-
await executeInNewTransaction(
113+
await cdsHelper.executeInNewTransaction(
114114
context,
115115
"distributedLock-acquire",
116116
async (tx) => {

test/__snapshots__/redisRunner.test.js.snap

-27
This file was deleted.
+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`redisRunner db 1`] = `
4+
{
5+
"9f3ed8f0-8aaf-439e-a96a-04cd5b680c59": {
6+
"numberOfChecks": 2,
7+
"values": [
8+
true,
9+
false,
10+
],
11+
},
12+
"cd805323-879c-4bf7-b19c-8ffbbee22e1f": {
13+
"numberOfChecks": 2,
14+
"values": [
15+
true,
16+
false,
17+
],
18+
},
19+
"e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792": {
20+
"numberOfChecks": 2,
21+
"values": [
22+
true,
23+
false,
24+
],
25+
},
26+
}
27+
`;
28+
29+
exports[`redisRunner redis 1`] = `
30+
{
31+
"9f3ed8f0-8aaf-439e-a96a-04cd5b680c59": {
32+
"numberOfChecks": 2,
33+
"values": [
34+
true,
35+
false,
36+
],
37+
},
38+
"cd805323-879c-4bf7-b19c-8ffbbee22e1f": {
39+
"numberOfChecks": 2,
40+
"values": [
41+
true,
42+
false,
43+
],
44+
},
45+
"e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792": {
46+
"numberOfChecks": 2,
47+
"values": [
48+
true,
49+
false,
50+
],
51+
},
52+
}
53+
`;

test/redisRunner.test.js test/runner.test.js

+62-1
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ describe("redisRunner", () => {
5555

5656
afterEach(async () => {
5757
await tx.rollback();
58+
jest.clearAllMocks();
5859
});
5960

60-
it("redis test", async () => {
61+
it("redis", async () => {
6162
const setValueWithExpireSpy = jest.spyOn(
6263
distributedLock,
6364
"setValueWithExpire"
@@ -68,6 +69,7 @@ describe("redisRunner", () => {
6869
"checkLockExistsAndReturnValue"
6970
);
7071
getAllTenantIdsSpy
72+
.mockResolvedValueOnce(tenantIds)
7173
.mockResolvedValueOnce(tenantIds)
7274
.mockResolvedValueOnce(tenantIds);
7375
const p1 = runner._._multiInstanceAndTenancy();
@@ -95,5 +97,64 @@ describe("redisRunner", () => {
9597
tenantChecks[tenantId].values.push(result);
9698
}
9799
expect(tenantChecks).toMatchSnapshot();
100+
101+
// another run within 5 minutes should do nothing
102+
await runner._._multiInstanceAndTenancy();
103+
expect(acquireLockSpy).toHaveBeenCalledTimes(9);
104+
expect(eventQueueRunnerSpy).toHaveBeenCalledTimes(3);
105+
});
106+
107+
it("db", async () => {
108+
jest
109+
.spyOn(cdsHelper, "executeInNewTransaction")
110+
.mockImplementation(async (context = {}, transactionTag, fn) => {
111+
await fn(tx);
112+
});
113+
configInstance.isOnCF = false;
114+
const acquireLockSpy = jest.spyOn(distributedLock, "acquireLock");
115+
getAllTenantIdsSpy
116+
.mockResolvedValueOnce(tenantIds)
117+
.mockResolvedValueOnce(tenantIds)
118+
.mockResolvedValueOnce(tenantIds)
119+
.mockResolvedValueOnce(tenantIds);
120+
const p1 = runner._._singleInstanceAndMultiTenancy();
121+
const p2 = runner._._singleInstanceAndMultiTenancy();
122+
123+
await Promise.allSettled([p1, p2]);
124+
125+
expect(acquireLockSpy).toHaveBeenCalledTimes(6);
126+
expect(eventQueueRunnerSpy).toHaveBeenCalledTimes(3);
127+
128+
const acquireLockMock = acquireLockSpy.mock;
129+
const runId = acquireLockMock.calls[0][1];
130+
131+
const tenantChecks = tenantIds.reduce((result, tenantId) => {
132+
result[tenantId] = { numberOfChecks: 0, values: [] };
133+
return result;
134+
}, {});
135+
for (let i = 0; i < 6; i++) {
136+
const tenantId = acquireLockMock.calls[i][0].tenant;
137+
expect(runId).toEqual(acquireLockMock.calls[i][1]);
138+
const result = await acquireLockMock.results[i].value;
139+
tenantChecks[tenantId].numberOfChecks++;
140+
tenantChecks[tenantId].values.push(result);
141+
}
142+
expect(tenantChecks).toMatchSnapshot();
143+
144+
// another run within 5 minutes should do nothing
145+
await runner._._singleInstanceAndMultiTenancy();
146+
expect(eventQueueRunnerSpy).toHaveBeenCalledTimes(3);
147+
expect(acquireLockSpy).toHaveBeenCalledTimes(9);
148+
149+
// 5 mins later the tenants should be processed again
150+
await tx.run(
151+
UPDATE.entity("sap.core.EventLock").set({
152+
createdAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
153+
})
154+
);
155+
156+
await runner._._singleInstanceAndMultiTenancy();
157+
expect(acquireLockSpy).toHaveBeenCalledTimes(12);
158+
expect(eventQueueRunnerSpy).toHaveBeenCalledTimes(6);
98159
});
99160
});

0 commit comments

Comments
 (0)