Skip to content

Commit a634380

Browse files
authored
refactor: use object to pass kafka message (#1230)
1 parent ddbb30b commit a634380

File tree

11 files changed

+51
-36
lines changed

11 files changed

+51
-36
lines changed

bin/mq.ts

+20-12
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
handleTopic as handleSubjectTopicEvent,
1616
} from '@app/event/subject';
1717
import { handle as handleTimelineEvent } from '@app/event/timeline';
18-
import type { Payload } from '@app/event/type';
18+
import type { KafkaMessage, Payload } from '@app/event/type';
1919
import { handle as handleUserEvent, handleFriend as handleFriendEvent } from '@app/event/user';
2020
import { newConsumer } from '@app/lib/kafka.ts';
2121
import { logger } from '@app/lib/logger';
@@ -43,7 +43,7 @@ const TOPICS = [
4343
'debezium.chii.bangumi.chii_subject_revisions',
4444
];
4545

46-
type Handler = (topic: string, key: string, value: string) => Promise<void>;
46+
type Handler = (msg: KafkaMessage) => Promise<void>;
4747

4848
const binlogHandlers: Record<string, Handler | Handler[]> = {
4949
chii_blog_entry: handleBlogEvent,
@@ -63,8 +63,8 @@ const binlogHandlers: Record<string, Handler | Handler[]> = {
6363
chii_subject_revisions: handleSubjectDate,
6464
};
6565

66-
async function onBinlogMessage(topic: string, key: string, value: string) {
67-
const payload = JSON.parse(value) as Payload;
66+
async function onBinlogMessage(msg: KafkaMessage) {
67+
const payload = JSON.parse(msg.value) as Payload;
6868
const handler = binlogHandlers[payload.source.table];
6969
if (!handler) {
7070
return;
@@ -74,14 +74,14 @@ async function onBinlogMessage(topic: string, key: string, value: string) {
7474
for (const h of handler) {
7575
// catch on each handler to it doesn't get swallowed by Promise.all.
7676
ts.push(
77-
h(topic, key, value).catch((error) => {
77+
h(msg).catch((error) => {
7878
logger.error('failed to handle event from %s: %o', payload.source.table, error);
7979
}),
8080
);
8181
}
8282
await Promise.all(ts);
8383
} else {
84-
await handler(topic, key, value).catch((error) => {
84+
await handler(msg).catch((error) => {
8585
logger.error('failed to handle event from %s: %o', payload.source.table, error);
8686
});
8787
}
@@ -91,13 +91,13 @@ const serviceHandlers: Record<string, Handler> = {
9191
timeline: handleTimelineMessage,
9292
};
9393

94-
async function onServiceMessage(topic: string, key: string, value: string) {
95-
const handler = serviceHandlers[topic];
94+
async function onServiceMessage(msg: KafkaMessage) {
95+
const handler = serviceHandlers[msg.topic];
9696
if (!handler) {
9797
return;
9898
}
99-
await handler(topic, key, value).catch((error) => {
100-
logger.error('failed to handle event from %s: %o', topic, error);
99+
await handler(msg).catch((error) => {
100+
logger.error('failed to handle event from %s: %o', msg.topic, error);
101101
});
102102
}
103103

@@ -119,9 +119,17 @@ async function main() {
119119
}
120120
try {
121121
if (topic.startsWith('debezium.')) {
122-
await onBinlogMessage(topic, message.key.toString(), message.value.toString());
122+
await onBinlogMessage({
123+
topic: topic,
124+
key: message.key.toString(),
125+
value: message.value.toString(),
126+
});
123127
} else {
124-
await onServiceMessage(topic, message.key.toString(), message.value.toString());
128+
await onServiceMessage({
129+
topic: topic,
130+
key: message.key.toString(),
131+
value: message.value.toString(),
132+
});
125133
}
126134
} catch (error) {
127135
logger.error(error, `error processing message ${message.key.toString()}`);

event/blog.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getSlimCacheKey } from '@app/lib/blog/cache';
22
import redis from '@app/lib/redis.ts';
33

4-
import { EventOp } from './type';
4+
import { EventOp, type KafkaMessage } from './type';
55

66
interface BlogKey {
77
entry_id: number;
@@ -11,7 +11,7 @@ interface Payload {
1111
op: EventOp;
1212
}
1313

14-
export async function handle(topic: string, key: string, value: string) {
14+
export async function handle({ key, value }: KafkaMessage) {
1515
const idx = JSON.parse(key) as BlogKey;
1616
const payload = JSON.parse(value) as Payload;
1717
switch (payload.op) {

event/character.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getSlimCacheKey } from '@app/lib/character/cache';
22
import redis from '@app/lib/redis.ts';
33

4-
import { EventOp } from './type';
4+
import { EventOp, type KafkaMessage } from './type';
55

66
interface CharacterKey {
77
crt_id: number;
@@ -11,7 +11,7 @@ interface Payload {
1111
op: EventOp;
1212
}
1313

14-
export async function handle(topic: string, key: string, value: string) {
14+
export async function handle({ key, value }: KafkaMessage) {
1515
const idx = JSON.parse(key) as CharacterKey;
1616
const payload = JSON.parse(value) as Payload;
1717
switch (payload.op) {

event/group.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { getSlimCacheKey, getTopicCacheKey } from '@app/lib/group/cache.ts';
22
import redis from '@app/lib/redis.ts';
33
import { getJoinedGroupsCacheKey } from '@app/lib/user/cache.ts';
44

5-
import { EventOp } from './type';
5+
import { EventOp, type KafkaMessage } from './type';
66

77
interface GroupKey {
88
grp_id: number;
@@ -12,7 +12,7 @@ interface GroupPayload {
1212
op: EventOp;
1313
}
1414

15-
export async function handle(topic: string, key: string, value: string) {
15+
export async function handle({ key, value }: KafkaMessage) {
1616
const idx = JSON.parse(key) as GroupKey;
1717
const payload = JSON.parse(value) as GroupPayload;
1818
switch (payload.op) {
@@ -42,7 +42,7 @@ interface GroupMemberPayload {
4242
};
4343
}
4444

45-
export async function handleMember(topic: string, _: string, value: string) {
45+
export async function handleMember({ value }: KafkaMessage) {
4646
const payload = JSON.parse(value) as GroupMemberPayload;
4747
const uid = payload.before?.gmb_uid ?? payload.after?.gmb_uid;
4848
if (!uid) {
@@ -70,7 +70,7 @@ interface GroupTopicPayload {
7070
op: EventOp;
7171
}
7272

73-
export async function handleTopic(topic: string, key: string, value: string) {
73+
export async function handleTopic({ key, value }: KafkaMessage) {
7474
const idx = JSON.parse(key) as GroupTopicKey;
7575
const payload = JSON.parse(value) as GroupTopicPayload;
7676
switch (payload.op) {

event/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getSlimCacheKey } from '@app/lib/index/cache';
22
import redis from '@app/lib/redis.ts';
33

4-
import { EventOp } from './type';
4+
import { EventOp, type KafkaMessage } from './type';
55

66
interface IndexKey {
77
idx_id: number;
@@ -11,7 +11,7 @@ interface Payload {
1111
op: EventOp;
1212
}
1313

14-
export async function handle(topic: string, key: string, value: string) {
14+
export async function handle({ key, value }: KafkaMessage) {
1515
const idx = JSON.parse(key) as IndexKey;
1616
const payload = JSON.parse(value) as Payload;
1717
switch (payload.op) {

event/person.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getSlimCacheKey } from '@app/lib/person/cache';
22
import redis from '@app/lib/redis.ts';
33

4-
import { EventOp } from './type';
4+
import { EventOp, type KafkaMessage } from './type';
55

66
interface PersonKey {
77
prsn_id: number;
@@ -11,7 +11,7 @@ interface Payload {
1111
op: EventOp;
1212
}
1313

14-
export async function handle(topic: string, key: string, value: string) {
14+
export async function handle({ key, value }: KafkaMessage) {
1515
const idx = JSON.parse(key) as PersonKey;
1616
const payload = JSON.parse(value) as Payload;
1717
switch (payload.op) {

event/subject.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
import { extractDate } from '@app/lib/subject/date';
1212
import { sleep } from '@app/lib/utils';
1313

14-
import { EventOp } from './type';
14+
import { EventOp, type KafkaMessage } from './type';
1515

1616
interface SubjectKey {
1717
subject_id: number;
@@ -24,7 +24,7 @@ interface Payload {
2424
};
2525
}
2626

27-
export async function handle(topic: string, key: string, value: string) {
27+
export async function handle({ key, value }: KafkaMessage) {
2828
const idx = JSON.parse(key) as SubjectKey;
2929
const payload = JSON.parse(value) as Payload;
3030
switch (payload.op) {
@@ -46,7 +46,7 @@ interface FieldsKey {
4646
field_sid: number;
4747
}
4848

49-
export async function handleFields(topic: string, key: string, value: string) {
49+
export async function handleFields({ key, value }: KafkaMessage) {
5050
const idx = JSON.parse(key) as FieldsKey;
5151
const payload = JSON.parse(value) as Payload;
5252
switch (payload.op) {
@@ -68,7 +68,7 @@ interface TopicKey {
6868
sbj_tpc_id: number;
6969
}
7070

71-
export async function handleTopic(topic: string, key: string, value: string) {
71+
export async function handleTopic({ key, value }: KafkaMessage) {
7272
const idx = JSON.parse(key) as TopicKey;
7373
const payload = JSON.parse(value) as Payload;
7474
switch (payload.op) {
@@ -90,7 +90,7 @@ interface EpisodeKey {
9090
ep_id: number;
9191
}
9292

93-
export async function handleEpisode(topic: string, key: string, value: string) {
93+
export async function handleEpisode({ key, value }: KafkaMessage) {
9494
const idx = JSON.parse(key) as EpisodeKey;
9595
const payload = JSON.parse(value) as Payload;
9696
switch (payload.op) {
@@ -112,7 +112,7 @@ interface SubjectRevKey {
112112
rev_subject_id: number;
113113
}
114114

115-
export async function handleSubjectDate(topic: string, key: string, value: string) {
115+
export async function handleSubjectDate({ topic, key, value }: KafkaMessage) {
116116
let subjectID: number;
117117
if (topic.endsWith('.chii_subject_revisions')) {
118118
const idx = JSON.parse(key) as SubjectRevKey;

event/timeline.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from '@app/lib/timeline/cache';
1212
import { fetchFollowers } from '@app/lib/user/utils';
1313

14-
import { EventOp } from './type';
14+
import { EventOp, type KafkaMessage } from './type';
1515

1616
interface Key {
1717
tml_id: number;
@@ -29,7 +29,7 @@ interface Payload {
2929
after: TimelineItem | null;
3030
}
3131

32-
export async function handle(topic: string, key: string, value: string) {
32+
export async function handle({ key, value }: KafkaMessage) {
3333
const idx = JSON.parse(key) as Key;
3434
const payload = JSON.parse(value) as Payload;
3535

event/type.ts

+6
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@ export interface Payload {
1515
};
1616
op: EventOp;
1717
}
18+
19+
export interface KafkaMessage {
20+
topic: string;
21+
key: string;
22+
value: string;
23+
}

event/user.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
getSlimCacheKey,
77
} from '@app/lib/user/cache';
88

9-
import { EventOp } from './type';
9+
import { EventOp, type KafkaMessage } from './type';
1010

1111
interface UserPayload {
1212
op: EventOp;
@@ -16,7 +16,7 @@ interface UserKey {
1616
uid: number;
1717
}
1818

19-
export async function handle(topic: string, key: string, value: string) {
19+
export async function handle({ key, value }: KafkaMessage) {
2020
const idx = JSON.parse(key) as UserKey;
2121
const payload = JSON.parse(value) as UserPayload;
2222
switch (payload.op) {
@@ -46,7 +46,7 @@ interface FriendPayload {
4646
};
4747
}
4848

49-
export async function handleFriend(topic: string, _: string, value: string) {
49+
export async function handleFriend({ value }: KafkaMessage) {
5050
const payload = JSON.parse(value) as FriendPayload;
5151
const uid = payload.before?.frd_uid ?? payload.after?.frd_uid;
5252
const fid = payload.before?.frd_fid ?? payload.after?.frd_fid;

lib/timeline/kafka.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { KafkaMessage } from '@app/event/type';
12
import { logger } from '@app/lib/logger';
23

34
import { type TimelineMessage, TimelineWriter } from './writer';
@@ -7,7 +8,7 @@ interface Payload {
78
message: TimelineMessage[keyof TimelineMessage];
89
}
910

10-
export async function handleTimelineMessage(topic: string, key: string, value: string) {
11+
export async function handleTimelineMessage({ value }: KafkaMessage) {
1112
const payload = JSON.parse(value) as Payload;
1213

1314
switch (payload.op) {

0 commit comments

Comments
 (0)