|
21 | 21 |
|
22 | 22 | (provide make-path |
23 | 23 | get-improve-table-data |
24 | | - make-improve-result |
25 | 24 | server-check-on |
26 | 25 | get-results-for |
27 | 26 | get-timeline-for |
|
31 | 30 | wait-for-job |
32 | 31 | start-job-server |
33 | 32 | write-results-to-disk |
34 | | - *demo?* |
35 | 33 | *demo-output*) |
36 | 34 |
|
37 | | -(define *demo?* (make-parameter false)) |
38 | 35 | (define *demo-output* (make-parameter false)) |
39 | 36 |
|
40 | 37 | (define log-level #f) |
|
118 | 115 | (place-channel-put manager (list* msg args)) |
119 | 116 | (match msg |
120 | 117 | ['start |
121 | | - (match-define (list hash-false command job-id) args) |
122 | | - (hash-set! completed-work job-id (herbie-do-server-job command job-id))]))) |
| 118 | + (match-define (list #f command job-id) args) |
| 119 | + (hash-set! queued-jobs job-id command)]))) |
123 | 120 |
|
124 | 121 | (define (manager-ask msg . args) |
125 | 122 | (log "Asking manager: ~a, ~a.\n" msg args) |
126 | 123 | (if manager |
127 | 124 | (manager-ask-with-callback msg args) |
128 | 125 | (match (list* msg args) ; public commands |
129 | | - [(list 'wait hash-false job-id) (hash-ref completed-work job-id)] |
130 | | - [(list 'result job-id) (hash-ref completed-work job-id #f)] |
131 | | - [(list 'timeline job-id) (hash-ref completed-work job-id #f)] |
132 | | - [(list 'check job-id) (and (hash-ref completed-work job-id #f) job-id)] |
| 126 | + [(list 'wait hash-false job-id) |
| 127 | + (define command (hash-ref queued-jobs job-id)) |
| 128 | + (define result (herbie-do-server-job command job-id)) |
| 129 | + (hash-set! completed-jobs job-id result) |
| 130 | + result] |
| 131 | + [(list 'result job-id) (hash-ref completed-jobs job-id #f)] |
| 132 | + [(list 'timeline job-id) (hash-ref completed-jobs job-id #f)] |
| 133 | + [(list 'check job-id) (and (hash-ref completed-jobs job-id #f) job-id)] |
133 | 134 | [(list 'count) (list 0 0)] |
134 | 135 | [(list 'improve) |
135 | | - (for/list ([(job-id result) (in-hash completed-work)] |
| 136 | + (for/list ([(job-id result) (in-hash completed-jobs)] |
136 | 137 | #:when (equal? (hash-ref result 'command) "improve")) |
137 | 138 | (get-table-data-from-hash result (make-path job-id)))]))) |
138 | 139 |
|
|
161 | 162 | 'path |
162 | 163 | (make-path job-id))) |
163 | 164 |
|
164 | | -(define completed-work (make-hash)) |
| 165 | +(define queued-jobs (make-hash)) |
| 166 | +(define completed-jobs (make-hash)) |
165 | 167 |
|
166 | 168 | (define (manager-ask-with-callback msg args) |
167 | 169 | (define-values (a b) (place-channel)) |
|
214 | 216 | (parameterize ([params fresh] ...) |
215 | 217 | body ...))))])) |
216 | 218 |
|
217 | | -(struct work-item (command id)) |
218 | | - |
219 | 219 | (define (make-manager worker-count) |
220 | 220 | (place/context* |
221 | 221 | ch |
|
240 | 240 | (hash-set! waiting-workers i (make-worker i))) |
241 | 241 | (log "~a workers ready.\n" (hash-count waiting-workers)) |
242 | 242 | (define waiting (make-hash)) |
243 | | - (define job-queue (list)) |
244 | 243 | (log "Manager waiting to assign work.\n") |
245 | 244 | (for ([i (in-naturals)]) |
246 | 245 | (match (place-channel-get ch) |
247 | 246 | [(list 'start self command job-id) |
248 | 247 | ; Check if the work has been completed already if not assign the work. |
249 | | - (if (hash-has-key? completed-work job-id) |
250 | | - (place-channel-put self (list 'send job-id (hash-ref completed-work job-id))) |
251 | | - (place-channel-put self (list 'queue self job-id command)))] |
252 | | - [(list 'queue self job-id command) |
253 | | - (set! job-queue (append job-queue (list (work-item command job-id)))) |
254 | | - (place-channel-put self (list 'assign self))] |
| 248 | + (cond |
| 249 | + [(hash-has-key? completed-jobs job-id) |
| 250 | + (place-channel-put self (list 'send job-id (hash-ref completed-jobs job-id)))] |
| 251 | + [else |
| 252 | + (hash-set! queued-jobs job-id command) |
| 253 | + (place-channel-put self (list 'assign self))])] |
255 | 254 | [(list 'assign self) |
256 | 255 | (define reassigned (make-hash)) |
257 | 256 | (for ([(wid worker) (in-hash waiting-workers)] |
258 | | - [job (in-list job-queue)]) |
259 | | - (log "Starting worker [~a] on [~a].\n" |
260 | | - (work-item-id job) |
261 | | - (test-name (herbie-command-test (work-item-command job)))) |
| 257 | + [(jid command) (in-hash queued-jobs)]) |
| 258 | + (log "Starting worker [~a] on [~a].\n" jid (test-name (herbie-command-test command))) |
262 | 259 | ; Check if the job is already in progress. |
263 | | - (unless (hash-has-key? current-jobs (work-item-id job)) |
264 | | - (hash-set! current-jobs (work-item-id job) wid) |
265 | | - (place-channel-put worker (list 'apply self (work-item-command job) (work-item-id job))) |
266 | | - (hash-set! reassigned wid worker) |
| 260 | + (unless (hash-has-key? current-jobs jid) |
| 261 | + (place-channel-put worker (list 'apply self command jid)) |
| 262 | + (hash-set! reassigned wid jid) |
267 | 263 | (hash-set! busy-workers wid worker))) |
268 | 264 | ; remove X many jobs from the Q and update waiting-workers |
269 | | - (for ([(wid worker) (in-hash reassigned)]) |
| 265 | + (for ([(wid jid) (in-hash reassigned)]) |
270 | 266 | (hash-remove! waiting-workers wid) |
271 | | - (set! job-queue (cdr job-queue)))] |
| 267 | + (hash-remove! queued-jobs jid))] |
272 | 268 | ; Job is finished save work and free worker. Move work to 'send state. |
273 | 269 | [(list 'finished self wid job-id result) |
274 | 270 | (log "Job ~a finished, saving result.\n" job-id) |
275 | | - (hash-set! completed-work job-id result) |
| 271 | + (hash-set! completed-jobs job-id result) |
276 | 272 |
|
277 | 273 | ; move worker to waiting list |
278 | 274 | (hash-remove! current-jobs job-id) |
|
286 | 282 | (log "Waiting for job: ~a\n" job-id) |
287 | 283 | ; first we add the handler to the wait list. |
288 | 284 | (hash-update! waiting job-id (curry append (list handler)) '()) |
289 | | - (define result (hash-ref completed-work job-id #f)) |
| 285 | + (define result (hash-ref completed-jobs job-id #f)) |
290 | 286 | ; check if the job is completed or not. |
291 | 287 | (unless (false? result) |
292 | 288 | (log "Done waiting for job: ~a\n" job-id) |
|
298 | 294 | (place-channel-put handle result)) |
299 | 295 | (hash-remove! waiting job-id)] |
300 | 296 | ; Get the result for the given id, return false if no work found. |
301 | | - [(list 'result handler job-id) (place-channel-put handler (hash-ref completed-work job-id #f))] |
| 297 | + [(list 'result handler job-id) (place-channel-put handler (hash-ref completed-jobs job-id #f))] |
302 | 298 | [(list 'timeline handler job-id) |
303 | 299 | (define wid (hash-ref current-jobs job-id #f)) |
304 | 300 | (cond |
|
310 | 306 | (place-channel-put handler requested-timeline)] |
311 | 307 | [else |
312 | 308 | (log "Job complete, no timeline, send result.\n") |
313 | | - (place-channel-put handler (hash-ref completed-work job-id #f))])] |
| 309 | + (place-channel-put handler (hash-ref completed-jobs job-id #f))])] |
314 | 310 | [(list 'check handler job-id) |
315 | | - (place-channel-put handler (and (hash-has-key? completed-work job-id) job-id))] |
| 311 | + (place-channel-put handler (and (hash-has-key? completed-jobs job-id) job-id))] |
316 | 312 | ; Returns the current count of working workers. |
317 | 313 | [(list 'count handler) |
318 | 314 | (log "Count requested\n") |
319 | | - (place-channel-put handler (list (hash-count busy-workers) (length job-queue)))] |
| 315 | + (place-channel-put handler (list (hash-count busy-workers) (hash-count queued-jobs)))] |
320 | 316 | ; Retreive the improve results for results.json |
321 | 317 | [(list 'improve handler) |
322 | 318 | (define improved-list |
323 | | - (for/list ([(job-id result) (in-hash completed-work)] |
| 319 | + (for/list ([(job-id result) (in-hash completed-jobs)] |
324 | 320 | #:when (equal? (hash-ref result 'command) "improve")) |
325 | 321 | (get-table-data-from-hash result (make-path job-id)))) |
326 | 322 | (place-channel-put handler improved-list)])))) |
|
0 commit comments