diff --git a/CHANGELOG.md b/CHANGELOG.md index 68d0f500..356bbfcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## v1.9.2 - 2025-03-XX +### Added + +- propagate W3TraceContext from event creation to event-processing + ### Changed - Updated cron-parser to major version ^5.0.0 diff --git a/db/Event.cds b/db/Event.cds index 9cf2a771..cb3680ac 100644 --- a/db/Event.cds +++ b/db/Event.cds @@ -23,4 +23,6 @@ entity Event: cuid { lastAttemptTimestamp: Timestamp; createdAt: Timestamp @cds.on.insert : $now; startAfter: Timestamp; + context: LargeString; + error: String; } diff --git a/docs/configure-event/index.md b/docs/configure-event/index.md index 690f93fb..9a372ac1 100644 --- a/docs/configure-event/index.md +++ b/docs/configure-event/index.md @@ -137,6 +137,7 @@ instance is overloaded. | appNames | Specifies the application names on which the event should be processed. The application name is extracted from the environment variable `VCAP_APPLICATION`. If not defined, the event is processed on all connected applications. | null | | appInstances | Specifies the application instance numbers on which the event should be processed. The instance number is extracted from the environment variable `CF_INSTANCE_INDEX`. If not defined, the event is processed on all instances of the connected applications. | null | | keepAliveInterval | Specifies the interval (in seconds) at which keep-alive signals are sent during event processing to monitor system health. | 60 | +| inheritTraceContext | Determines whether the trace context is propagated during event publishing and processing. If set to `false`, trace context propagation is disabled for the event. | true | ## Configuration diff --git a/docs/implement-event/index.md b/docs/implement-event/index.md index c8800d8e..bf6b7d6c 100644 --- a/docs/implement-event/index.md +++ b/docs/implement-event/index.md @@ -1,7 +1,7 @@ --- layout: default title: Implement Event -nav_order: 8 +nav_order: 9 --- diff --git a/docs/load-balancing/index.md b/docs/load-balancing/index.md index 00ac2ef1..d4ac8a44 100644 --- a/docs/load-balancing/index.md +++ b/docs/load-balancing/index.md @@ -1,7 +1,7 @@ --- layout: default title: Load Balancing -nav_order: 6 +nav_order: 7 --- diff --git a/docs/publish-event/index.md b/docs/publish-event/index.md index 45ef5c19..3a803d77 100644 --- a/docs/publish-event/index.md +++ b/docs/publish-event/index.md @@ -1,7 +1,7 @@ --- layout: default title: Publishing of Events -nav_order: 7 +nav_order: 8 --- diff --git a/docs/setup/index.md b/docs/setup/index.md index 20737a76..5852a32b 100644 --- a/docs/setup/index.md +++ b/docs/setup/index.md @@ -14,7 +14,7 @@ nav_order: 3 - TOC -{: toc} + {: toc} @@ -80,7 +80,7 @@ The table includes the parameter name, a description of its purpose, and the def | cleanupLocksAndEventsForDev | Deletes all semantic locks and sets all events that are in progress to error during server start. This is used to clean up leftovers from server crashes or restarts during processing. | false | no | | redisOptions | The option is provided to customize settings when creating Redis clients. The object is spread at the root level for creating a client and within the `default` options for cluster clients. | {} | no | | insertEventsBeforeCommit | If enabled, this feature allows events (including those for outboxed services) to be inserted in bulk using the before commit handler. This is performed to improve performance by mass inserting events instead of single insert operations. This can be disabled by the parameter `skipInsertEventsBeforeCommit` in the function publishEvent. | false | yes | -| enableCAPTelemetry | If enabled in combination with `cap-js/telemetry`, OpenTelemetry traces about all event-queue activities are written using the `cap-js/telemetry` tracer. | false | yes | +| enableTelemetry | If enabled, OpenTelemetry traces for all event-queue activities are written. An OpenTelemetry exporter must be configured. | true | yes | | cronTimezone | Determines whether to apply the central `cronTimezone` setting for scheduling events. If set to `true` and the property `utc` is not enabled for the given event, it will use the defined `cronTimezone`. If set to `false`, the event will use UTC or the server's local time, based on the `utc` setting. | null | yes | | publishEventBlockList | Determines whether the publication of events to all app instances is enabled when Redis is active. If set to true, events can be published; if set to false, the publication is disabled. | true | yes | | crashOnRedisUnavailable | If enabled, the application will crash if Redis is unavailable during the connection check. | false | false | diff --git a/docs/status-handling/index.md b/docs/status-handling/index.md index 20020ae3..00a1d155 100644 --- a/docs/status-handling/index.md +++ b/docs/status-handling/index.md @@ -1,7 +1,7 @@ --- layout: default title: Event Status Handling -nav_order: 10 +nav_order: 11 --- diff --git a/docs/telemetry/img_1.png b/docs/telemetry/img_1.png new file mode 100644 index 00000000..4aef9dc0 Binary files /dev/null and b/docs/telemetry/img_1.png differ diff --git a/docs/telemetry/index.md b/docs/telemetry/index.md new file mode 100644 index 00000000..9f2d3090 --- /dev/null +++ b/docs/telemetry/index.md @@ -0,0 +1,97 @@ +--- +layout: default +title: Telemetry +nav_order: 6 +--- + + + +{: .no_toc} + +# Telemetry Insights + + + +- TOC +{: toc} + + + +The `@cap-js-community/event-queue` module comes with built-in support for OpenTelemetry, enabling seamless tracing and +observability of event processing. With this integration, you can track events from publication to execution, ensuring +full visibility into your system’s event flow. + +## Key Features + +- **Automatic Event Tracing** – Captures OpenTelemetry traces for event execution, helping you analyze processing times +- and dependencies. +- **Trace Context Propagation** – Preserves the OpenTelemetry tracing context when events are published and processed, +- maintaining a clear linkage across distributed systems. +- **End-to-End Monitoring** – Ensures that traceability is maintained from the moment an event is created until it is +- fully processed. + +## How It Works + +1. **Publishing an Event** + + - When an event is published, the current OpenTelemetry trace context is extracted. + - If no trace context is present, a new one is automatically created. + - The trace context is then attached to the event metadata. + +2. **Processing an Event** + - Upon processing, the previously stored trace context is retrieved. + - If no trace context exists, a new one is generated to ensure traceability. + - Periodic Events never have a existing trace context + - The trace context is injected back into the OpenTelemetry framework, maintaining continuity. + +## Benefits + +- **Comprehensive Observability** – Gain insight into event-driven workflows across different services. +- **Easier Debugging** – Quickly pinpoint performance issues and failure points. +- **Seamless Integration** – Compatible with OpenTelemetry-based monitoring tools with minimal setup. + +## Configuration + +By default, OpenTelemetry tracing is enabled if an OpenTelemetry exporter is set up or if Dynatrace OneAgent is +configured to export traces. However, the OpenTelemetry API must always be installed in the project. + +### Trace Context Propagation + +The propagation of the trace context can be controlled on an event level using the parameter `inheritTraceContext`. +By default, this is set to `true`, meaning the trace context will be inherited and propagated during event publishing +and processing. If set to `false`, the trace context propagation will be disabled for that particular event. + +### Disabling Telemetry + +Complete telemetry support can be disabled globally with the initialization parameter `enableTelemetry`. The default +value is `true`, meaning telemetry is enabled. If set to `false`, all telemetry features, including OpenTelemetry +tracing, will be disabled for the event-queue. + +For more advanced configurations, refer to the OpenTelemetry documentation on context propagation and trace exporters. +This integration also works smoothly with `@cap-js/telemetry`, meaning that if `@cap-js/telemetry` is configured, trace +exporting works out of the box with no additional setup. + +## Pitfalls with OpenTelemetry Tracing and Dynatrace OneAgent + +When using Dynatrace OneAgent for trace exporting without a separate OpenTelemetry exporter, there are some important +limitations to be aware of: + +- **Dependency on Dynatrace Trace Context** + Dynatrace OneAgent only exports traces to Dynatrace if they contain a valid Dynatrace trace context. This context is + automatically created when an event is published within the scope of an HTTP request or another operation captured by + Dynatrace OneAgent. + +- **Issues with Periodic and Standalone Events** + Events that are triggered periodically (e.g., scheduled jobs) or published without an existing Dynatrace trace context + may not be visible in Dynatrace. Since these events lack the necessary Dynatrace trace context, OneAgent does not + export them unless a separate OpenTelemetry exporter is configured. + +### Recommendation + +To ensure visibility of all traces—including periodic events or events outside of a Dynatrace-traced request—configure +a dedicated OpenTelemetry exporter alongside Dynatrace OneAgent. This guarantees that traces are properly exported even +when a Dynatrace trace context is missing. + +## Example Trace + +![Example Trace](img_1.png) diff --git a/docs/transaction-handling/index.md b/docs/transaction-handling/index.md index 8190ac71..cb1be215 100644 --- a/docs/transaction-handling/index.md +++ b/docs/transaction-handling/index.md @@ -1,7 +1,7 @@ --- layout: default title: Transactional Handling -nav_order: 9 +nav_order: 10 --- diff --git a/docs/unit-testing/index.md b/docs/unit-testing/index.md index b79dde8d..9ad7c006 100644 --- a/docs/unit-testing/index.md +++ b/docs/unit-testing/index.md @@ -1,7 +1,7 @@ --- layout: default title: Unit Testing -nav_order: 11 +nav_order: 12 --- diff --git a/package-lock.json b/package-lock.json index f1c8290f..68d8a2cf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@cap-js-community/event-queue", - "version": "1.9.1", + "version": "1.9.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@cap-js-community/event-queue", - "version": "1.9.1", + "version": "1.9.2", "license": "Apache-2.0", "dependencies": { "@sap/xssec": "^4.2.4", @@ -19,6 +19,7 @@ "@cap-js/cds-test": "^0.2.0", "@cap-js/hana": "^1.7.0", "@cap-js/sqlite": "^1.9.0", + "@opentelemetry/api": "^1.9.0", "@sap/cds": "^8.8.0", "@sap/cds-dk": "^8.8.0", "eslint": "^8.57.0", @@ -636,9 +637,9 @@ } }, "node_modules/@eslint-community/eslint-utils": { - "version": "4.5.0", - "resolved": "https://registry.npmjs.org/@eslint-community/eslint-utils/-/eslint-utils-4.5.0.tgz", - "integrity": "sha512-RoV8Xs9eNwiDvhv7M+xcL4PWyRyIXRY/FLp3buU4h1EYfdF7unWUy3dOjPqb3C7rMUewIcqwW850PgS8h1o1yg==", + "version": "4.5.1", + "resolved": "https://registry.npmjs.org/@eslint-community/eslint-utils/-/eslint-utils-4.5.1.tgz", + "integrity": "sha512-soEIOALTfTK6EjmKMMoLugwaP0rzkad90iIWd1hMO9ARkSAyjfMfkRRhLvD5qH7vvM0Cg72pieUfR6yh6XxC4w==", "dev": true, "license": "MIT", "dependencies": { @@ -1279,6 +1280,16 @@ "node": ">=10" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/@redis/bloom": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", @@ -1345,9 +1356,9 @@ } }, "node_modules/@sap/cds": { - "version": "8.8.1", - "resolved": "https://registry.npmjs.org/@sap/cds/-/cds-8.8.1.tgz", - "integrity": "sha512-9f0ELZMJ6jdXz1drej2zP39B9IXhs29AYo2iz/g+oDjKomwCkdL2I3Ec2aVfi13dG3fOq8ZcIHnEHnMQn2cm2A==", + "version": "8.8.2", + "resolved": "https://registry.npmjs.org/@sap/cds/-/cds-8.8.2.tgz", + "integrity": "sha512-KfFXSxf2pVLBKZr9Che8FYJ/VUS/UNngWfSHkGqVPPPjpSOHcW/x/h8x8Aiqk6Ze8WcZIl/2PgyKW4809xPDsQ==", "dev": true, "license": "SEE LICENSE IN LICENSE", "dependencies": { @@ -1396,9 +1407,9 @@ } }, "node_modules/@sap/cds-dk": { - "version": "8.8.0", - "resolved": "https://registry.npmjs.org/@sap/cds-dk/-/cds-dk-8.8.0.tgz", - "integrity": "sha512-61KqUNAbNaaXz7LHfhTBhIryaUDIj1iW9f/XHCbtc+AV8G2O5JcvBUEwf2ROYtvbXlKlZsIe4QewyUP6S78/8w==", + "version": "8.8.1", + "resolved": "https://registry.npmjs.org/@sap/cds-dk/-/cds-dk-8.8.1.tgz", + "integrity": "sha512-VBBnDA25j2Ej8q2oe/ElGCv6j9K8UWFsUrl/6iNTOw1MLORg/JSjnAn0bBqrRHezG1LIDrkJ6MoFXmMO2VPePQ==", "dev": true, "hasShrinkwrap": true, "license": "SEE LICENSE IN LICENSE", @@ -1430,7 +1441,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@cap-js/asyncapi": { - "version": "1.0.2", + "version": "1.0.3", "dev": true, "license": "SEE LICENSE IN LICENSE", "peerDependencies": { @@ -1438,7 +1449,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@cap-js/db-service": { - "version": "1.17.2", + "version": "1.18.0", "dev": true, "license": "SEE LICENSE", "optional": true, @@ -1450,7 +1461,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@cap-js/openapi": { - "version": "1.2.0", + "version": "1.2.1", "dev": true, "license": "SEE LICENSE IN LICENSE", "dependencies": { @@ -1461,12 +1472,12 @@ } }, "node_modules/@sap/cds-dk/node_modules/@cap-js/sqlite": { - "version": "1.8.0", + "version": "1.9.0", "dev": true, "license": "SEE LICENSE", "optional": true, "dependencies": { - "@cap-js/db-service": "^1.17.0", + "@cap-js/db-service": "^1.18.0", "better-sqlite3": "^11.0.0" }, "peerDependencies": { @@ -1474,7 +1485,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@eslint-community/eslint-utils": { - "version": "4.4.1", + "version": "4.5.1", "dev": true, "license": "MIT", "dependencies": { @@ -1522,6 +1533,14 @@ "node": "^18.18.0 || ^20.9.0 || >=21.1.0" } }, + "node_modules/@sap/cds-dk/node_modules/@eslint/config-helpers": { + "version": "0.1.0", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": "^18.18.0 || ^20.9.0 || >=21.1.0" + } + }, "node_modules/@sap/cds-dk/node_modules/@eslint/core": { "version": "0.12.0", "dev": true, @@ -1556,7 +1575,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@eslint/js": { - "version": "9.21.0", + "version": "9.22.0", "dev": true, "license": "MIT", "engines": { @@ -1640,7 +1659,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@sap/cds": { - "version": "8.8.0", + "version": "8.8.2", "dev": true, "license": "SEE LICENSE IN LICENSE", "dependencies": { @@ -1671,7 +1690,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@sap/cds-compiler": { - "version": "5.8.0", + "version": "5.8.2", "dev": true, "license": "SEE LICENSE IN LICENSE", "dependencies": { @@ -1710,7 +1729,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/@sap/cds-mtxs": { - "version": "2.6.0", + "version": "2.6.1", "dev": true, "license": "SEE LICENSE IN LICENSE", "dependencies": { @@ -1789,11 +1808,11 @@ } }, "node_modules/@sap/cds-dk/node_modules/@sap/xsenv": { - "version": "5.4.0", + "version": "5.5.0", "dev": true, "license": "SEE LICENSE IN LICENSE file", "dependencies": { - "debug": "4.3.7", + "debug": "4.4.0", "node-cache": "^5.1.2", "verror": "1.10.1" }, @@ -1824,7 +1843,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/acorn": { - "version": "8.14.0", + "version": "8.14.1", "dev": true, "license": "MIT", "bin": { @@ -1908,7 +1927,7 @@ "license": "MIT" }, "node_modules/@sap/cds-dk/node_modules/axios": { - "version": "1.8.1", + "version": "1.8.3", "dev": true, "license": "MIT", "dependencies": { @@ -2086,12 +2105,12 @@ } }, "node_modules/@sap/cds-dk/node_modules/call-bound": { - "version": "1.0.3", + "version": "1.0.4", "dev": true, "license": "MIT", "dependencies": { - "call-bind-apply-helpers": "^1.0.1", - "get-intrinsic": "^1.2.6" + "call-bind-apply-helpers": "^1.0.2", + "get-intrinsic": "^1.3.0" }, "engines": { "node": ">= 0.4" @@ -2220,7 +2239,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/debug": { - "version": "4.3.7", + "version": "4.4.0", "dev": true, "license": "MIT", "dependencies": { @@ -2402,16 +2421,17 @@ } }, "node_modules/@sap/cds-dk/node_modules/eslint": { - "version": "9.21.0", + "version": "9.22.0", "dev": true, "license": "MIT", "dependencies": { "@eslint-community/eslint-utils": "^4.2.0", "@eslint-community/regexpp": "^4.12.1", "@eslint/config-array": "^0.19.2", + "@eslint/config-helpers": "^0.1.0", "@eslint/core": "^0.12.0", "@eslint/eslintrc": "^3.3.0", - "@eslint/js": "9.21.0", + "@eslint/js": "9.22.0", "@eslint/plugin-kit": "^0.2.7", "@humanfs/node": "^0.16.6", "@humanwhocodes/module-importer": "^1.0.1", @@ -2423,7 +2443,7 @@ "cross-spawn": "^7.0.6", "debug": "^4.3.2", "escape-string-regexp": "^4.0.0", - "eslint-scope": "^8.2.0", + "eslint-scope": "^8.3.0", "eslint-visitor-keys": "^4.2.0", "espree": "^10.3.0", "esquery": "^1.5.0", @@ -2460,7 +2480,7 @@ } }, "node_modules/@sap/cds-dk/node_modules/eslint-scope": { - "version": "8.2.0", + "version": "8.3.0", "dev": true, "license": "BSD-2-Clause", "dependencies": { @@ -4896,9 +4916,9 @@ "license": "MIT" }, "node_modules/better-sqlite3": { - "version": "11.8.1", - "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.8.1.tgz", - "integrity": "sha512-9BxNaBkblMjhJW8sMRZxnxVTRgbRmssZW0Oxc1MPBTfiR+WW21e2Mk4qu8CzrcZb1LwPCnFsfDEzq+SNcBU8eg==", + "version": "11.9.1", + "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.9.1.tgz", + "integrity": "sha512-Ba0KR+Fzxh2jDRhdg6TSH0SJGzb8C0aBY4hR8w8madIdIzzC6Y1+kx5qR6eS1Z+Gy20h6ZU28aeyg0z1VIrShQ==", "dev": true, "hasInstallScript": true, "license": "MIT", @@ -5199,9 +5219,9 @@ } }, "node_modules/caniuse-lite": { - "version": "1.0.30001703", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001703.tgz", - "integrity": "sha512-kRlAGTRWgPsOj7oARC9m1okJEXdL/8fekFVcxA8Hl7GH4r/sN4OJn/i6Flde373T50KS7Y37oFbMwlE8+F42kQ==", + "version": "1.0.30001706", + "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001706.tgz", + "integrity": "sha512-3ZczoTApMAZwPKYWmwVbQMFpXBDds3/0VciVoUwPUbldlYyVLmRVuRs/PcUZtHpbLRpzzDvrvnFuREsGt6lUug==", "dev": true, "funding": [ { @@ -5522,9 +5542,9 @@ } }, "node_modules/cron-parser": { - "version": "5.0.4", - "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.0.4.tgz", - "integrity": "sha512-ud6L7uGE4a7QxKndq106+99iKMlhG1/gSVlA4SH7qy3xO/R/EUoXFTJFMBOogdk00mZiXPONRI4wFKAcymKp6w==", + "version": "5.0.5", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.0.5.tgz", + "integrity": "sha512-3vEcsi2qCEG/bHnOjHJkOg8ms3YBlOHSz4XPq45P9wu9XxZWoLIpXiqCCWg8yntrGZkQZ1j6DRj1fFYpB3dHIA==", "license": "MIT", "dependencies": { "luxon": "^3.5.0" @@ -5741,9 +5761,9 @@ "license": "MIT" }, "node_modules/electron-to-chromium": { - "version": "1.5.114", - "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.114.tgz", - "integrity": "sha512-DFptFef3iktoKlFQK/afbo274/XNWD00Am0xa7M8FZUepHlHT8PEuiNBoRfFHbH1okqN58AlhbJ4QTkcnXorjA==", + "version": "1.5.120", + "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.120.tgz", + "integrity": "sha512-oTUp3gfX1gZI+xfD2djr2rzQdHCwHzPQrrK0CD7WpTdF0nPdQ/INcRVjWgLdCT4a9W3jFObR9DAfsuyFQnI8CQ==", "dev": true, "license": "ISC" }, diff --git a/package.json b/package.json index 52611f94..a5ee8d59 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@cap-js-community/event-queue", - "version": "1.9.1", + "version": "1.9.2", "description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", "main": "src/index.js", "types": "src/index.d.ts", @@ -63,7 +63,8 @@ "hdb": "^0.19.10", "jest": "^29.7.0", "prettier": "^2.8.8", - "sqlite3": "^5.1.7" + "sqlite3": "^5.1.7", + "@opentelemetry/api": "^1.9.0" }, "homepage": "https://cap-js-community.github.io/event-queue/", "repository": { diff --git a/src/EventQueueProcessorBase.js b/src/EventQueueProcessorBase.js index 426440df..b460415e 100644 --- a/src/EventQueueProcessorBase.js +++ b/src/EventQueueProcessorBase.js @@ -11,7 +11,7 @@ const { arrayToFlatMap } = require("./shared/common"); const eventScheduler = require("./shared/eventScheduler"); const eventConfig = require("./config"); const PerformanceTracer = require("./shared/PerformanceTracer"); -const trace = require("./shared/openTelemetry"); +const { trace } = require("./shared/openTelemetry"); const SetIntervalDriftSafe = require("./shared/SetIntervalDriftSafe"); const IMPLEMENT_ERROR_MESSAGE = "needs to be reimplemented"; @@ -330,6 +330,11 @@ class EventQueueProcessorBase { } catch { /* empty */ } + try { + queueEntry.context = JSON.parse(queueEntry.context); + } catch { + /* empty */ + } } #determineAndAddEventStatusToMap(id, processingStatus, statusMap = this.__statusMap) { @@ -1263,6 +1268,10 @@ class EventQueueProcessorBase { set lockAcquiredTime(value) { this.#eventConfig.lockAcquiredTime = value; } + + get inheritTraceContext() { + return this.#eventConfig.inheritTraceContext; + } } module.exports = EventQueueProcessorBase; diff --git a/src/config.js b/src/config.js index 09c3ea73..eb47ba08 100644 --- a/src/config.js +++ b/src/config.js @@ -20,6 +20,7 @@ const DEFAULT_PRIORITY = Priorities.Medium; const DEFAULT_INCREASE_PRIORITY = true; const DEFAULT_KEEP_ALIVE_INTERVAL = 60; const DEFAULT_MAX_FACTOR_STUCK_2_KEEP_ALIVE_INTERVAL = 3.5; +const DEFAULT_INHERIT_TRACE_CONTEXT = true; const SUFFIX_PERIODIC = "_PERIODIC"; const COMMAND_BLOCK = "EVENT_QUEUE_EVENT_BLOCK"; const COMMAND_UNBLOCK = "EVENT_QUEUE_EVENT_UNBLOCK"; @@ -62,7 +63,7 @@ class Config { #cleanupLocksAndEventsForDev; #redisOptions; #insertEventsBeforeCommit; - #enableCAPTelemetry; + #enableTelemetry; #unsubscribeHandlers = []; #unsubscribedTenants = {}; #cronTimezone; @@ -313,6 +314,7 @@ class Config { multiInstanceProcessing: config.multiInstanceProcessing, increasePriorityOverTime: config.increasePriorityOverTime, keepAliveInterval: config.keepAliveInterval, + inheritTraceContext: true, internalEvent: true, }; @@ -512,6 +514,7 @@ class Config { if (this.isMultiTenancy && event.multiInstanceProcessing) { throw EventQueueError.multiInstanceProcessingNotAllowed(event.type, event.subType); } + event.inheritTraceContext = event.inheritTraceContext ?? DEFAULT_INHERIT_TRACE_CONTEXT; this.#basicEventValidation(event); } @@ -759,12 +762,12 @@ class Config { return this.#insertEventsBeforeCommit; } - set enableCAPTelemetry(value) { - this.#enableCAPTelemetry = value; + set enableTelemetry(value) { + this.#enableTelemetry = value; } - get enableCAPTelemetry() { - return this.#enableCAPTelemetry; + get enableTelemetry() { + return this.#enableTelemetry; } get isMultiTenancy() { diff --git a/src/index.d.ts b/src/index.d.ts index 03c1c33a..40540cd3 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -109,6 +109,7 @@ interface EventEntityPublish { referenceEntity?: string; referenceEntityKey?: string; payload?: string; + startAfter?: string; } interface EventTriggerProcessing { @@ -146,6 +147,7 @@ export declare class EventQueueProcessorBase { shouldRollbackTransaction(key: string): boolean; beforeProcessingEvents(): Promise; addEntryToProcessingMap(key: string, queueEntry: EventEntity, payload: Object): void; + getTxForEventProcessing(key: string): cds.Transaction; set logger(value: CdsLogger); get logger(): CdsLogger; @@ -163,6 +165,7 @@ export function publishEvent( options?: { skipBroadcast?: boolean; skipInsertEventsBeforeCommit?: boolean; + addTraceContext?: boolean; } ): Promise; @@ -252,8 +255,8 @@ declare class Config { get redisOptions(): any; set insertEventsBeforeCommit(value: any); get insertEventsBeforeCommit(): any; - set enableCAPTelemetry(value: any); - get enableCAPTelemetry(): any; + set enableTelemetry(value: any); + get enableTelemetry(): any; get isMultiTenancy(): boolean; } diff --git a/src/initialize.js b/src/initialize.js index f12c3b80..218938b5 100644 --- a/src/initialize.js +++ b/src/initialize.js @@ -40,7 +40,7 @@ const CONFIG_VARS = [ ["cleanupLocksAndEventsForDev", false], ["redisOptions", {}], ["insertEventsBeforeCommit", true], - ["enableCAPTelemetry", false], + ["enableTelemetry", true], ["cronTimezone", null], ["publishEventBlockList", true], ["crashOnRedisUnavailable", false], @@ -65,7 +65,7 @@ const CONFIG_VARS = [ * @param {boolean} [options.cleanupLocksAndEventsForDev=false] - Cleanup locks and events for development environments. * @param {Object} [options.redisOptions={}] - Configuration options for Redis. * @param {boolean} [options.insertEventsBeforeCommit=true] - Insert events into the queue before committing the transaction. - * @param {boolean} [options.enableCAPTelemetry=false] - Enable telemetry for CAP. + * @param {boolean} [options.enableTelemetry=false] - Enable telemetry for CAP. * @param {string} [options.cronTimezone=null] - Default timezone for cron jobs. * @param {string} [options.publishEventBlockList=true] - If redis is available event blocklist is distributed to all application instances * @param {string} [options.crashOnRedisUnavailable=true] - If enabled an error is thrown if the redis connection check is not successful diff --git a/src/processEventQueue.js b/src/processEventQueue.js index 14eba4f6..b374d0ae 100644 --- a/src/processEventQueue.js +++ b/src/processEventQueue.js @@ -9,7 +9,7 @@ const { TransactionMode, EventProcessingStatus } = require("./constants"); const { limiter } = require("./shared/common"); const { executeInNewTransaction } = require("./shared/cdsHelper"); -const trace = require("./shared/openTelemetry"); +const { trace } = require("./shared/openTelemetry"); const COMPONENT_NAME = "/eventQueue/processEventQueue"; @@ -78,22 +78,20 @@ const processEventQueue = async (context, eventType, eventSubType) => { if (Object.keys(eventTypeInstance.queueEntriesWithPayloadMap).length) { await executeInNewTransaction(context, `eventQueue-processing-${eventType}##${eventSubType}`, async (tx) => { eventTypeInstance.processEventContext = tx.context; - await trace(eventTypeInstance.context, "process-events", async () => { - try { - eventTypeInstance.clusterQueueEntries(eventTypeInstance.queueEntriesWithPayloadMap); - await processEventMap(eventTypeInstance); - } catch (err) { - eventTypeInstance.handleErrorDuringClustering(err); - } - if ( - eventTypeInstance.transactionMode !== TransactionMode.alwaysCommit || - Object.entries(eventTypeInstance.eventProcessingMap).some(([key]) => - eventTypeInstance.shouldRollbackTransaction(key) - ) - ) { - await tx.rollback(); - } - }); + try { + eventTypeInstance.clusterQueueEntries(eventTypeInstance.queueEntriesWithPayloadMap); + await processEventMap(eventTypeInstance); + } catch (err) { + eventTypeInstance.handleErrorDuringClustering(err); + } + if ( + eventTypeInstance.transactionMode !== TransactionMode.alwaysCommit || + Object.entries(eventTypeInstance.eventProcessingMap).some(([key]) => + eventTypeInstance.shouldRollbackTransaction(key) + ) + ) { + await tx.rollback(); + } }); } await executeInNewTransaction(context, `eventQueue-persistStatus-${eventType}##${eventSubType}`, async (tx) => { @@ -319,18 +317,30 @@ const _checkEventIsBlocked = async (baseInstance) => { }; const _processEvent = async (eventTypeInstance, processContext, key, queueEntries, payload) => { - try { - const eventOutdated = await eventTypeInstance.isOutdatedAndKeepAlive(queueEntries); - if (eventOutdated) { - // NOTE: return empty status map to comply with the interface - return {}; - } - eventTypeInstance.setTxForEventProcessing(key, cds.tx(processContext)); - const statusTuple = await eventTypeInstance.processEvent(processContext, key, queueEntries, payload); - return eventTypeInstance.setEventStatus(queueEntries, statusTuple); - } catch (err) { - return eventTypeInstance.handleErrorDuringProcessing(err, queueEntries); + let traceContext; + if (queueEntries.length === 1 && eventTypeInstance.inheritTraceContext) { + traceContext = queueEntries[0].context?.traceContext; } + + return await trace( + eventTypeInstance.baseContext, + `process-event-${eventTypeInstance.eventType}-${eventTypeInstance.eventSubType}`, + async () => { + try { + const eventOutdated = await eventTypeInstance.isOutdatedAndKeepAlive(queueEntries); + if (eventOutdated) { + // NOTE: return empty status map to comply with the interface + return {}; + } + eventTypeInstance.setTxForEventProcessing(key, cds.tx(processContext)); + const statusTuple = await eventTypeInstance.processEvent(processContext, key, queueEntries, payload); + return eventTypeInstance.setEventStatus(queueEntries, statusTuple); + } catch (err) { + return eventTypeInstance.handleErrorDuringProcessing(err, queueEntries); + } + }, + { traceContext } + ); }; const resilientRequire = async (eventConfig) => { diff --git a/src/publishEvent.js b/src/publishEvent.js index a2c0afe5..9c2897ef 100644 --- a/src/publishEvent.js +++ b/src/publishEvent.js @@ -3,6 +3,7 @@ const config = require("./config"); const common = require("./shared/common"); const EventQueueError = require("./EventQueueError"); +const openTelemetry = require("./shared/openTelemetry"); /** * Asynchronously publishes a series of events to the event queue. @@ -29,7 +30,11 @@ const EventQueueError = require("./EventQueueError"); * @throws {EventQueueError} Throws an error if the startAfter field is not a valid date. * @returns {Promise<*>} Returns a promise which resolves to the result of the database insert operation. */ -const publishEvent = async (tx, events, { skipBroadcast = false, skipInsertEventsBeforeCommit = false } = {}) => { +const publishEvent = async ( + tx, + events, + { skipBroadcast = false, skipInsertEventsBeforeCommit = false, addTraceContext = true } = {} +) => { if (!config.initialized) { throw EventQueueError.notInitialized(); } @@ -51,6 +56,10 @@ const publishEvent = async (tx, events, { skipBroadcast = false, skipInsertEvent if (typeof event.payload !== "string") { event.payload = JSON.stringify(event.payload); } + + if (addTraceContext) { + event.context = JSON.stringify({ traceContext: openTelemetry.getCurrentTraceContext() }); + } } if (config.insertEventsBeforeCommit && !skipInsertEventsBeforeCommit) { _registerHandlerAndAddEvents(tx, events); diff --git a/src/redis/redisPub.js b/src/redis/redisPub.js index 6b216982..508dc6bb 100644 --- a/src/redis/redisPub.js +++ b/src/redis/redisPub.js @@ -9,7 +9,7 @@ const distributedLock = require("../shared/distributedLock"); const config = require("../config"); const common = require("../shared/common"); const { runEventCombinationForTenant } = require("../runner/runnerHelper"); -const trace = require("../shared/openTelemetry"); +const { trace } = require("../shared/openTelemetry"); const { TenantIdCheckTypes } = require("../constants"); const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL"; diff --git a/src/runner/runner.js b/src/runner/runner.js index 5106da3c..a63a8c78 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -15,7 +15,7 @@ const config = require("../config"); const redisPub = require("../redis/redisPub"); const openEvents = require("./openEvents"); const { runEventCombinationForTenant } = require("./runnerHelper"); -const trace = require("../shared/openTelemetry"); +const { trace } = require("../shared/openTelemetry"); const COMPONENT_NAME = "/eventQueue/runner"; const EVENT_QUEUE_RUN_ID = "EVENT_QUEUE_RUN_ID"; diff --git a/src/runner/runnerHelper.js b/src/runner/runnerHelper.js index c9bcd0c0..81790378 100644 --- a/src/runner/runnerHelper.js +++ b/src/runner/runnerHelper.js @@ -8,7 +8,7 @@ const { processEventQueue } = require("../processEventQueue"); const eventQueueConfig = require("../config"); const WorkerQueue = require("../shared/WorkerQueue"); const distributedLock = require("../shared/distributedLock"); -const trace = require("../shared/openTelemetry"); +const { trace } = require("../shared/openTelemetry"); const COMPONENT_NAME = "/eventQueue/runnerHelper"; diff --git a/src/shared/openTelemetry.js b/src/shared/openTelemetry.js index 5d2deff5..362f2fd9 100644 --- a/src/shared/openTelemetry.js +++ b/src/shared/openTelemetry.js @@ -1,32 +1,53 @@ "use strict"; +const _resilientRequire = (module) => { + try { + return require(module); + } catch { + // ignore + } +}; + const cds = require("@sap/cds"); -let otel; -try { - otel = require("@opentelemetry/api"); -} catch { - // ignore -} +const otel = _resilientRequire("@opentelemetry/api"); const config = require("../config"); const COMPONENT_NAME = "/shared/openTelemetry"; -const trace = async (context, label, fn, { attributes = {}, newRootSpan = false } = {}) => { +const trace = async (context, label, fn, { attributes = {}, newRootSpan = false, traceContext } = {}) => { const tracerProvider = otel?.trace.getTracerProvider(); - // Check if a real provider is registered - if (!config.enableCAPTelemetry || !tracerProvider || tracerProvider === otel.trace.NOOP_TRACER_PROVIDER) { + // TODO: extend check to validate if DT oneagent is available AND active + if (!config.enableTelemetry || !tracerProvider || tracerProvider === otel.trace.NOOP_TRACER_PROVIDER) { return fn(); } - const tracer = otel.trace.getTracer("eventqueue"); - const span = tracer.startSpan(`eventqueue-${label}`, { - kind: otel.SpanKind.INTERNAL, - root: newRootSpan, - }); + const tracer = otel.trace.getTracer("@cap-js-community/event-queue"); + const extractedContext = traceContext + ? otel.propagation.extract(otel.context.active(), traceContext) + : otel.context.active(); + const span = tracer.startSpan( + `eventqueue-${label}`, + { + kind: otel.SpanKind.INTERNAL, + root: newRootSpan, + }, + extractedContext + ); _setAttributes(context, span, attributes); - const ctxWithSpan = otel.trace.setSpan(otel.context.active(), span); + const ctxWithSpan = otel.trace.setSpan(extractedContext, span); + + return await _startOtelTrace(ctxWithSpan, traceContext, span, fn); +}; + +const _startOtelTrace = async (ctxWithSpan, traceContext, span, fn) => { return otel.context.with(ctxWithSpan, async () => { + if (traceContext) { + cds.log("/eventQueue/telemetry").info("Linked span:", span.spanContext()); + const carrier = {}; + otel.propagation.inject(ctxWithSpan, carrier); + cds.log("/eventQueue/telemetry").info("Extracted trace context by inject", carrier); + } const onSuccess = (res) => { span.setStatus({ code: otel.SpanStatusCode.OK }); return res; @@ -72,4 +93,13 @@ const _setAttributes = (context, span, attributes) => { } }; -module.exports = trace; +const getCurrentTraceContext = () => { + if (!otel) { + return null; + } + const carrier = {}; + otel.propagation.inject(otel.context.active(), carrier); + return carrier; +}; + +module.exports = { trace, getCurrentTraceContext }; diff --git a/test/__snapshots__/eventQueueOutbox.test.js.snap b/test/__snapshots__/eventQueueOutbox.test.js.snap index 44d31ff0..7c7c8403 100644 --- a/test/__snapshots__/eventQueueOutbox.test.js.snap +++ b/test/__snapshots__/eventQueueOutbox.test.js.snap @@ -24,6 +24,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true custom options should win "deleteFinishedEventsAfterDays": undefined, "impl": "./outbox/EventQueueGenericOutboxHandler", "increasePriorityOverTime": true, + "inheritTraceContext": true, "internalEvent": true, "keepAliveInterval": 60000, "keepAliveMaxInProgressTime": 210000, @@ -54,6 +55,35 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true error in srv.after 1`] = ` ] `; +exports[`event-queue outbox monkeyPatchCAPOutbox=true map config to event-queue config 1`] = ` +{ + "_appInstancesMap": null, + "_appNameMap": null, + "appInstances": undefined, + "appNames": undefined, + "checkForNextChunk": undefined, + "deleteFinishedEventsAfterDays": undefined, + "impl": "./outbox/EventQueueGenericOutboxHandler", + "increasePriorityOverTime": true, + "inheritTraceContext": true, + "internalEvent": true, + "keepAliveInterval": 60000, + "keepAliveMaxInProgressTime": 210000, + "load": 1, + "multiInstanceProcessing": undefined, + "parallelEventProcessing": 5, + "priority": "medium", + "processAfterCommit": undefined, + "retryAttempts": 20, + "retryFailedAfter": undefined, + "selectMaxChunkSize": 100, + "subType": "NotificationService", + "transactionMode": undefined, + "type": "CAP_OUTBOX", + "useEventQueueUser": undefined, +} +`; + exports[`event-queue outbox monkeyPatchCAPOutbox=true req error should be caught for emit 1`] = ` [ [ @@ -175,3 +205,56 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true should store correct user "headers": {}, } `; + +exports[`event-queue outbox monkeyPatchCAPOutbox=true should work for outboxed services by require with transactionMode config 1`] = ` +{ + "_appInstancesMap": null, + "_appNameMap": null, + "appInstances": undefined, + "appNames": undefined, + "checkForNextChunk": undefined, + "deleteFinishedEventsAfterDays": undefined, + "impl": "./outbox/EventQueueGenericOutboxHandler", + "increasePriorityOverTime": true, + "inheritTraceContext": true, + "internalEvent": true, + "keepAliveInterval": 60000, + "keepAliveMaxInProgressTime": 210000, + "load": 1, + "multiInstanceProcessing": undefined, + "parallelEventProcessing": 5, + "priority": "medium", + "processAfterCommit": undefined, + "retryAttempts": 20, + "retryFailedAfter": undefined, + "selectMaxChunkSize": 100, + "subType": "NotificationServiceOutboxedByConfig", + "transactionMode": "alwaysRollback", + "type": "CAP_OUTBOX", + "useEventQueueUser": undefined, +} +`; + +exports[`event-queue outbox monkeyPatchCAPOutbox=true trace context should extract current trace context and save 1`] = ` +{ + "traceContext": { + "traceparent": "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01", + }, +} +`; + +exports[`event-queue outbox monkeyPatchCAPOutbox=true trace context should not use stored trace context if disabled by config 1`] = ` +{ + "traceContext": { + "traceparent": "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01", + }, +} +`; + +exports[`event-queue outbox monkeyPatchCAPOutbox=true trace context should use stored trace context in next processing 1`] = ` +{ + "traceContext": { + "traceparent": "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01", + }, +} +`; diff --git a/test/__snapshots__/initialize.test.js.snap b/test/__snapshots__/initialize.test.js.snap index a8c49ede..f60093c4 100644 --- a/test/__snapshots__/initialize.test.js.snap +++ b/test/__snapshots__/initialize.test.js.snap @@ -7,6 +7,7 @@ exports[`initialize read yaml config file 1`] = ` "_appNameMap": null, "impl": "./srv/notification/EventQueueNotificationProcessor", "increasePriorityOverTime": true, + "inheritTraceContext": true, "keepAliveInterval": 60000, "keepAliveMaxInProgressTime": 210000, "load": 40, @@ -20,6 +21,7 @@ exports[`initialize read yaml config file 1`] = ` "_appNameMap": null, "impl": "./srv/businessLogs/EventQueueBusinessLogProcessor", "increasePriorityOverTime": true, + "inheritTraceContext": true, "keepAliveInterval": 60000, "keepAliveMaxInProgressTime": 210000, "load": 10, diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index b2d8e39a..6b80d8a8 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -1,5 +1,8 @@ "use strict"; +const otel = require("@opentelemetry/api"); +jest.mock("@opentelemetry/api", () => require("./mocks/openTelemetry")); + const cds = require("@sap/cds/lib"); const eventQueue = require("../src"); const path = require("path"); @@ -31,6 +34,7 @@ describe("event-queue outbox", () => { loggerMock = mockLogger(); }); beforeEach(async () => { + eventQueue.config.enableTelemetry = true; context = new cds.EventContext({ user: "testUser", tenant: 123 }); tx = cds.tx(context); await tx.run(DELETE.from("sap.eventqueue.Lock")); @@ -337,33 +341,7 @@ describe("event-queue outbox", () => { await commitAndOpenNew(); await testHelper.selectEventQueueAndExpectOpen(tx, { expectedLength: 1 }); const config = eventQueue.config.events.find((event) => event.subType === "NotificationService"); - expect(config).toMatchInlineSnapshot(` - { - "_appInstancesMap": null, - "_appNameMap": null, - "appInstances": undefined, - "appNames": undefined, - "checkForNextChunk": undefined, - "deleteFinishedEventsAfterDays": undefined, - "impl": "./outbox/EventQueueGenericOutboxHandler", - "increasePriorityOverTime": true, - "internalEvent": true, - "keepAliveInterval": 60000, - "keepAliveMaxInProgressTime": 210000, - "load": 1, - "multiInstanceProcessing": undefined, - "parallelEventProcessing": 5, - "priority": "medium", - "processAfterCommit": undefined, - "retryAttempts": 20, - "retryFailedAfter": undefined, - "selectMaxChunkSize": 100, - "subType": "NotificationService", - "transactionMode": undefined, - "type": "CAP_OUTBOX", - "useEventQueueUser": undefined, - } - `); + expect(config).toMatchSnapshot(); }); it("should work for outboxed services by require with transactionMode config", async () => { @@ -388,33 +366,7 @@ describe("event-queue outbox", () => { expect(loggerMock).sendFioriActionCalled(); const config = eventQueue.config.events.find((event) => event.subType === "NotificationServiceOutboxedByConfig"); delete config.startTime; - expect(config).toMatchInlineSnapshot(` - { - "_appInstancesMap": null, - "_appNameMap": null, - "appInstances": undefined, - "appNames": undefined, - "checkForNextChunk": undefined, - "deleteFinishedEventsAfterDays": undefined, - "impl": "./outbox/EventQueueGenericOutboxHandler", - "increasePriorityOverTime": true, - "internalEvent": true, - "keepAliveInterval": 60000, - "keepAliveMaxInProgressTime": 210000, - "load": 1, - "multiInstanceProcessing": undefined, - "parallelEventProcessing": 5, - "priority": "medium", - "processAfterCommit": undefined, - "retryAttempts": 20, - "retryFailedAfter": undefined, - "selectMaxChunkSize": 100, - "subType": "NotificationServiceOutboxedByConfig", - "transactionMode": "alwaysRollback", - "type": "CAP_OUTBOX", - "useEventQueueUser": undefined, - } - `); + expect(config).toMatchSnapshot(); expect(loggerMock.callsLengths().error).toEqual(0); }); @@ -756,6 +708,84 @@ describe("event-queue outbox", () => { expect(loggerMock.callsLengths().error).toEqual(0); }); }); + + describe("trace context", () => { + it("should extract current trace context and save", async () => { + jest.spyOn(otel.propagation, "inject").mockImplementationOnce((context, carrier) => { + carrier.traceparent = "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01"; + }); + const service = await cds.connect.to("NotificationService"); + const outboxedService = cds.outboxed(service).tx(context); + await outboxedService.send("sendFiori", { + to: "to", + subject: "subject", + body: "body", + }); + await commitAndOpenNew(); + const [event] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 1, + additionalColumns: ["context"], + }); + expect(JSON.parse(event.context)).toMatchSnapshot(); + expect(loggerMock).not.sendFioriActionCalled(); + }); + + it("should use stored trace context in next processing", async () => { + jest.spyOn(otel.propagation, "inject").mockImplementationOnce((context, carrier) => { + carrier.traceparent = "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01"; + }); + const service = await cds.connect.to("NotificationService"); + const outboxedService = cds.outboxed(service).tx(context); + await outboxedService.send("sendFiori", { + to: "to", + subject: "subject", + body: "body", + }); + await commitAndOpenNew(); + const [event] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 1, + additionalColumns: ["context"], + }); + expect(JSON.parse(event.context)).toMatchSnapshot(); + expect(loggerMock).not.sendFioriActionCalled(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + await commitAndOpenNew(); + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + expect(loggerMock).sendFioriActionCalled(); + expect(loggerMock.callsLengths().error).toEqual(0); + expect(jest.spyOn(otel.propagation, "extract")).toHaveBeenCalledWith("mocked-context", { + traceparent: "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01", + }); + }); + + it("should not use stored trace context if disabled by config", async () => { + jest.spyOn(otel.propagation, "inject").mockImplementationOnce((context, carrier) => { + carrier.traceparent = "00-ac46cd732064b44a9c692c2062db8fbd-5fa4a29b5675b3c3-01"; + }); + const service = await cds.connect.to("NotificationService"); + const outboxedService = cds.outboxed(service).tx(context); + await outboxedService.send("sendFiori", { + to: "to", + subject: "subject", + body: "body", + }); + await commitAndOpenNew(); + const [event] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 1, + additionalColumns: ["context"], + }); + expect(JSON.parse(event.context)).toMatchSnapshot(); + expect(loggerMock).not.sendFioriActionCalled(); + eventQueue.config.events.find(({ subType }) => subType === "NotificationService").inheritTraceContext = false; + + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + await commitAndOpenNew(); + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + expect(loggerMock).sendFioriActionCalled(); + expect(loggerMock.callsLengths().error).toEqual(0); + expect(jest.spyOn(otel.propagation, "extract")).toHaveBeenCalledTimes(0); + }); + }); }); const commitAndOpenNew = async () => { diff --git a/test/mocks/openTelemetry.js b/test/mocks/openTelemetry.js new file mode 100644 index 00000000..8803184c --- /dev/null +++ b/test/mocks/openTelemetry.js @@ -0,0 +1,38 @@ +"use strict"; + +const tracerMock = { + startSpan: jest.fn(() => { + return { + setStatus: jest.fn(), + recordException: jest.fn(), + setAttribute: jest.fn(), + spanContext: jest.fn(), + }; + }), +}; + +module.exports = { + SpanKind: {}, + SpanStatusCode: {}, + context: { + active: jest.fn(() => "mocked-context"), + with: jest.fn((context, fn) => { + return fn(); + }), + }, + propagation: { + inject: jest.fn((context, carrier) => { + carrier["mocked-trace"] = "trace-value"; + }), + extract: jest.fn(() => { + return {}; + }), + }, + trace: { + setSpan: jest.fn(), + getTracer: jest.fn(() => tracerMock), + getTracerProvider: jest.fn(() => { + return {}; + }), + }, +};