Skip to content

Commit 4ed7429

Browse files
committed
changefeeds -- make server more defensive and client less aggressive
1 parent 8b381bb commit 4ed7429

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

src/packages/nats/changefeed/server.ts

+28-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ export const MIN_LIFETIME = 30 * 1000;
2020
export const MIN_HEARTBEAT = 5000;
2121
export const MAX_HEARTBEAT = 120000;
2222
export const MAX_CHANGEFEEDS_PER_ACCOUNT = parseInt(
23-
process.env.COCALC_MAX_CHANGEFEEDS_PER_ACCOUNT ?? "150",
23+
process.env.MAX_CHANGEFEEDS_PER_ACCOUNT ?? "100",
2424
);
25+
26+
export const MAX_CHANGEFEEDS_PER_SERVER = parseInt(
27+
process.env.MAX_CHANGEFEEDS_PER_SERVER ?? "5000",
28+
);
29+
2530
export const LAST_CHUNK = "last-chunk";
2631

2732
const logger = getLogger("changefeed:server");
@@ -52,6 +57,16 @@ let terminated = false;
5257
let sub: Subscription | null = null;
5358
export async function init(db) {
5459
logger.debug("starting changefeed server");
60+
logger.debug({
61+
DEFAULT_LIFETIME,
62+
MAX_LIFETIME,
63+
MIN_LIFETIME,
64+
MIN_HEARTBEAT,
65+
MAX_HEARTBEAT,
66+
MAX_CHANGEFEEDS_PER_ACCOUNT,
67+
MAX_CHANGEFEEDS_PER_SERVER,
68+
SUBJECT,
69+
});
5570
changefeedMainLoop(db);
5671
renewMainLoop();
5772
}
@@ -230,6 +245,18 @@ async function handleMessage(mesg, db) {
230245
);
231246
return;
232247
}
248+
if (numChangefeeds >= MAX_CHANGEFEEDS_PER_SERVER) {
249+
logger.debug("numChangefeeds >= MAX_CHANGEFEEDS_PER_SERVER", {
250+
numChangefeeds,
251+
MAX_CHANGEFEEDS_PER_SERVER,
252+
});
253+
// this will just cause the client to make another attempt, hopefully
254+
// to another server
255+
respond(
256+
`There is a limit of ${MAX_CHANGEFEEDS_PER_SERVER} changefeeds per server`,
257+
);
258+
return;
259+
}
233260

234261
let { heartbeat } = request;
235262
const lifetime = getLifetime(request);

src/packages/sync/table/synctable.ts

+4-6
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@ export class SyncTable extends EventEmitter {
721721
}
722722

723723
private create_changefeed_connection = async (): Promise<any[]> => {
724-
let delay_ms: number = 500;
724+
let delay_ms: number = 3000;
725725
while (true) {
726726
this.close_changefeed();
727727
if (
@@ -757,13 +757,11 @@ export class SyncTable extends EventEmitter {
757757
}
758758
// This can happen because we might suddenly NOT be ready
759759
// to query db immediately after we are ready...
760-
console.warn(
761-
`WARNING: ${this.table} -- failed to create changefeed connection -- ${err}; will retry`,
760+
console.log(
761+
`WARNING: ${this.table} -- failed to create changefeed connection; will retry in ${delay_ms}ms -- ${err}`,
762762
);
763763
await delay(delay_ms);
764-
if (delay_ms < 7000) {
765-
delay_ms *= 1.3;
766-
}
764+
delay_ms = Math.min(20000, delay_ms * 1.25);
767765
}
768766
}
769767
};

0 commit comments

Comments
 (0)