Skip to content

Commit be7a716

Browse files
authored
Merge pull request #145 from danthegoodman1/main
Support maxParallelism of 0 to enable pausing workpools
2 parents 2f7e6b1 + 3a679e9 commit be7a716

File tree

5 files changed

+44
-6
lines changed

5 files changed

+44
-6
lines changed

src/client/index.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,22 @@ export class Workpool {
256256
...options,
257257
});
258258
}
259+
260+
/**
261+
* Kicks the workpool to wake it up and sync config (e.g. maxParallelism).
262+
* Useful for resuming after pausing (setting maxParallelism to 0).
263+
*
264+
* @param ctx - The mutation or action context that can call ctx.runMutation.
265+
*/
266+
async kick(ctx: RunMutationCtx): Promise<void> {
267+
await ctx.runMutation(this.component.lib.kick, {
268+
config: {
269+
logLevel: this.options.logLevel ?? DEFAULT_LOG_LEVEL,
270+
maxParallelism: this.options.maxParallelism ?? DEFAULT_MAX_PARALLELISM,
271+
},
272+
});
273+
}
274+
259275
/**
260276
* Gets the status of a work item.
261277
*
@@ -360,7 +376,7 @@ export type RetryOption = {
360376

361377
export type WorkpoolOptions = {
362378
/** How many actions/mutations can be running at once within this pool.
363-
* Min 1, Suggested max: 100 on Pro, 20 on the free plan.
379+
* Min 0 (no new work starts), Suggested max: 100 on Pro, 20 on free plan.
364380
*/
365381
maxParallelism?: number;
366382
/** How much to log. This is updated on each call to `enqueue*`,

src/component/_generated/component.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,18 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
9393
Array<string>,
9494
Name
9595
>;
96+
kick: FunctionReference<
97+
"mutation",
98+
"internal",
99+
{
100+
config: {
101+
logLevel: "DEBUG" | "TRACE" | "INFO" | "REPORT" | "WARN" | "ERROR";
102+
maxParallelism: number;
103+
};
104+
},
105+
null,
106+
Name
107+
>;
96108
status: FunctionReference<
97109
"query",
98110
"internal",

src/component/kick.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async function getOrUpdateGlobals(ctx: MutationCtx, config?: Partial<Config>) {
110110
} else if (config) {
111111
let updated = false;
112112
if (
113-
config.maxParallelism &&
113+
config.maxParallelism !== undefined &&
114114
config.maxParallelism !== globals.maxParallelism
115115
) {
116116
globals.maxParallelism = config.maxParallelism;

src/component/lib.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ describe("lib", () => {
7878
fnType: "mutation",
7979
runAt: Date.now(),
8080
config: {
81-
maxParallelism: 0, // Less than minimum
81+
maxParallelism: -1, // Less than minimum
8282
logLevel: "WARN",
8383
},
8484
}),
85-
).rejects.toThrow("maxParallelism must be >= 1");
85+
).rejects.toThrow("maxParallelism must be >= 0");
8686
});
8787
});
8888

src/component/lib.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ function validateConfig(config: Config) {
8181
createLogger(config.logLevel).warn(
8282
`maxParallelism should be <= ${MAX_PARALLELISM_SOFT_LIMIT}, but is set to ${config.maxParallelism}. This will be an error in a future version.`,
8383
);
84-
} else if (config.maxParallelism < 1) {
85-
throw new Error("maxParallelism must be >= 1");
84+
} else if (config.maxParallelism < 0) {
85+
throw new Error("maxParallelism must be >= 0");
8686
}
8787
}
8888

@@ -163,6 +163,16 @@ export const cancelAll = mutation({
163163
},
164164
});
165165

166+
export const kick = mutation({
167+
args: { config },
168+
returns: v.null(),
169+
handler: async (ctx, { config }) => {
170+
validateConfig(config);
171+
await kickMainLoop(ctx, "kick", config);
172+
return null;
173+
},
174+
});
175+
166176
export const status = query({
167177
args: { id: v.id("work") },
168178
returns: statusValidator,

0 commit comments

Comments
 (0)