Skip to content

Commit 1a520e6

Browse files
everpcpctrim21
andauthored
feat: timeline api (#860)
Co-authored-by: Trim21 <[email protected]>
1 parent 0a39b65 commit 1a520e6

32 files changed

+3405
-34
lines changed

bin/mq.ts

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { KafkaJS } from '@confluentinc/kafka-javascript';
2+
3+
import { handle as handleTimelineEvent } from '@app/event/timeline';
4+
import type { Payload } from '@app/event/type';
5+
import config from '@app/lib/config.ts';
6+
import { logger } from '@app/lib/logger';
7+
8+
const TOPICS = [
9+
// 'debezium.chii.bangumi.chii_characters',
10+
// 'debezium.chii.bangumi.chii_members',
11+
// 'debezium.chii.bangumi.chii_persons',
12+
// 'debezium.chii.bangumi.chii_pms',
13+
// 'debezium.chii.bangumi.chii_subject_fields',
14+
// 'debezium.chii.bangumi.chii_subjects',
15+
'debezium.chii.bangumi.chii_timeline',
16+
];
17+
18+
async function onMessage(key: string, value: string) {
19+
const payload = JSON.parse(value) as Payload;
20+
switch (payload.source.table) {
21+
case 'chii_timeline': {
22+
await handleTimelineEvent(key, value);
23+
break;
24+
}
25+
default: {
26+
break;
27+
}
28+
}
29+
}
30+
31+
async function main() {
32+
if (!config.kafkaBrokers) {
33+
logger.error('KAFKA_BROKERS is not set');
34+
return;
35+
}
36+
const { Kafka, logLevel } = KafkaJS;
37+
38+
const kafka = new Kafka({
39+
log_level: logLevel.WARN,
40+
'client.id': 'server-private',
41+
});
42+
const consumer = kafka.consumer({
43+
'bootstrap.servers': config.kafkaBrokers,
44+
'group.id': 'server-private',
45+
});
46+
await consumer.connect();
47+
await consumer.subscribe({ topics: TOPICS });
48+
49+
await consumer.run({
50+
eachMessage: async ({ message }) => {
51+
if (!message.key) {
52+
return;
53+
}
54+
if (!message.value) {
55+
return;
56+
}
57+
await onMessage(message.key.toString(), message.value.toString());
58+
},
59+
});
60+
}
61+
62+
await main();

build.mjs

+6
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,9 @@ await esbuild.build({
4040
outfile: 'dist/cron.mjs',
4141
...buildConfigs,
4242
});
43+
44+
await esbuild.build({
45+
entryPoints: ['bin/mq.ts'],
46+
outfile: 'dist/mq.mjs',
47+
...buildConfigs,
48+
});

docs/timeline.md

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# 关于时间线的缓存策略
2+
3+
## 目前时间线请求分两种
4+
5+
### 用户时间胶囊
6+
7+
缓存 tml_id 到 redis `tml:user:{uid}` 为 sorted set, 以 tml_dateline 为 score,整个 key 的过期时间为 7 天
8+
9+
1. 收到请求 /p1/users/{username}/timeline, 设置访问 key `tml:visit:user:${uid}`, 过期时间 7 天
10+
2. 检查缓存 `tml:user:{uid}` 里的数据量,如果能覆盖当前的 offset 请求,则直接 range 返回缓存里的 tml_id, 否则请求数据库
11+
3. 从 redis mget 上一步拿到的 tml_id 列表,missing 的部分请求数据库
12+
13+
### 首页时间线
14+
15+
缓存 tml_id 到 redis `tml:inbox:{uid}` 为 sorted set, 以 tml_dateline 为 score,整个 key 的过期时间为 7 天
16+
17+
1. 收到请求 /p1/timeline, 设置访问 key `tml:visit:inbox:{uid}`, 过期时间 7 天
18+
2. 检查缓存 `tml:inbox:{uid}` 里的数据量,如果能覆盖当前的 offset 请求,则直接 range 返回缓存里的 tml_id, 否则请求数据库
19+
3. 从 redis mget 上一步拿到的 tml_id 列表,missing 的部分请求数据库
20+
21+
## 关于 MQ 里对缓存的更新
22+
23+
MQ 从 kafka 消费 debezium 的 binlog,然后更新缓存
24+
25+
### create
26+
27+
- 检查 `tml:visit:user:${tml_uid}` 是否存在,更新 `tml:user:{tml_uid}`,zadd 新的 tml_id+tml_dateline
28+
- 设置 `tml:user:{tml_uid}` 的过期时间与 `tml:visit:user:${tml_uid}` 一致
29+
- 获取 tml_uid 的好友,检查对应每个人的 `tml:visit:inbox:{follower_uid}` 是否存在,更新 `tml:inbox:{follower_uid}`,zadd 新的 tml_id+tml_dateline
30+
- 设置每个 `tml:inbox:{follower_uid}` 的过期时间与 `tml:visit:inbox:{follower_uid}` 一致
31+
32+
### delete
33+
34+
- 清除 `tml:item:{tml_id}` 的缓存
35+
- zrem `tml:user:{uid}` 里相应的 tml_id
36+
- 获取 tml_uid 的好友,zrem 所有 `tml:inbox:{follower_uid}` 里相应的 tml_id
37+
38+
### update
39+
40+
清除 `tml:item:{tml_id}` 的缓存,下次请求的时候会重新请求数据库并回填 cache

drizzle/orm.ts

+3
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@ export type IPersonCollect = typeof schema.chiiPersonCollects.$inferSelect;
2828

2929
export type IIndex = typeof schema.chiiIndexes.$inferSelect;
3030
export type IIndexCollect = typeof schema.chiiIndexCollects.$inferSelect;
31+
32+
export type ITimeline = typeof schema.chiiTimeline.$inferSelect;
33+
export type ITimelineComment = typeof schema.chiiTimelineComments.$inferSelect;

drizzle/schema.ts

+24-24
Original file line numberDiff line numberDiff line change
@@ -1137,43 +1137,43 @@ export const chiiTagFields = mysqlTable('chii_tag_neue_fields', {
11371137
export const chiiTimeline = mysqlTable(
11381138
'chii_timeline',
11391139
{
1140-
tmlId: int('tml_id').autoincrement().notNull(),
1141-
tmlUid: mediumint('tml_uid').notNull(),
1142-
tmlCat: smallint('tml_cat').notNull(),
1143-
tmlType: smallint('tml_type').notNull(),
1144-
tmlRelated: char('tml_related', { length: 255 }).default('0').notNull(),
1145-
tmlMemo: mediumtext('tml_memo').notNull(),
1146-
tmlImg: mediumtext('tml_img').notNull(),
1147-
tmlBatch: tinyint('tml_batch').notNull(),
1148-
tmlSource: tinyint('tml_source').default(0).notNull(),
1149-
tmlReplies: mediumint('tml_replies').notNull(),
1150-
tmlDateline: int('tml_dateline').default(0).notNull(),
1140+
id: int('tml_id').autoincrement().notNull(),
1141+
uid: mediumint('tml_uid').notNull(),
1142+
cat: smallint('tml_cat').notNull(),
1143+
type: smallint('tml_type').notNull(),
1144+
related: char('tml_related', { length: 255 }).default('0').notNull(),
1145+
memo: mediumtext('tml_memo').notNull(),
1146+
img: mediumtext('tml_img').notNull(),
1147+
batch: customBoolean('tml_batch').notNull(),
1148+
source: tinyint('tml_source').default(0).notNull(),
1149+
replies: mediumint('tml_replies').notNull(),
1150+
createdAt: int('tml_dateline').default(0).notNull(),
11511151
},
11521152
(table) => {
11531153
return {
1154-
tmlUid: index('tml_uid').on(table.tmlUid),
1155-
tmlCat: index('tml_cat').on(table.tmlCat),
1156-
tmlBatch: index('tml_batch').on(table.tmlBatch),
1157-
queryTmlCat: index('query_tml_cat').on(table.tmlUid, table.tmlCat),
1154+
tmlUid: index('tml_uid').on(table.uid),
1155+
tmlCat: index('tml_cat').on(table.cat),
1156+
tmlBatch: index('tml_batch').on(table.batch),
1157+
queryTmlCat: index('query_tml_cat').on(table.uid, table.cat),
11581158
};
11591159
},
11601160
);
11611161

11621162
export const chiiTimelineComments = mysqlTable(
11631163
'chii_timeline_comments',
11641164
{
1165-
tmlPstId: mediumint('tml_pst_id').autoincrement().notNull(),
1166-
tmlPstMid: int('tml_pst_mid').notNull(),
1167-
tmlPstUid: mediumint('tml_pst_uid').notNull(),
1168-
tmlPstRelated: mediumint('tml_pst_related').notNull(),
1169-
tmlPstDateline: int('tml_pst_dateline').notNull(),
1170-
tmlPstContent: mediumtext('tml_pst_content').notNull(),
1165+
id: mediumint('tml_pst_id').autoincrement().notNull(),
1166+
mid: int('tml_pst_mid').notNull(),
1167+
uid: mediumint('tml_pst_uid').notNull(),
1168+
related: mediumint('tml_pst_related').notNull(),
1169+
createdAt: int('tml_pst_dateline').notNull(),
1170+
content: mediumtext('tml_pst_content').notNull(),
11711171
},
11721172
(table) => {
11731173
return {
1174-
cmtTmlId: index('cmt_tml_id').on(table.tmlPstMid),
1175-
tmlPstRelated: index('tml_pst_related').on(table.tmlPstRelated),
1176-
tmlPstUid: index('tml_pst_uid').on(table.tmlPstUid),
1174+
cmtTmlId: index('cmt_tml_id').on(table.mid),
1175+
tmlPstRelated: index('tml_pst_related').on(table.related),
1176+
tmlPstUid: index('tml_pst_uid').on(table.uid),
11771177
};
11781178
},
11791179
);

event/timeline.ts

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { DateTime } from 'luxon';
2+
3+
import { logger } from '@app/lib/logger';
4+
import redis from '@app/lib/redis.ts';
5+
import {
6+
getInboxCacheKey,
7+
getInboxVisitCacheKey,
8+
getItemCacheKey,
9+
getUserCacheKey,
10+
getUserVisitCacheKey,
11+
} from '@app/lib/timeline/cache';
12+
import * as fetcher from '@app/lib/types/fetcher.ts';
13+
14+
import { EventOp } from './type';
15+
16+
interface Key {
17+
tml_id: number;
18+
}
19+
20+
interface TimelineItem {
21+
tml_id: number;
22+
tml_uid: number;
23+
tml_dateline: number;
24+
}
25+
26+
interface Payload {
27+
op: EventOp;
28+
before: TimelineItem | null;
29+
after: TimelineItem | null;
30+
}
31+
32+
export async function handle(key: string, value: string) {
33+
const idx = JSON.parse(key) as Key;
34+
const payload = JSON.parse(value) as Payload;
35+
36+
switch (payload.op) {
37+
case EventOp.Create: {
38+
if (!payload.after) {
39+
logger.error('invalid timeline payload for create', payload);
40+
return;
41+
}
42+
const tml = payload.after;
43+
logger.info(`process timeline create event: ${tml.tml_id}`);
44+
const now = DateTime.now().toUnixInteger();
45+
const ttlUser = Number(await redis.get(getUserVisitCacheKey(tml.tml_uid)));
46+
if (ttlUser > 0) {
47+
const userCacheKey = getUserCacheKey(tml.tml_uid);
48+
await redis.zadd(userCacheKey, payload.after.tml_dateline, tml.tml_id);
49+
// 将 cache key 的过期时间设置为与 visit key 一致
50+
await redis.expire(userCacheKey, ttlUser - now);
51+
}
52+
const friendIDs = await fetcher.fetchFriendIDsByUserID(tml.tml_uid);
53+
if (friendIDs.length > 0) {
54+
const ttlInbox = await redis.mget(friendIDs.map((fid) => getInboxVisitCacheKey(fid)));
55+
for (const [idx, fid] of friendIDs.entries()) {
56+
const ttl = Number(ttlInbox[idx]);
57+
if (ttl > 0) {
58+
const inboxCacheKey = getInboxCacheKey(fid);
59+
await redis.zadd(inboxCacheKey, payload.after.tml_dateline, tml.tml_id);
60+
await redis.expire(inboxCacheKey, ttl - now);
61+
}
62+
}
63+
}
64+
break;
65+
}
66+
case EventOp.Delete: {
67+
if (!payload.before) {
68+
logger.error('invalid timeline payload for delete', payload);
69+
return;
70+
}
71+
const tml = payload.before;
72+
logger.info(`process timeline delete event: ${tml.tml_id}`);
73+
await redis.zrem(getUserCacheKey(tml.tml_uid), tml.tml_id);
74+
const friendIDs = await fetcher.fetchFriendIDsByUserID(tml.tml_uid);
75+
if (friendIDs.length > 0) {
76+
for (const fid of friendIDs) {
77+
await redis.zrem(getInboxCacheKey(fid), tml.tml_id);
78+
}
79+
}
80+
await redis.del(getItemCacheKey(idx.tml_id));
81+
break;
82+
}
83+
case EventOp.Update: {
84+
logger.info(`process timeline update event: ${idx.tml_id}`);
85+
await redis.del(getItemCacheKey(idx.tml_id));
86+
break;
87+
}
88+
case EventOp.Snapshot: {
89+
// do nothing
90+
break;
91+
}
92+
}
93+
}

event/type.ts

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
export enum EventOp {
2+
Create = 'c',
3+
Delete = 'd',
4+
Update = 'u',
5+
Snapshot = 'r',
6+
}
7+
8+
export interface Payload {
9+
// before: object;
10+
// after: object;
11+
source: {
12+
table: string;
13+
};
14+
op: EventOp;
15+
}

lib/config.ts

+2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ export const schema = Obj({
101101

102102
redisUri: t.String({ default: 'redis://127.0.0.1:3306/0', env: 'REDIS_URI' }),
103103

104+
kafkaBrokers: t.String({ default: '127.0.0.1:9092', env: 'KAFKA_BROKERS' }),
105+
104106
image: Obj({
105107
gatewayDomain: t.String({ default: 'lain.bgm.tv' }),
106108
provider: t.Enum(Image, { default: Image.FileSystem, env: 'CHII_IMAGE_PROVIDER' }),

lib/like.ts

+4
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ export interface Reaction {
1111

1212
export const LikeType = {
1313
subject_cover: 1,
14+
1415
group_topic: 7,
1516
group_reply: 8,
17+
1618
subject_reply: 10,
1719
ep_reply: 11,
20+
21+
subject_collect: 40,
1822
} as const;
1923

2024
export const LIKE_REACTIONS_ALLOWED: ReadonlySet<number> = Object.freeze(

lib/openapi/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export const Tag = {
55
Group: 'group',
66
Person: 'person',
77
Subject: 'subject',
8+
Timeline: 'timeline',
89
Topic: 'topic',
910
User: 'user',
1011
Wiki: 'wiki',

lib/response.ts

+13-6
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@ export function avatar(s: string): res.IAvatar {
1818
};
1919
}
2020

21-
export function groupIcon(s: string): string {
22-
return `https://${imageDomain}/pic/icon/s/${s}`;
21+
export function groupIcon(s: string): res.IAvatar {
22+
if (!s) {
23+
s = 'icon.jpg';
24+
}
25+
return {
26+
large: `https://${imageDomain}/pic/icon/l/${s}`,
27+
medium: `https://${imageDomain}/pic/icon/m/${s}`,
28+
small: `https://${imageDomain}/pic/icon/s/${s}`,
29+
};
2330
}
2431

25-
export function subjectCover(s: string): res.ISubjectImages | null {
32+
export function subjectCover(s: string): res.ISubjectImages | undefined {
2633
if (!s) {
27-
return null;
34+
return undefined;
2835
}
2936
return {
3037
large: `${baseSubjectImageUrl}/l/${s}`,
@@ -35,9 +42,9 @@ export function subjectCover(s: string): res.ISubjectImages | null {
3542
};
3643
}
3744

38-
export function personImages(s: string): res.IPersonImages | null {
45+
export function personImages(s: string): res.IPersonImages | undefined {
3946
if (!s) {
40-
return null;
47+
return undefined;
4148
}
4249
return {
4350
large: `${basePersonImageUrl}/l/${s}`,

lib/timeline/cache.ts

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
export function getItemCacheKey(id: number): string {
2+
return `tml:item:${id}`;
3+
}
4+
5+
export function getUserCacheKey(uid: number) {
6+
return `tml:user:${uid}`;
7+
}
8+
9+
export function getUserVisitCacheKey(uid: number) {
10+
return `tml:visit:user:${uid}`;
11+
}
12+
13+
export function getInboxCacheKey(uid: number) {
14+
return `tml:inbox:${uid}`;
15+
}
16+
17+
export function getInboxVisitCacheKey(uid: number) {
18+
return `tml:visit:inbox:${uid}`;
19+
}

0 commit comments

Comments
 (0)