generated from drivly/template.do
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUpstashKafka.js
89 lines (78 loc) · 3.17 KB
/
UpstashKafka.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * Thin HTTP client for a Kafka REST endpoint protected by HTTP Basic auth.
 * Every request goes to `https://<baseUrl>/<path>` and the JSON-decoded
 * response body is returned; the HTTP status code is not inspected.
 */
class UpstashKafka {
  #baseUrl
  #auth

  /**
   * @param {string} baseUrl - Host (and optional path prefix) without scheme; `https://` is prepended.
   * @param {string} username - Basic-auth user name.
   * @param {string} password - Basic-auth password.
   */
  constructor(baseUrl, username, password) {
    this.#baseUrl = baseUrl
    // Pre-compute the Basic credentials once; they are reused for every request.
    this.#auth = btoa(`${username}:${password}`)
  }

  /**
   * Lists every topic/partition pair together with its latest offset.
   *
   * Reads `topics` (expected to map topic name -> partition count), expands it
   * into one `{ topic, partition }` entry per partition and posts that list to
   * `offsets/latest`.
   *
   * @returns {Promise<Array<{queue: string, partition: number, offset: number}>>}
   */
  async listQueues() {
    const topics = await this.kafkaService('topics')
    const topicPartitions = Object.entries(topics).flatMap(([topic, partitionCount]) =>
      Array.from({ length: partitionCount }, (_, partition) => ({ topic, partition }))
    )
    const offsets = await this.kafkaService('offsets/latest', topicPartitions)
    return offsets.map(({ topic, partition, offset }) => ({ queue: topic, partition, offset }))
  }

  /**
   * Performs one REST call and returns the parsed JSON response.
   *
   * @param {string} url - Path relative to the base URL (no leading slash).
   * @param {string|object} [body] - When truthy the request is a POST; strings
   *   are sent verbatim, anything else is JSON-serialised. When falsy a GET is sent.
   * @returns {Promise<any>} The decoded JSON body of the response.
   */
  async kafkaService(url, body) {
    console.debug('kafka', url, JSON.stringify(body))
    const headers = { Authorization: `Basic ${this.#auth}` }
    // Only declare a content type when there is a payload: a header whose value
    // is `undefined` is coerced by fetch to the literal string "undefined".
    if (body) headers['Content-Type'] = 'application/json'
    const response = await fetch(`https://${this.#baseUrl}/${url}`, {
      headers,
      method: body ? 'POST' : 'GET',
      body: !body || typeof body === 'string' ? body : JSON.stringify(body),
    })
    return response.json()
  }
}
/**
 * Renames the `topic` property of a response record to `queue`.
 *
 * - Plain objects are shallow-copied with `queue` placed first and `topic`
 *   removed; the input object is never modified. A `queue` property already
 *   present on the input takes precedence over `topic`.
 * - Arrays are converted element by element, so a list of records stays a list.
 * - `null`, `undefined` and primitives are returned unchanged instead of throwing.
 *
 * @param {*} value - A response record, a list of records, or an empty/primitive response.
 * @returns {*} The converted copy, or `value` itself when it is not an object.
 */
function formatResponse(value) {
  if (value === null || typeof value !== 'object') return value
  if (Array.isArray(value)) return value.map((item) => formatResponse(item))
  const { topic, ...rest } = value
  return { queue: topic, ...rest }
}
/**
 * Consumer-side helper: polls a queue (topic) as a member of a consumer group,
 * commits offsets and fetches messages at a given offset.
 */
export class KafkaConsumer extends UpstashKafka {
  queueName
  group
  instance
  partition

  /**
   * @param {string} baseUrl - Forwarded to the base client.
   * @param {string} username - Forwarded to the base client.
   * @param {string} password - Forwarded to the base client.
   * @param {string} queueName - Default queue (topic) used by the methods below.
   * @param {string} [group='GROUP_1'] - Default consumer group id.
   * @param {string|number} [instance=0] - Default consumer instance id.
   */
  constructor(baseUrl, username, password, queueName, group = 'GROUP_1', instance = 0) {
    super(baseUrl, username, password)
    this.queueName = queueName
    this.group = group
    this.instance = instance
  }

  /**
   * Polls `consume/<group>/<instance>/<queue>`.
   *
   * The first time messages are received, the partition of the first message
   * is remembered in `this.partition` and becomes the default for `ack` and `fetch`.
   *
   * @returns {Promise<*>} The response with `topic` renamed to `queue`; when the
   *   response is an array, each message is converted individually.
   */
  async queue(queue = this.queueName, group = this.group, instance = this.instance) {
    const response = await this.kafkaService(`consume/${group}/${instance}/${queue}`)
    const messages = Array.isArray(response) ? response.map((message) => formatResponse(message)) : formatResponse(response)
    // `??=` instead of a truthiness check: partition 0 is valid and must not be
    // treated as "not set yet" and overwritten on the next poll.
    this.partition ??= messages?.[0]?.partition
    return messages
  }

  /**
   * Commits a single offset by posting `{ topic, partition, offset }` to
   * `commit/<group>/<instance>`.
   *
   * @param {number} offset - Offset to commit.
   * @param {number} [partition=this.partition] - Defaults to the partition remembered by `queue()`.
   * @returns {Promise<*>} The response with `topic` renamed to `queue`.
   */
  async ack(offset, partition = this.partition, queue = this.queueName, group = this.group, instance = this.instance) {
    const response = await this.kafkaService(`commit/${group}/${instance}`, { topic: queue, partition, offset })
    return formatResponse(response)
  }

  /**
   * Calls `commit/<group>/<instance>` without a request body.
   *
   * @returns {Promise<*>} The response with `topic` renamed to `queue`.
   */
  async ackAll(group = this.group, instance = this.instance) {
    const response = await this.kafkaService(`commit/${group}/${instance}`)
    return formatResponse(response)
  }

  /**
   * Posts `{ topic, partition, offset }` to the `fetch` endpoint.
   *
   * @param {number} offset - Offset to read from.
   * @param {string} [queue=this.queueName] - Queue (topic) to read.
   * @param {number} [partition=this.partition] - Defaults to the partition remembered by `queue()`.
   * @returns {Promise<*>} The response with `topic` renamed to `queue`.
   */
  async fetch(offset, queue = this.queueName, partition = this.partition) {
    const response = await this.kafkaService('fetch', { topic: queue, partition, offset })
    return formatResponse(response)
  }
}
/**
 * Producer-side helper: publishes one or many messages to a queue (topic)
 * through the `produce/<queue>` endpoint of the base client.
 */
export class KafkaProducer extends UpstashKafka {
  queueName

  /**
   * @param {string} baseUrl - Forwarded to the base client.
   * @param {string} username - Forwarded to the base client.
   * @param {string} password - Forwarded to the base client.
   * @param {string} queueName - Default queue (topic) to publish to.
   */
  constructor(baseUrl, username, password, queueName) {
    super(baseUrl, username, password)
    this.queueName = queueName
  }

  /**
   * Publishes a message via `sendBatch` and unwraps the result: the first
   * element when that is truthy, otherwise the result as-is.
   */
  async send(message, queue = this.queueName) {
    const result = await this.sendBatch(message, queue)
    return result?.[0] || result
  }

  /**
   * Publishes `messages` to `produce/<queue>`.
   *
   * Input that already looks like a record (its own `value`, or the `value` of
   * its first element, is a string) is sent untouched. Otherwise each array
   * element, or the single value, is wrapped as `{ value: JSON.stringify(...) }`.
   *
   * @returns {Promise<*>} A list of formatted records when both the input and
   *   the response are arrays; otherwise a single formatted record.
   */
  async sendBatch(messages, queue = this.queueName) {
    const path = `produce/${queue}`
    const alreadyRecords = typeof messages?.value === 'string' || typeof messages?.[0]?.value === 'string'

    let payload
    if (alreadyRecords) {
      payload = messages
    } else if (Array.isArray(messages)) {
      payload = messages.map((value) => ({ value: JSON.stringify(value) }))
    } else {
      payload = { value: JSON.stringify(messages) }
    }

    const response = await this.kafkaService(path, payload)
    if (Array.isArray(response) && Array.isArray(messages)) {
      return response.map((r) => formatResponse(r))
    }
    return formatResponse(response?.[0] || response)
  }
}