|
3 | 3 | (require openssl/sha1) |
4 | 4 | (require (only-in xml write-xexpr)) |
5 | 5 | (require json) |
| 6 | +(require data/queue) |
6 | 7 |
|
7 | 8 | (require "../syntax/read.rkt" |
8 | 9 | "../syntax/sugar.rkt" |
|
231 | 232 | (define busy-workers (make-hash)) |
232 | 233 | (define waiting-workers (make-hash)) |
233 | 234 | (define current-jobs (make-hash)) |
| 235 | + (define queued-job-ids (make-queue)) |
234 | 236 | (when (eq? worker-count #t) |
235 | 237 | (set! worker-count (processor-count))) |
236 | 238 | (for ([i (in-range worker-count)]) |
|
243 | 245 |
|
244 | 246 | ;; Private API |
245 | 247 | [(list 'assign self) |
246 | | - (define reassigned (make-hash)) |
| 248 | + (define reassigned '()) |
247 | 249 | (for ([(wid worker) (in-hash waiting-workers)] |
248 | | - [(jid command) (in-hash queued-jobs)]) |
| 250 | + #:when (not (queue-empty? queued-job-ids))) |
| 251 | + (define jid (dequeue! queued-job-ids)) |
| 252 | + (define command (hash-ref queued-jobs jid)) |
249 | 253 | (log "Starting worker [~a] on [~a].\n" jid (test-name (herbie-command-test command))) |
250 | | - ; Check if the job is already in progress. |
251 | | - (unless (hash-has-key? current-jobs jid) |
252 | | - (place-channel-put worker (list 'apply self command jid)) |
253 | | - (hash-set! current-jobs jid wid) |
254 | | - (hash-set! reassigned wid jid) |
255 | | - (hash-set! busy-workers wid worker))) |
256 | | - ; remove X many jobs from the Q and update waiting-workers |
257 | | - (for ([(wid jid) (in-hash reassigned)]) |
258 | | - (hash-remove! waiting-workers wid) |
259 | | - (hash-remove! queued-jobs jid))] |
| 254 | + (place-channel-put worker (list 'apply self command jid)) |
| 255 | + (hash-set! current-jobs jid wid) |
| 256 | + (hash-set! busy-workers wid worker) |
| 257 | + (set! reassigned (cons wid reassigned)) |
| 258 | + (hash-remove! queued-jobs jid)) |
| 259 | + (for ([wid reassigned]) |
| 260 | + (hash-remove! waiting-workers wid))] |
260 | 261 | ; Job is finished save work and free worker. Move work to 'send state. |
261 | 262 | [(list 'finished self wid job-id result) |
262 | 263 | (log "Job ~a finished, saving result.\n" job-id) |
|
285 | 286 | (place-channel-put self (list 'send job-id (hash-ref completed-jobs job-id)))] |
286 | 287 | [else |
287 | 288 | (hash-set! queued-jobs job-id job) |
| 289 | + (enqueue! queued-job-ids job-id) |
288 | 290 | (place-channel-put self (list 'assign self))])] |
289 | 291 | [(list 'wait handler self job-id) |
290 | 292 | (log "Waiting for job: ~a\n" job-id) |
|
0 commit comments