Skip to content

Commit 173aa5b

Browse files
committed
create SMT to add the kafkaPublishTimestamp field to outgoing message
1 parent ce17c69 commit 173aa5b

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# https://cloud.google.com/pubsub/docs/smts/create-topic-smt#create
2+
- javascriptUdf:
3+
code: >
4+
function addTopLevelFields(message, metadata) {
5+
const data = JSON.parse(message.data);
6+
const attrs = message.attributes || {};
7+
8+
const payload = {
9+
...data, // spread the original JSON fields into the root
10+
kafkaPublishTimestamp: attrs["kafka.timestamp"] ? Number(attrs["kafka.timestamp"]) * 1000 : null
11+
};
12+
13+
return {
14+
data: JSON.stringify(payload),
15+
attributes: attrs // preserve attributes
16+
};
17+
}
18+
functionName: addTopLevelFields

0 commit comments

Comments
 (0)