cl-flow

CL-Flow is @borodust’s library for lock-free parallel code execution. You combine blocks of code and define how they should be executed: serially or in parallel.

This system is in Quicklisp, but it cannot be installed from there because it requires bodge-queue, which is not in Quicklisp (at least for now). You need to install @borodust’s distribution first:

POFTHEDAY> (ql-dist:install-dist
            "http://bodge.borodust.org/dist/org.borodust.bodge.txt"
            :replace t :prompt nil)

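;; str, plump, cl-ppcre and rutils are used
;; by the crawler example further down:
POFTHEDAY> (ql:quickload '(:simple-flow-dispatcher
                           :cl-flow
                           :log4cl
                           :dexador
                           :str
                           :plump
                           :cl-ppcre
                           :rutils))

POFTHEDAY> (defun handle-error (e)
             (log:error "Unhandled error: ~A" e))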

;; This code will help us to run flow blocks
;; in the thread pool:
POFTHEDAY> (defvar *dispatcher*
             (simple-flow-dispatcher:make-simple-dispatcher
              :threads 4
              :error-handler #'handle-error))

POFTHEDAY> (defun run (flow)
             (cl-flow:run *dispatcher* flow))

Here is an example from cl-flow’s documentation.

This code will run three blocks of code in parallel and then pass their results into another block:

POFTHEDAY> (run (flow:serially
                  (flow:concurrently
                    (flow:atomically :first ()
                      "Hello")
                    (flow:atomically :second ()
                      "Lisp")
                    (flow:atomically :third ()
                      "World"))
                  ;; Last block will receive results
                  ;; of all previous blocks:
                  (flow:atomically :finally (results)
                    (destructuring-bind (first second third)
                        results
                      (format t "~A ~A ~A~%"
                              first
                              second
                              third)))))

Sadly, the documentation does not cover a more interesting topic - blocks which generate other blocks. Let’s try to figure out how to use flow:dynamically to define a web crawler which will process pages recursively:

;; The crawl starts here; the log output below shows
;; this page being downloaded first:
POFTHEDAY> (defparameter *base-url*
             "https://borodust.org/projects/")

POFTHEDAY> (defun is-external (url)
             (or (str:starts-with-p "mailto:" url)
                 (and (str:starts-with-p "http" url)
                      (not (str:starts-with-p *base-url* url)))))

POFTHEDAY> (defun make-full (url)
             (let ((new-url
                     (cond
                       ((or (str:starts-with-p "http" url)
                            (str:starts-with-p "mailto:" url))
                        url)
                       ((str:starts-with-p "/" url)
                        (concatenate 'string "https://borodust.org" url))
                       (t
                        (concatenate 'string *base-url* url)))))
               (cl-ppcre:regex-replace "#.*" new-url "")))
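To make the intent of these two helpers easier to check at the REPL, here is a minimal dependency-free sketch of the same rules. The names `prefixp`, `normalize-link`, `external-link-p`, `*site-root*` and `*crawl-root*` are hypothetical stand-ins introduced for this illustration, and the crawl root is assumed to be the projects index page:

```lisp
;; Hypothetical stand-ins for the values used by the crawler.
(defparameter *site-root* "https://borodust.org")
(defparameter *crawl-root* "https://borodust.org/projects/")

(defun prefixp (prefix string)
  "True when STRING begins with PREFIX."
  (let ((pos (mismatch prefix string)))
    (or (null pos) (= pos (length prefix)))))

(defun normalize-link (href)
  "Turn HREF into an absolute URL and drop the #fragment part."
  (let ((absolute
          (cond
            ;; Already absolute (or a mailto: link): keep as is.
            ((or (prefixp "http" href) (prefixp "mailto:" href))
             href)
            ;; Site-relative link.
            ((prefixp "/" href)
             (concatenate 'string *site-root* href))
            ;; Anything else is resolved against the crawl root.
            (t
             (concatenate 'string *crawl-root* href)))))
    ;; POSITION returns NIL when there is no #, and SUBSEQ
    ;; then keeps the whole string.
    (subseq absolute 0 (position #\# absolute))))

(defun external-link-p (url)
  "True for mailto: links and for absolute URLs outside the crawl root."
  (or (prefixp "mailto:" url)
      (and (prefixp "http" url)
           (not (prefixp *crawl-root* url)))))
```

With these definitions, `(normalize-link "/projects/cl-flow/#usage")` yields the page URL without the fragment, and a GitHub link is reported as external, so the crawler would skip it.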

POFTHEDAY> (defun make-url-processor (already-processed url)
             (flow:serially
               (flow:atomically url ()
                 (log:info "Downloading ~A" url)
                 (dex:get url))

               ;; This block creates new blocks where each
               ;; will process a single url and produce more
               ;; blocks to process links from fetched pages:
               (flow:dynamically (content)
                 (flow:concurrently
                   (loop with page = (ignore-errors
                                      (plump:parse content))
                         for link in (when page
                                       (plump:get-elements-by-tag-name page "a"))
                         for link-url = (plump:attribute link "href")
                         for full-url = (make-full link-url)
                         unless (or (is-external full-url)
                                    (gethash full-url already-processed))
                           collect (progn
                                     (setf (gethash full-url already-processed)
                                           t)
                                     (make-url-processor already-processed
                                                         full-url)))))))
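The deduplication logic is easier to follow without the network and the thread pool. The sketch below is a sequential model of the same idea, not cl-flow code: links are marked as seen while they are collected, and only then is each new link processed. The `*fake-site*` graph and the `crawl` function are hypothetical and exist only for this illustration:

```lisp
;; Hypothetical link graph: each entry is (PAGE . LINKS-ON-PAGE).
(defparameter *fake-site*
  '(("/"   "/a/" "/b/")
    ("/a/" "/" "/b/" "/c/")
    ("/b/" "/a/")
    ("/c/")))

(defun crawl (page seen)
  "Visit PAGE, then recurse into links not yet recorded in SEEN.
Returns the pages in the order they were visited."
  (let ((new-links
          ;; First pass: mark every unseen link, like the
          ;; LOOP inside flow:dynamically does.
          (loop for link in (cdr (assoc page *fake-site* :test #'string=))
                unless (gethash link seen)
                  collect (progn
                            (setf (gethash link seen) t)
                            link))))
    ;; Second pass: process only the links marked above.
    (cons page
          (loop for link in new-links
                append (crawl link seen)))))

(let ((seen (make-hash-table :test 'equal)))
  ;; Mark the start page so a link back to it is not followed again.
  (setf (gethash "/" seen) t)
  (crawl "/" seen))
;; => ("/" "/a/" "/c/" "/b/")
```

Marking the start page up front is an addition made in this sketch; the crawler above only marks links as it discovers them, so a page that links back to the start URL causes it to be fetched a second time.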

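Now we can start it. Note that run returns immediately: the REPL prints the still-empty hash table while the worker threads keep downloading pages in the background: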

POFTHEDAY> (let ((already-processed (make-hash-table :test 'equal)))
             (run
              (make-url-processor already-processed *base-url*))
             already-processed)

 <INFO> [23:10:00] poftheday (make-url-processor body-fu3) -
  Downloading https://borodust.org/projects/
#<HASH-TABLE :TEST EQUAL :COUNT 0 {10073D59A3}>
 <INFO> [23:10:00] poftheday (make-url-processor body-fu3) -
  Downloading https://borodust.org/projects/vinoyaku/
...
 <INFO> [23:10:01] poftheday (make-url-processor body-fu3) -
  Downloading https://borodust.org/projects/cl-bodge/overview/

;; These URLs were processed by our crawler:
POFTHEDAY> (rutils:hash-table-to-alist *)
(("https://borodust.org/projects/vinoyaku/" . T)
 ("https://borodust.org/projects/trivial-gamekit/" . T)
 ("https://borodust.org/projects/cl-flow/" . T)
 ("https://borodust.org/projects/cl-bodge/" . T)
 ("https://borodust.org/projects/" . T)
 ("https://borodust.org/projects/cl-flow/getting-started/" . T)
 ("https://borodust.org/projects/trivial-gamekit/getting-started/" . T)
 ("https://borodust.org/projects/trivial-gamekit/advanced/" . T)
 ("https://borodust.org/projects/trivial-gamekit/manual/" . T)
 ("https://borodust.org/projects/cl-bodge/overview/" . T))

It would be nice if @borodust could do a little code review and check whether I used cl-flow correctly.