Skip to content

Commit 18b13eb

Browse files
committed
update user-defined function and rename file
1 parent c05e02f commit 18b13eb

File tree

2 files changed

+36
-18
lines changed

2 files changed

+36
-18
lines changed

broker/setup_broker/lvk/templates/ps_lvk_add_top_level_fields_smt.yaml

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# https://cloud.google.com/pubsub/docs/smts/create-topic-smt#create
2+
- javascriptUdf:
3+
code: >
4+
function addTopLevelFields(message, metadata) {
5+
const attrs = message.attributes || {};
6+
const dataStr = message.data.toString();
7+
8+
// Create an empty object to hold the new fields we want to inject into the JSON payload
9+
const newFields = {};
10+
11+
// Extract the following attributes and add them to newFields
12+
// We avoid casting fields as JavaScript numbers to prevent precision loss
13+
if (attrs["kafka.timestamp"]) {
14+
newFields.kafkaPublishTimestamp = attrs["kafka.timestamp"] * 1000;
15+
}
16+
17+
// Define the data as a set of key-value pairs to be added to the JSON payload
18+
const newPairs = Object.entries(newFields)
19+
.map(([k, v]) => `"${k}":${v}`);
20+
21+
if (newPairs.length === 0) {
22+
// No new fields; return the original message
23+
return message;
24+
}
25+
26+
// Inject the new fields into the JSON payload
27+
const newData = dataStr.endsWith("}")
28+
? dataStr.slice(0, -1) + "," + newPairs.join(",") + "}"
29+
: dataStr;
30+
31+
return {
32+
data: newData,
33+
attributes: attrs
34+
};
35+
}
36+
functionName: addTopLevelFields

0 commit comments

Comments
 (0)