-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.ts
357 lines (340 loc) · 11.3 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import {
createFunctionHandle,
DefaultFunctionArgs,
FunctionHandle,
FunctionReference,
FunctionType,
FunctionVisibility,
getFunctionName,
} from "convex/server";
import { v, VString } from "convex/values";
import { Mounts } from "../component/_generated/api.js";
import { DEFAULT_LOG_LEVEL, type LogLevel } from "../component/logging.js";
import {
Config,
DEFAULT_MAX_PARALLELISM,
OnComplete,
runResult as resultValidator,
type RetryBehavior,
RunResult,
OnCompleteArgs as SharedOnCompleteArgs,
Status,
} from "../component/shared.js";
import { RunMutationCtx, RunQueryCtx, UseApi } from "./utils.js";
export { resultValidator, type RunResult, type RetryBehavior, type OnComplete };
export {
retryBehavior as vRetryBehavior,
onComplete as vOnComplete,
} from "../component/shared.js";
export { logLevel as vLogLevel, type LogLevel } from "../component/logging.js";
export { resultValidator as vResultValidator };
/**
* Retry behavior used when retries are requested but the Workpool was not
* configured with its own `defaultRetryBehavior`.
*
* Attempts will run with delay [0, 250, 500, 1000, 2000] (ms)
*/
export const DEFAULT_RETRY_BEHAVIOR: RetryBehavior = {
maxAttempts: 5,
initialBackoffMs: 250,
base: 2,
};
/**
* Branded string type for the ID of an enqueued work item, as returned by
* the `enqueue*` methods. The brand exists only at the type level.
*/
export type WorkId = string & { __isWorkId: true };
/** A string validator whose TypeScript type is {@link WorkId}. */
export const workIdValidator = v.string() as VString<WorkId>;
// Same validator, also exported under the `v`-prefixed name.
export { workIdValidator as vWorkIdValidator };
/**
* Option for overriding the function name that is sent to the component
* (as `fnName`) when enqueuing work.
*/
export type NameOption = {
/**
* The name of the function. By default, if you pass in api.foo.bar.baz,
* it will use "foo/bar:baz" as the name. If you pass in a function handle,
* it will use the function handle directly.
*/
name?: string;
};
/** Per-call retry override accepted when enqueuing an action. */
export type RetryOption = {
/** Whether to retry the action if it fails.
* If true, it will use the Workpool's `defaultRetryBehavior`, falling back
* to `DEFAULT_RETRY_BEHAVIOR` when none was configured.
* If false, the action is enqueued without any retry behavior.
* If custom behavior is provided, it will retry using that behavior.
* If unset, it follows the Workpool's `retryActionsByDefault` setting.
*/
retry?: boolean | RetryBehavior;
};
/** Configuration for a {@link Workpool}. Every option is optional. */
export type WorkpoolOptions = {
/** How many actions/mutations can be running at once within this pool.
* Min 1, Max 300.
* If unset, `DEFAULT_MAX_PARALLELISM` is sent when enqueuing work.
*/
maxParallelism?: number;
/** How much to log. This is sent to the component on each call to
* `enqueue*` or `cancel*` (`status` does not send it).
* Default is REPORT, which logs warnings, errors, and a periodic report.
* With INFO, you can also see events for started and completed work.
* Stats generated can be parsed by tools like
* [Axiom](https://axiom.co) for monitoring.
* With DEBUG, you can see timers and internal events for work being
* scheduled.
*/
logLevel?: LogLevel;
} & WorkpoolRetryOptions;
/** Pool-wide retry defaults, applied to actions enqueued without an explicit `retry` option. */
export type WorkpoolRetryOptions = {
/** Default retry behavior for enqueued actions.
* Used when an action is retried without a custom behavior of its own.
* If unset, `DEFAULT_RETRY_BEHAVIOR` is used instead.
* See {@link RetryBehavior}.
*/
defaultRetryBehavior?: RetryBehavior;
/** Whether to retry actions that fail by default. Default: false.
* Only consulted when the enqueue call leaves `retry` unset.
* NOTE: Only enable this if your actions are idempotent.
* See the docs (README.md) for more details.
*/
retryActionsByDefault?: boolean;
};
export class Workpool {
  /**
   * Initializes a Workpool.
   *
   * Note: if you want different pools, you need to *create different instances*
   * of Workpool in convex.config.ts. It isn't sufficient to have different
   * instances of this class.
   *
   * @param component - The component to use, like `components.workpool` from
   * `./_generated/api.ts`.
   * @param options - The {@link WorkpoolOptions} for the Workpool. Every option
   * is optional, so this argument may be omitted to use the defaults.
   */
  constructor(
    private component: UseApi<Mounts>, // UseApi<api> for jump to definition
    public options: WorkpoolOptions = {}
  ) {}

  /**
   * Enqueues an action to be run.
   *
   * @param ctx - The mutation or action context that can call ctx.runMutation.
   * @param fn - The action to run, like `internal.example.myAction`.
   * @param fnArgs - The arguments to pass to the action.
   * @param options - The options for the action to specify retry behavior,
   * onComplete handling, and scheduling via `runAt` or `runAfter`.
   * @returns The ID of the work that was enqueued.
   */
  async enqueueAction<Args extends DefaultFunctionArgs, ReturnType>(
    ctx: RunMutationCtx,
    fn: FunctionReference<"action", FunctionVisibility, Args, ReturnType>,
    fnArgs: Args,
    options?: RetryOption & CallbackOptions & SchedulerOptions & NameOption
  ): Promise<WorkId> {
    // Resolve the per-call override against the pool-wide retry defaults.
    const retryBehavior = getRetryBehavior(
      this.options.defaultRetryBehavior,
      this.options.retryActionsByDefault,
      options?.retry
    );
    const onComplete = await resolveOnComplete(options);
    const id = await ctx.runMutation(this.component.lib.enqueue, {
      ...(await defaultEnqueueArgs(fn, options?.name, this.options)),
      fnArgs,
      fnType: "action",
      runAt: getRunAt(options),
      onComplete,
      retryBehavior,
    });
    return id as WorkId;
  }

  /**
   * Enqueues a mutation to be run.
   *
   * Note: mutations are not retried by the workpool. Convex automatically
   * retries them on database conflicts and transient failures.
   * Because they're deterministic, external retries don't provide any benefit.
   *
   * @param ctx - The mutation or action context that can call ctx.runMutation.
   * @param fn - The mutation to run, like `internal.example.myMutation`.
   * @param fnArgs - The arguments to pass to the mutation.
   * @param options - The options for the mutation to specify onComplete handling
   * and scheduling via `runAt` or `runAfter`.
   * @returns The ID of the work that was enqueued.
   */
  async enqueueMutation<Args extends DefaultFunctionArgs, ReturnType>(
    ctx: RunMutationCtx,
    fn: FunctionReference<"mutation", FunctionVisibility, Args, ReturnType>,
    fnArgs: Args,
    options?: CallbackOptions & SchedulerOptions & NameOption
  ): Promise<WorkId> {
    const onComplete = await resolveOnComplete(options);
    const id = await ctx.runMutation(this.component.lib.enqueue, {
      ...(await defaultEnqueueArgs(fn, options?.name, this.options)),
      fnArgs,
      fnType: "mutation",
      runAt: getRunAt(options),
      onComplete,
    });
    return id as WorkId;
  }

  /**
   * Enqueues a query to be run.
   *
   * Note: no retry behavior is sent for queries; only onComplete handling,
   * scheduling and naming can be configured.
   *
   * @param ctx - The mutation or action context that can call ctx.runMutation.
   * @param fn - The query to run, like `internal.example.myQuery`.
   * @param fnArgs - The arguments to pass to the query.
   * @param options - The options for the query to specify onComplete handling
   * and scheduling via `runAt` or `runAfter`.
   * @returns The ID of the work that was enqueued.
   */
  async enqueueQuery<Args extends DefaultFunctionArgs, ReturnType>(
    ctx: RunMutationCtx,
    fn: FunctionReference<"query", FunctionVisibility, Args, ReturnType>,
    fnArgs: Args,
    options?: CallbackOptions & SchedulerOptions & NameOption
  ): Promise<WorkId> {
    const onComplete = await resolveOnComplete(options);
    const id = await ctx.runMutation(this.component.lib.enqueue, {
      ...(await defaultEnqueueArgs(fn, options?.name, this.options)),
      fnArgs,
      fnType: "query",
      runAt: getRunAt(options),
      onComplete,
    });
    return id as WorkId;
  }

  /**
   * Cancels a work item. If it's already started, it will be allowed to finish
   * but will not be retried.
   *
   * @param ctx - The mutation or action context that can call ctx.runMutation.
   * @param id - The ID of the work to cancel.
   */
  async cancel(ctx: RunMutationCtx, id: WorkId): Promise<void> {
    await ctx.runMutation(this.component.lib.cancel, {
      id,
      logLevel: this.options.logLevel ?? DEFAULT_LOG_LEVEL,
    });
  }

  /**
   * Cancels all pending work items. See {@link cancel}.
   *
   * @param ctx - The mutation or action context that can call ctx.runMutation.
   */
  async cancelAll(ctx: RunMutationCtx): Promise<void> {
    await ctx.runMutation(this.component.lib.cancelAll, {
      logLevel: this.options.logLevel ?? DEFAULT_LOG_LEVEL,
    });
  }

  /**
   * Gets the status of a work item.
   *
   * @param ctx - The query context that can call ctx.runQuery.
   * @param id - The ID of the work to get the status of.
   * @returns The status of the work item. One of:
   * - `{ state: "pending", previousAttempts: number }`
   * - `{ state: "running", previousAttempts: number }`
   * - `{ state: "finished" }`
   */
  async status(ctx: RunQueryCtx, id: WorkId): Promise<Status> {
    return ctx.runQuery(this.component.lib.status, { id });
  }
}

/**
 * Builds the `onComplete` payload that the `enqueue*` methods send to the
 * component.
 *
 * @param options - The caller's enqueue options, if any.
 * @returns `undefined` when no `onComplete` mutation was supplied (including
 * an explicit `null`); otherwise a function handle for that mutation together
 * with the caller-supplied `context`.
 */
async function resolveOnComplete(
  options: CallbackOptions | undefined
): Promise<OnComplete | undefined> {
  if (!options?.onComplete) {
    // `context` is intentionally dropped: there is no callback to receive it.
    return undefined;
  }
  return {
    fnHandle: await createFunctionHandle(options.onComplete),
    context: options.context,
  };
}
/**
* When the enqueued work should run: either at an absolute time (`runAt`)
* or after a delay from now (`runAfter`). If neither is set, the time of
* the enqueue call is used. If both end up set, `runAt` takes precedence.
*/
export type SchedulerOptions =
| {
/**
* The time (ms since epoch) to run the action at.
* If not provided, the action will be run as soon as possible.
* Note: this is advisory only. It may run later.
*/
runAt?: number;
}
| {
/**
* How many milliseconds from now to wait before running the action.
* If not provided, the action will be run as soon as possible.
* Note: this is advisory only. It may run later.
*/
runAfter?: number;
};
/** Options for being notified when enqueued work completes. */
export type CallbackOptions = {
/**
* A mutation to run after the function succeeds, fails, or is canceled.
* Passing `null` is treated the same as leaving it unset.
* The context type is for your use, feel free to provide a validator for it.
* e.g. with the enqueue time (`Date.now()`) passed as `context`:
* ```ts
* export const completion = internalMutation({
*   args: {
*     workId: workIdValidator,
*     context: v.any(),
*     result: resultValidator,
*   },
*   handler: async (ctx, args) => {
*     console.log(args.result, "Got Context back -> ", args.context, Date.now() - args.context);
*   },
* });
* ```
*/
onComplete?: FunctionReference<
"mutation",
FunctionVisibility,
OnCompleteArgs
> | null;
/**
* A context object to pass to the `onComplete` mutation.
* Useful for passing data from the enqueue site to the onComplete site.
* It is only forwarded when `onComplete` is also provided.
*/
context?: unknown;
};
/** The arguments an `onComplete` mutation is declared to accept. */
export type OnCompleteArgs = {
/**
* The ID of the work that completed.
*/
workId: WorkId;
/**
* The context object passed when enqueuing the work.
* Useful for passing data from the enqueue site to the onComplete site.
*/
context: unknown;
/**
* The result of the run that completed.
*/
result: RunResult;
};
// Compile-time check only: this fails to type-check if OnCompleteArgs stops
// satisfying SharedOnCompleteArgs. The `_` binding exists solely for that.
const _ = {} as OnCompleteArgs satisfies SharedOnCompleteArgs;
//
// Helper functions
//
/**
 * Resolves the retry behavior to send for a single enqueued action.
 *
 * @param defaultRetryBehavior - The pool-wide default behavior, if configured.
 * When absent, `DEFAULT_RETRY_BEHAVIOR` stands in for it.
 * @param retryActionsByDefault - Whether actions retry when the caller gives
 * no explicit override. Treated as `false` when absent.
 * @param retryOverride - The caller's `retry` option: `true` forces the
 * default behavior, `false` disables retries, a custom behavior is used as
 * is, and an absent value defers to `retryActionsByDefault`.
 * @returns The behavior to retry with, or `undefined` for no retries.
 */
function getRetryBehavior(
  defaultRetryBehavior: RetryBehavior | undefined,
  retryActionsByDefault: boolean | undefined,
  retryOverride: boolean | RetryBehavior | undefined
): RetryBehavior | undefined {
  const fallback = defaultRetryBehavior ?? DEFAULT_RETRY_BEHAVIOR;

  // Explicit on/off switch from the caller.
  if (typeof retryOverride === "boolean") {
    return retryOverride ? fallback : undefined;
  }
  // Caller supplied a custom behavior: use it untouched.
  if (retryOverride != null) {
    return retryOverride;
  }
  // Nothing specified per call: follow the pool-wide setting.
  return retryActionsByDefault ? fallback : undefined;
}
/**
 * Builds the arguments shared by every `enqueue*` call: the function handle,
 * the function name, and the pool config with defaults filled in.
 *
 * @param fn - A function reference, or an existing function handle (a string
 * starting with `function://`), which is passed through without creating a
 * new handle.
 * @param name - Optional name override. Without it, the name is the handle
 * itself (for handles) or the result of `getFunctionName` (for references).
 * @param param2 - The pool's `logLevel` and `maxParallelism`; missing values
 * fall back to `DEFAULT_LOG_LEVEL` and `DEFAULT_MAX_PARALLELISM`.
 * @returns `{ fnHandle, fnName, config }`.
 */
async function defaultEnqueueArgs(
  fn:
    | FunctionReference<FunctionType, FunctionVisibility>
    | FunctionHandle<FunctionType, DefaultFunctionArgs>,
  name: string | undefined,
  { logLevel, maxParallelism }: Partial<Config>
) {
  const config = {
    logLevel: logLevel ?? DEFAULT_LOG_LEVEL,
    maxParallelism: maxParallelism ?? DEFAULT_MAX_PARALLELISM,
  };

  // Already a function handle: reuse it as is.
  if (typeof fn === "string" && fn.startsWith("function://")) {
    return { fnHandle: fn, fnName: name ?? fn, config };
  }

  // Otherwise create a handle first, then derive the name only if needed.
  return {
    fnHandle: await createFunctionHandle(fn),
    fnName: name ?? getFunctionName(fn),
    config,
  };
}
/**
 * Converts the caller's scheduling options into an absolute run time.
 *
 * @param options - Optional scheduling options carrying `runAt` or `runAfter`.
 * @returns `runAt` when it is set; otherwise the current time plus
 * `runAfter` when that is set; otherwise the current time (ms since epoch).
 */
function getRunAt(options?: SchedulerOptions): number {
  // An explicit absolute time takes precedence over a relative delay.
  const absoluteTime =
    options && "runAt" in options ? options.runAt : undefined;
  if (absoluteTime !== undefined) {
    return absoluteTime;
  }

  const delayMs =
    options && "runAfter" in options ? options.runAfter : undefined;
  return delayMs === undefined ? Date.now() : Date.now() + delayMs;
}