@@ -182,13 +182,57 @@ namespace cppq {
182
182
this ->dequeuedAtMs = dequeuedAtMs;
183
183
this ->state = stringToState (state);
184
184
}
185
+ Task (
186
+ std::string uuid,
187
+ std::string type,
188
+ std::string payload,
189
+ std::string state,
190
+ uint64_t maxRetry,
191
+ uint64_t retried,
192
+ uint64_t dequeuedAtMs,
193
+ uint64_t schedule
194
+ ) {
195
+ uuid_t uuid_parsed;
196
+ uuid_parse (uuid.c_str (), uuid_parsed);
197
+ uuid_copy (this ->uuid , uuid_parsed);
198
+ this ->type = type;
199
+ this ->payload = payload;
200
+ this ->maxRetry = maxRetry;
201
+ this ->retried = retried;
202
+ this ->dequeuedAtMs = dequeuedAtMs;
203
+ this ->state = stringToState (state);
204
+ this ->schedule = schedule;
205
+ }
206
+ Task (
207
+ std::string uuid,
208
+ std::string type,
209
+ std::string payload,
210
+ std::string state,
211
+ uint64_t maxRetry,
212
+ uint64_t retried,
213
+ uint64_t dequeuedAtMs,
214
+ std::string cron
215
+ ) {
216
+ uuid_t uuid_parsed;
217
+ uuid_parse (uuid.c_str (), uuid_parsed);
218
+ uuid_copy (this ->uuid , uuid_parsed);
219
+ this ->type = type;
220
+ this ->payload = payload;
221
+ this ->maxRetry = maxRetry;
222
+ this ->retried = retried;
223
+ this ->dequeuedAtMs = dequeuedAtMs;
224
+ this ->state = stringToState (state);
225
+ this ->cron = cron;
226
+ }
185
227
uuid_t uuid;
186
228
std::string type;
187
229
std::string payload;
188
230
TaskState state;
189
231
uint64_t maxRetry;
190
232
uint64_t retried;
191
233
uint64_t dequeuedAtMs;
234
+ uint64_t schedule;
235
+ std::string cron;
192
236
std::string result;
193
237
};
194
238
@@ -199,29 +243,83 @@ namespace cppq {
199
243
handlers[type] = handler;
200
244
}
201
245
202
- void enqueue (redisContext *c, Task task, std::string queue) {
246
+ typedef enum { Cron, TimePoint, None } ScheduleType;
247
+
248
+ typedef struct ScheduleOptions {
249
+ union {
250
+ const char *cron;
251
+ std::chrono::system_clock::time_point time ;
252
+ };
253
+ ScheduleType type;
254
+ } ScheduleOptions;
255
+
256
+ ScheduleOptions scheduleOptions (std::chrono::system_clock::time_point t) {
257
+ return ScheduleOptions{ .time = t, .type = ScheduleType::TimePoint };
258
+ }
259
+
260
+ ScheduleOptions scheduleOptions (std::string c) {
261
+ return ScheduleOptions{ .cron = c.c_str (), .type = ScheduleType::Cron };
262
+ }
263
+
264
+ void enqueue (redisContext *c, Task task, std::string queue, ScheduleOptions s) {
203
265
task.state = TaskState::Pending;
204
266
205
267
redisCommand (c, " MULTI" );
206
- redisCommand (c, " LPUSH cppq:%s:pending %s" , queue.c_str (), uuidToString (task.uuid ).c_str ());
207
- redisCommand (
208
- c,
209
- " HSET cppq:%s:task:%s type %s payload %s state %s maxRetry %d retried %d dequeuedAtMs %d" ,
210
- queue.c_str (),
211
- uuidToString (task.uuid ).c_str (),
212
- task.type .c_str (),
213
- task.payload .c_str (),
214
- stateToString (task.state ).c_str (),
215
- task.maxRetry ,
216
- task.retried ,
217
- task.dequeuedAtMs
218
- );
268
+ if (s.type == ScheduleType::None) {
269
+ redisCommand (c, " LPUSH cppq:%s:pending %s" , queue.c_str (), uuidToString (task.uuid ).c_str ());
270
+ redisCommand (
271
+ c,
272
+ " HSET cppq:%s:task:%s type %s payload %s state %s maxRetry %d retried %d dequeuedAtMs %d" ,
273
+ queue.c_str (),
274
+ uuidToString (task.uuid ).c_str (),
275
+ task.type .c_str (),
276
+ task.payload .c_str (),
277
+ stateToString (task.state ).c_str (),
278
+ task.maxRetry ,
279
+ task.retried ,
280
+ task.dequeuedAtMs
281
+ );
282
+ } else if (s.type == ScheduleType::TimePoint) {
283
+ redisCommand (c, " LPUSH cppq:%s:scheduled %s" , queue.c_str (), uuidToString (task.uuid ).c_str ());
284
+ redisCommand (
285
+ c,
286
+ " HSET cppq:%s:task:%s type %s payload %s state %s maxRetry %d retried %d dequeuedAtMs %d schedule %lu" ,
287
+ queue.c_str (),
288
+ uuidToString (task.uuid ).c_str (),
289
+ task.type .c_str (),
290
+ task.payload .c_str (),
291
+ stateToString (task.state ).c_str (),
292
+ task.maxRetry ,
293
+ task.retried ,
294
+ task.dequeuedAtMs ,
295
+ std::chrono::duration_cast<std::chrono::milliseconds>(s.time .time_since_epoch ()).count ()
296
+ );
297
+ } else if (s.type == ScheduleType::Cron) {
298
+ redisCommand (c, " LPUSH cppq:%s:scheduled %s" , queue.c_str (), uuidToString (task.uuid ).c_str ());
299
+ redisCommand (
300
+ c,
301
+ " HSET cppq:%s:task:%s type %s payload %s state %s maxRetry %d retried %d dequeuedAtMs %d cron %s" ,
302
+ queue.c_str (),
303
+ uuidToString (task.uuid ).c_str (),
304
+ task.type .c_str (),
305
+ task.payload .c_str (),
306
+ stateToString (task.state ).c_str (),
307
+ task.maxRetry ,
308
+ task.retried ,
309
+ task.dequeuedAtMs ,
310
+ s.cron
311
+ );
312
+ }
219
313
redisReply *reply = (redisReply *)redisCommand (c, " EXEC" );
220
314
221
315
if (reply->type == REDIS_REPLY_ERROR)
222
316
throw std::runtime_error (" Failed to enqueue task" );
223
317
}
224
318
319
+ void enqueue (redisContext *c, Task task, std::string queue) {
320
+ return enqueue (c, task, queue, ScheduleOptions{ .cron = " " , .type = ScheduleType::None });
321
+ }
322
+
225
323
std::optional<Task> dequeue (redisContext *c, std::string queue) {
226
324
redisReply *reply = (redisReply *)redisCommand (c, " LRANGE cppq:%s:pending -1 -1" , queue.c_str ());
227
325
if (reply->type != REDIS_REPLY_ARRAY)
@@ -262,6 +360,45 @@ namespace cppq {
262
360
return std::make_optional<Task>(task);
263
361
}
264
362
363
+ std::optional<Task> dequeueScheduled (redisContext *c, std::string queue, char *getScheduledScriptSHA) {
364
+ redisReply *reply = (redisReply *)redisCommand (c, " EVALSHA %s 0 %s" , getScheduledScriptSHA, queue.c_str ());
365
+ if (reply->type != REDIS_REPLY_STRING)
366
+ return {};
367
+ std::string uuid = reply->str ;
368
+
369
+ uint64_t dequeuedAtMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now ().time_since_epoch ()).count ();
370
+
371
+ redisCommand (c, " MULTI" );
372
+ redisCommand (c, " LREM cppq:%s:scheduled 1 %s" , queue.c_str (), uuid.c_str ());
373
+ redisCommand (c, " HGET cppq:%s:task:%s type" , queue.c_str (), uuid.c_str ());
374
+ redisCommand (c, " HGET cppq:%s:task:%s payload" , queue.c_str (), uuid.c_str ());
375
+ redisCommand (c, " HGET cppq:%s:task:%s state" , queue.c_str (), uuid.c_str ());
376
+ redisCommand (c, " HGET cppq:%s:task:%s maxRetry" , queue.c_str (), uuid.c_str ());
377
+ redisCommand (c, " HGET cppq:%s:task:%s retried" , queue.c_str (), uuid.c_str ());
378
+ redisCommand (c, " HGET cppq:%s:task:%s dequeuedAtMs" , queue.c_str (), uuid.c_str ());
379
+ redisCommand (c, " HGET cppq:%s:task:%s schedule" , queue.c_str (), uuid.c_str ());
380
+ redisCommand (c, " HSET cppq:%s:task:%s dequeuedAtMs %lu" , queue.c_str (), uuid.c_str (), dequeuedAtMs);
381
+ redisCommand (c, " HSET cppq:%s:task:%s state %s" , queue.c_str (), uuid.c_str (), stateToString (TaskState::Active).c_str ());
382
+ redisCommand (c, " LPUSH cppq:%s:active %s" , queue.c_str (), uuid.c_str ());
383
+ reply = (redisReply *)redisCommand (c, " EXEC" );
384
+
385
+ if (reply->type != REDIS_REPLY_ARRAY || reply->elements != 11 )
386
+ return {};
387
+
388
+ Task task = Task (
389
+ uuid,
390
+ reply->element [1 ]->str ,
391
+ reply->element [2 ]->str ,
392
+ stateToString (TaskState::Active),
393
+ strtoull (reply->element [4 ]->str , NULL , 0 ),
394
+ strtoull (reply->element [5 ]->str , NULL , 0 ),
395
+ dequeuedAtMs,
396
+ strtoull (reply->element [6 ]->str , NULL , 0 )
397
+ );
398
+
399
+ return std::make_optional<Task>(task);
400
+ }
401
+
265
402
void taskRunner (redisOptions redisOpts, Task task, std::string queue) {
266
403
redisContext *c = redisConnectWithOptions (&redisOpts);
267
404
if (c == NULL || c->err ) {
@@ -369,13 +506,26 @@ namespace cppq {
369
506
}
370
507
}
371
508
509
+ const char *getScheduledScript = R"DOC(
510
+ local timeCall = redis.call('time')
511
+ local time = timeCall[1] ..timeCall[2]
512
+ local scheduled = redis.call('LRANGE', 'cppq:' .. ARGV[1] .. ':scheduled', 0, -1)
513
+ for _,key in ipairs(scheduled) do
514
+ if (time > redis.call('HGET', 'cppq:' .. ARGV[1] .. ':task:' .. key, 'schedule')) then
515
+ return key
516
+ end
517
+ end)DOC" ;
518
+
372
519
void runServer (redisOptions redisOpts, std::map<std::string, int > queues, uint64_t recoveryTimeoutSecond) {
373
520
redisContext *c = redisConnectWithOptions (&redisOpts);
374
521
if (c == NULL || c->err ) {
375
522
std::cerr << " Failed to connect to Redis" << std::endl;
376
523
return ;
377
524
}
378
525
526
+ redisReply *reply = (redisReply *)redisCommand (c, " SCRIPT LOAD %s" , getScheduledScript);
527
+ char *getScheduledScriptSHA = reply->str ;
528
+
379
529
std::vector<std::pair<std::string, int >> queuesVector;
380
530
for (auto & it : queues) queuesVector.push_back (it);
381
531
sort (queuesVector.begin (), queuesVector.end (), [](std::pair<std::string, int > const & a, std::pair<std::string, int > const & b) { return a.second > b.second ; });
@@ -386,7 +536,10 @@ namespace cppq {
386
536
while (true ) {
387
537
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
388
538
for (std::vector<std::pair<std::string, int >>::iterator it = queuesVector.begin (); it != queuesVector.end (); it++) {
389
- std::optional<Task> task = dequeue (c, it->first );
539
+ std::optional<Task> task;
540
+ task = dequeueScheduled (c, it->first , getScheduledScriptSHA);
541
+ if (!task.has_value ())
542
+ task = dequeue (c, it->first );
390
543
if (task.has_value ()) {
391
544
pool.push_task (taskRunner, redisOpts, task.value (), it->first );
392
545
break ;
0 commit comments