Skip to content

Commit 6eaa7fe

Browse files
Fork sse pubsub, typescript, logging (growthbook#15)
* fork sse-pubsub, typescript, logging, add license * increase version * cleanup * cleanup
1 parent 0c752c1 commit 6eaa7fe

11 files changed

+299
-62
lines changed

LICENSE

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
MIT License
2+
3+
Copyright (c) 2021 GrowthBook, Inc.
4+
Copyright (c) 2017 Andrew Betts
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
SOFTWARE.

package.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"node": ">=16"
55
},
66
"description": "GrowthBook proxy server for caching, realtime updates, telemetry, etc",
7-
"version": "1.0.7",
7+
"version": "1.0.8",
88
"main": "dist/app.js",
99
"license": "MIT",
1010
"repository": {
@@ -30,7 +30,6 @@
3030
"pino-http": "^8.3.1",
3131
"redis": "^4.5.1",
3232
"spdy": "^4.0.2",
33-
"sse-pubsub": "^1.4.1",
3433
"uuid": "^9.0.0"
3534
},
3635
"devDependencies": {

src/app.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import { featuresRouter } from "./controllers/featuresController";
88
import proxyMiddleware from "./middleware/proxyMiddleware";
99
import { featuresCache, initializeCache } from "./services/cache";
1010
import { initializeRegistrar, registrar } from "./services/registrar";
11-
import { eventStreamManager } from "./services/eventStreamManager";
11+
import {
12+
eventStreamManager,
13+
initializeEventStreamManager,
14+
} from "./services/eventStreamManager";
1215
import { Context, GrowthBookProxy } from "./types";
1316
import logger, { initializeLogger } from "./services/logger";
1417

@@ -70,6 +73,7 @@ export const growthBookProxy = async (
7073
initializeLogger(ctx);
7174
await initializeRegistrar(ctx);
7275
ctx.enableCache && (await initializeCache(ctx));
76+
ctx.enableEventStream && initializeEventStreamManager(ctx);
7377

7478
// set up handlers
7579
ctx.enableCors && app.use(cors());

src/controllers/eventStreamController.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import { apiKeyMiddleware } from "../middleware/apiKeyMiddleware";
44
import { validateEventStreamChannelMiddleware } from "../middleware/eventStream/validateEventStreamChannelMiddleware";
55

66
const getSubscribeToEventStream = async (req: Request, res: Response) => {
7-
eventStreamManager.subscribe(req, res);
7+
if (eventStreamManager) {
8+
eventStreamManager.subscribe(req, res);
9+
}
810
};
911

1012
export const eventStreamRouter = express.Router();

src/middleware/cache/readThroughCacheMiddleware.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const interceptor = (proxyTarget: string) =>
2424
errorCounts[proxyTarget] = 0;
2525

2626
const response = responseBuffer.toString("utf-8");
27-
if (!featuresCache) {
27+
if (!featuresCache || !eventStreamManager) {
2828
return response;
2929
}
3030

src/middleware/cache/refreshStaleCacheMiddleware.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ import { Request, Response } from "express";
33
import { featuresCache } from "../../services/cache";
44
import logger from "../../services/logger";
55
import { eventStreamManager } from "../../services/eventStreamManager";
6+
import { Context } from "../../types";
67

78
const activeFetchUrls = new Set<string>();
89

910
export default ({ proxyTarget }: { proxyTarget: string }) =>
1011
async (req: Request, res: Response) => {
12+
const ctx = req.app.locals?.ctx as Context;
13+
1114
if (!featuresCache) {
1215
return;
1316
}
@@ -32,7 +35,14 @@ export default ({ proxyTarget }: { proxyTarget: string }) =>
3235
const oldEntry = await featuresCache.get(apiKey);
3336
await featuresCache.set(apiKey, entry);
3437

35-
eventStreamManager.publish(apiKey, "features", entry, oldEntry?.payload);
38+
if (ctx.enableEventStream && eventStreamManager) {
39+
eventStreamManager.publish(
40+
apiKey,
41+
"features",
42+
entry,
43+
oldEntry?.payload
44+
);
45+
}
3646
} else {
3747
logger.error("Unable to parse response");
3848
}

src/middleware/eventStream/broadcastEventStreamMiddleware.ts

+2-8
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,14 @@ export const broadcastEventStreamMiddleware = async (
1212
const ctx = req.app.locals?.ctx as Context;
1313
ctx?.verboseDebugging && logger.info("broadcastEventStreamMiddleware");
1414

15-
if (ctx?.enableEventStream) {
15+
if (ctx?.enableEventStream && eventStreamManager) {
1616
const apiKey = res.locals.apiKey;
1717

1818
const oldEntry = featuresCache
1919
? await featuresCache.get(apiKey)
2020
: undefined;
2121

22-
eventStreamManager.publish(
23-
apiKey,
24-
"features",
25-
req.body,
26-
oldEntry?.payload,
27-
ctx
28-
);
22+
eventStreamManager.publish(apiKey, "features", req.body, oldEntry?.payload);
2923
}
3024
next();
3125
};

src/services/cache/RedisCache.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ export class RedisCache {
169169
if (uuid === this.clientUUID) return;
170170

171171
// 1. emit SSE to SDK clients (if new payload !== old payload)
172-
if (this.appContext?.enableEventStream) {
172+
if (this.appContext?.enableEventStream && eventStreamManager) {
173173
const oldEntry = await this.get(key);
174174
eventStreamManager.publish(
175175
key,
+25-42
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,9 @@
11
import { Request, Response } from "express";
22
import logger from "../logger";
33
import { Context } from "../../types";
4-
const SSEChannel = require("sse-pubsub");
4+
import { SSEChannel, Options } from "./ssePubsub";
55

6-
// START hacky TS binding for sse-pubsub
7-
interface SSEChannel {
8-
/* eslint-disable @typescript-eslint/no-explicit-any */
9-
constructor: (options: SSEChannelOptions) => void;
10-
/* eslint-disable @typescript-eslint/no-explicit-any */
11-
publish: (data: any, eventName: string) => number;
12-
subscribe: (
13-
req: Request,
14-
res: Response,
15-
/* eslint-disable @typescript-eslint/no-explicit-any */
16-
events?: any[]
17-
/* eslint-disable @typescript-eslint/no-explicit-any */
18-
) => { req: Request; res: Response; events?: any[] };
19-
/* eslint-disable @typescript-eslint/no-explicit-any */
20-
unsubscribe: (c: { req: Request; res: Response; events?: any[] }) => void;
21-
close: () => void;
22-
listClients: () => { [ip: string]: number };
23-
getSubscriberCount: () => number;
24-
}
25-
interface SSEChannelOptions {
26-
pingInterval?: number; // default 3000
27-
maxStreamDuration?: number; // default 30000
28-
clientRetryInterval?: number; // default 1000
29-
startId?: number; // default 1
30-
historySize?: number; // default 100
31-
rewind?: number; // default 0
32-
}
33-
// END hacky TS binding for sse-pubsub
34-
35-
const defaultOptions: SSEChannelOptions = {
6+
const defaultOptions: Partial<Options> = {
367
historySize: 1,
378
};
389

@@ -41,9 +12,15 @@ interface ScopedChannel {
4112
channel: SSEChannel;
4213
}
4314

44-
export class EventStreamManager {
15+
export class SSEManager {
4516
private scopedChannels = new Map<string, ScopedChannel>();
4617

18+
private appContext: Context;
19+
20+
constructor(appContext: Context) {
21+
this.appContext = appContext;
22+
}
23+
4724
public subscribe(req: Request, res: Response) {
4825
const apiKey = res.locals.apiKey;
4926
if (apiKey) {
@@ -92,49 +69,55 @@ export class EventStreamManager {
9269
/* eslint-disable @typescript-eslint/no-explicit-any */
9370
payload: any,
9471
/* eslint-disable @typescript-eslint/no-explicit-any */
95-
oldPayload?: any,
96-
ctx?: Context
72+
oldPayload?: any
9773
) {
98-
ctx?.verboseDebugging &&
74+
this.appContext?.verboseDebugging &&
9975
logger.info(
10076
{ apiKey, event, payload, oldPayload },
10177
"EventStreamManager.publish"
10278
);
10379
const scopedChannel = this.getScopedChannel(apiKey);
10480
if (scopedChannel) {
10581
if (oldPayload === undefined) {
106-
ctx?.verboseDebugging &&
82+
this.appContext?.verboseDebugging &&
10783
logger.info({ payload, event }, "publishing SSE");
10884
scopedChannel.channel.publish(payload, event);
10985
} else {
11086
const hasChanges =
11187
JSON.stringify(payload) !== JSON.stringify(oldPayload);
11288
if (hasChanges) {
113-
ctx?.verboseDebugging &&
89+
this.appContext?.verboseDebugging &&
11490
logger.info({ payload, event }, "publishing SSE");
11591
scopedChannel.channel.publish(payload, event);
11692
return;
11793
}
118-
ctx?.verboseDebugging &&
94+
this.appContext?.verboseDebugging &&
11995
logger.info({ payload, event }, "skipping SSE publish, no changes");
12096
}
12197
return;
12298
}
123-
ctx?.verboseDebugging && logger.info("No scoped channel found");
99+
this.appContext?.verboseDebugging && logger.info("No scoped channel found");
124100
}
125101

126102
private getScopedChannel(apiKey: string): ScopedChannel | undefined {
127103
let scopedChannel = this.scopedChannels.get(apiKey);
128104
if (!scopedChannel) {
129105
this.scopedChannels.set(apiKey, {
130106
apiKey,
131-
channel: new SSEChannel(defaultOptions),
107+
channel: new SSEChannel(defaultOptions, this.appContext),
132108
});
133109
scopedChannel = this.scopedChannels.get(apiKey);
134110
}
135111
return scopedChannel;
136112
}
137113
}
138114

139-
export const eventStreamManager = new EventStreamManager();
140-
Object.freeze(eventStreamManager);
115+
export type EventStreamManager = SSEManager | null;
116+
export let eventStreamManager: SSEManager | null = null;
117+
118+
export const initializeEventStreamManager = (appContext: Context) => {
119+
if (appContext.enableEventStream) {
120+
eventStreamManager = new SSEManager(appContext);
121+
}
122+
Object.freeze(eventStreamManager);
123+
};

0 commit comments

Comments
 (0)