-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathstreamToRedis.ts
74 lines (65 loc) · 3.37 KB
/
streamToRedis.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import { stream } from "../streams/listenOnStream.js"
import { redisClient } from "../utils/getRedisClient.js"
import { streamKeyHolder } from "../utils/KeyHolder.js"
import { setTimeout } from "node:timers/promises"
import pino from "pino"
import { streamPaths } from "../streams/streamPaths.js"
import { Transform } from "stream"
import { AnyEvent } from "../types/eventTypes"
import { Readable } from "node:stream"
/*
This file listens to the Companies House long polling streaming API, and when events are received, they are appended
to a Redis stream named 'events:' followed by the path of the stream, eg 'events:filings'.
Streams reconnect 60 seconds after they end.
*/
// Register both stream keys read from the environment with the shared key holder.
for (const key of [process.env.STREAM_KEY1, process.env.STREAM_KEY2]) {
  streamKeyHolder.addKey(key)
}
// Module-wide logger used by the stream lifecycle handlers and shutdown.
const logger = pino()
/**
 * Builds a handler that appends each received event to the Redis stream `events:<streamPath>`.
 *
 * The entry ID is the event's timepoint followed by `-*`, the whole event is stored as JSON under the
 * `event` field, and the add is issued with an approximate (`~`) MAXLEN trim threshold of 10000.
 *
 * @param streamPath path of the stream, used as the suffix of the Redis key.
 * @returns a function taking an event and returning the promise of the `xAdd` call.
 */
const sendEvent = (streamPath: string) => {
  const key = "events:" + streamPath
  return (event: AnyEvent) => {
    const entryId = event.event.timepoint + "-*"
    const serialised = JSON.stringify(event)
    return redisClient.xAdd(key, entryId, { "event": serialised }, {
      TRIM: {
        strategy: "MAXLEN",
        threshold: 10000,
        strategyModifier: "~"
      }
    })
  }
}
/**
 * Builds a handler that adds one to today's counter in the `counts:<streamPath>:daily` hash.
 * The hash field is the date part (before the "T") of the current time's ISO string; the event itself is not read.
 */
const incrEventCount = (streamPath: string) => (event: AnyEvent) => {
  const today = new Date().toISOString().split("T")[0]
  return redisClient.hIncrBy(`counts:${streamPath}:daily`, today, 1)
}
/**
 * Builds a handler that adds one to the counter for the event's `resource_kind`
 * in the `resourceKinds:<streamPath>` hash.
 */
const incrResourceKindCount = (streamPath: string) => {
  const hashKey = `resourceKinds:${streamPath}`
  return (event: AnyEvent) => redisClient.hIncrBy(hashKey, event.resource_kind, 1)
}
/**
 * Builds a handler that saves the `event` part of each received event as JSON under
 * `timepoints:<streamPath>`, with an `EX` expiry of 86400 * 7 (seven days' worth of seconds).
 * This is the value `getMostRecentTimepoint` reads back when a stream is (re)started.
 */
const updateTimepoint = (streamPath: string) => (event: AnyEvent) => {
  const expiry = 86400 * 7
  const latest = JSON.stringify(event.event)
  return redisClient.set("timepoints:" + streamPath, latest, { EX: expiry })
}
/**
 * Builds a handler that records the current time (`Date.now()`) against `streamPath`
 * in the `heartbeats` hash, which keeps track of which streams are alive.
 */
const heartbeat = (streamPath: string) => () => {
  const now = Date.now()
  return redisClient.hSet("heartbeats", streamPath, now)
}
/**
 * Reads the value saved under `timepoints:<streamPath>` and resolves to its `timepoint` property,
 * or to `undefined` when nothing is stored.
 */
const getMostRecentTimepoint = (streamPath: string) => {
  const key = "timepoints:" + streamPath
  return redisClient.get(key).then(stored => {
    if (!stored) {
      return undefined
    }
    return JSON.parse(stored)?.timepoint
  })
}
/**
 * Opens the stream for `streamPath`, resuming from the most recent timepoint saved in Redis (if any),
 * and attaches the Redis writers, lifecycle logging and the reconnect-on-end behaviour.
 *
 * Failure handling:
 * - every Redis write triggered by a stream event is guarded, so a failed command is logged with its
 *   error instead of surfacing as an unhandled promise rejection;
 * - the "error" event is logged at error level together with the error object;
 * - when the stream ends, a reconnect is attempted after 60 seconds, and a failed attempt is logged and
 *   retried after another 60 seconds rather than leaving the stream stopped.
 *
 * @param streamPath path of the stream to listen on; also the suffix of the Redis keys written.
 * @returns the stream once it has been created and all listeners are attached.
 */
const startStream = async (streamPath: string): Promise<Readable> => {
  // Wraps a Redis writer so that a thrown error or rejected command is logged rather than left unhandled.
  // The wrapped handler is still invoked synchronously, so commands are issued in the original order.
  const guarded = <A extends unknown[]>(action: string, handler: (...args: A) => Promise<unknown>) =>
    async (...args: A): Promise<void> => {
      try {
        await handler(...args)
      } catch (err) {
        logger.error({ streamPath, action, err }, "StreamToRedis failed to write to Redis")
      }
    }

  // Waits 60 seconds, then reopens the stream; keeps trying until a stream has been started.
  const restart = async (): Promise<void> => {
    for (;;) {
      await setTimeout(60000)
      logger.info({ streamPath }, "Restarting stream, after waiting 60 seconds since disconnected.")
      try {
        await startStream(streamPath)
        return
      } catch (err) {
        logger.error({ streamPath, err }, "StreamToRedis failed to restart stream, retrying in 60 seconds")
      }
    }
  }

  const timepoint = await getMostRecentTimepoint(streamPath)
  return stream(streamPath, timepoint)
    .on("data", guarded("sendEvent", sendEvent(streamPath)))
    .on("data", guarded("updateTimepoint", updateTimepoint(streamPath)))
    .on("data", guarded("incrEventCount", incrEventCount(streamPath)))
    .on("data", guarded("incrResourceKindCount", incrResourceKindCount(streamPath)))
    .on("end", () => logger.info({ streamPath }, "StreamToRedis end event fired"))
    .on("close", () => logger.info({ streamPath }, "StreamToRedis close event fired"))
    .on("error", (err: Error) => logger.error({ streamPath, err }, "StreamToRedis error event fired"))
    .on("heartbeat", guarded("heartbeat", heartbeat(streamPath)))
    .on("end", () => { void restart() }) // restart on end
}
// Streams opened at startup; shutdown() destroys every member of this set.
const streams = new Set<Readable>()
// Open the streams one at a time, pausing 5 seconds after each so the connections are spaced out.
for (const streamPath of streamPaths) {
  const opened = await startStream(streamPath)
  streams.add(opened)
  await setTimeout(5000)
}
/**
 * Gracefully stops the process: flushes the logger, destroys every stream in `streams`,
 * closes the Redis connection, then exits.
 *
 * The process always exits, even when a step fails; in that case the error is printed and the
 * exit code is set to 1 instead of the failure being silently discarded.
 * The elapsed time is reported in milliseconds, which is the unit `performance.now()` returns.
 */
async function shutdown(): Promise<void> {
  const requestTime = performance.now()
  try {
    logger.flush()
    console.log("Graceful shutdown commenced", new Date())
    for (const openStream of streams) {
      openStream.destroy()
    }
    await redisClient.quit()
    logger.flush()
  } catch (err) {
    // process.exit() without an argument honours process.exitCode, so the failure is visible to the caller.
    process.exitCode = 1
    console.error("Graceful shutdown failed", new Date(), err)
  } finally {
    const elapsedMs = performance.now() - requestTime
    console.log("Graceful shutdown finished", new Date(), "in", elapsedMs, "ms")
    process.exit()
  }
}
// Run the same graceful shutdown for SIGINT (ctrl-c when running docker in a terminal)
// and SIGTERM (docker stop).
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, shutdown)
}