@@ -25,17 +25,11 @@ export const enqueue = mutation({
25
25
fnHandle : v . string ( ) ,
26
26
fnName : v . string ( ) ,
27
27
fnArgs : v . any ( ) ,
28
- fnType : v . union (
29
- v . literal ( "action" ) ,
30
- v . literal ( "mutation" ) ,
31
- v . literal ( "unknown" )
32
- ) ,
33
- runAtTime : v . number ( ) ,
28
+ fnType : v . union ( v . literal ( "action" ) , v . literal ( "mutation" ) ) ,
34
29
options : v . object ( {
35
30
maxParallelism : v . number ( ) ,
36
31
actionTimeoutMs : v . optional ( v . number ( ) ) ,
37
32
mutationTimeoutMs : v . optional ( v . number ( ) ) ,
38
- unknownTimeoutMs : v . optional ( v . number ( ) ) ,
39
33
debounceMs : v . optional ( v . number ( ) ) ,
40
34
fastHeartbeatMs : v . optional ( v . number ( ) ) ,
41
35
slowHeartbeatMs : v . optional ( v . number ( ) ) ,
@@ -44,16 +38,12 @@ export const enqueue = mutation({
44
38
} ) ,
45
39
} ,
46
40
returns : v . id ( "pendingWork" ) ,
47
- handler : async (
48
- ctx ,
49
- { fnHandle, fnName, options, fnArgs, fnType, runAtTime }
50
- ) => {
41
+ handler : async ( ctx , { fnHandle, fnName, options, fnArgs, fnType } ) => {
51
42
const debounceMs = options . debounceMs ?? 50 ;
52
43
await ensurePoolExists ( ctx , {
53
44
maxParallelism : options . maxParallelism ,
54
45
actionTimeoutMs : options . actionTimeoutMs ?? 15 * 60 * 1000 ,
55
46
mutationTimeoutMs : options . mutationTimeoutMs ?? 30 * 1000 ,
56
- unknownTimeoutMs : options . unknownTimeoutMs ?? 15 * 60 * 1000 ,
57
47
debounceMs,
58
48
fastHeartbeatMs : options . fastHeartbeatMs ?? 10 * 1000 ,
59
49
slowHeartbeatMs : options . slowHeartbeatMs ?? 2 * 60 * 60 * 1000 ,
@@ -65,10 +55,8 @@ export const enqueue = mutation({
65
55
fnName,
66
56
fnArgs,
67
57
fnType,
68
- runAtTime,
69
58
} ) ;
70
- const delay = Math . max ( runAtTime - Date . now ( ) , debounceMs ) ;
71
- await kickMainLoop ( ctx , delay , false ) ;
59
+ await kickMainLoop ( ctx , debounceMs , false ) ;
72
60
return workId ;
73
61
} ,
74
62
} ) ;
@@ -150,10 +138,7 @@ export const mainLoop = internalMutation({
150
138
BATCH_SIZE
151
139
) ;
152
140
let didSomething = false ;
153
- const pending = await ctx . db
154
- . query ( "pendingWork" )
155
- . withIndex ( "runAtTime" , ( q ) => q . lte ( "runAtTime" , Date . now ( ) ) )
156
- . take ( toSchedule ) ;
141
+ const pending = await ctx . db . query ( "pendingWork" ) . take ( toSchedule ) ;
157
142
console_ . debug ( `scheduling ${ pending . length } pending work` ) ;
158
143
await Promise . all (
159
144
pending . map ( async ( work ) => {
@@ -254,12 +239,9 @@ export const mainLoop = internalMutation({
254
239
} else {
255
240
// Decide when to wake up.
256
241
const allInProgressWork = await ctx . db . query ( "inProgressWork" ) . collect ( ) ;
257
- const nextPending = await ctx . db
258
- . query ( "pendingWork" )
259
- . withIndex ( "runAtTime" )
260
- . first ( ) ;
242
+ const nextPending = await ctx . db . query ( "pendingWork" ) . first ( ) ;
261
243
const nextPendingTime = nextPending
262
- ? nextPending . runAtTime
244
+ ? nextPending . _creationTime
263
245
: slowHeartbeatMs + Date . now ( ) ;
264
246
const nextInProgress = allInProgressWork . length
265
247
? Math . min (
@@ -285,8 +267,8 @@ async function beginWork(
285
267
if ( ! options ) {
286
268
throw new Error ( "cannot begin work with no pool" ) ;
287
269
}
288
- recordStarted ( work . _id , work . fnName , work . _creationTime , work . runAtTime ) ;
289
- const { mutationTimeoutMs, actionTimeoutMs, unknownTimeoutMs } = options ;
270
+ recordStarted ( work . _id , work . fnName , work . _creationTime ) ;
271
+ const { mutationTimeoutMs, actionTimeoutMs } = options ;
290
272
if ( work . fnType === "action" ) {
291
273
return {
292
274
scheduledId : await ctx . scheduler . runAfter (
@@ -313,12 +295,6 @@ async function beginWork(
313
295
) ,
314
296
timeoutMs : mutationTimeoutMs ,
315
297
} ;
316
- } else if ( work . fnType === "unknown" ) {
317
- const fnHandle = work . fnHandle as FunctionHandle < "action" | "mutation" > ;
318
- return {
319
- scheduledId : await ctx . scheduler . runAfter ( 0 , fnHandle , work . fnArgs ) ,
320
- timeoutMs : unknownTimeoutMs ,
321
- } ;
322
298
} else {
323
299
throw new Error ( `Unexpected fnType ${ work . fnType } ` ) ;
324
300
}
0 commit comments