Minimalist Clojure wrapping of the Kafka Consumer API.
The current stability of this work is pre-alpha, and subject change.
[org.purefn/river "0.1.0"] Example usage may be found in the ./dev/dev.clj namespace, copied below:
(defn file-writer
[{:keys [file]} state records commit]
(with-open [w (io/writer file :append true)]
(doseq [{:keys [key value]} records]
(.write w (str key ":" value "\n"))))
(commit))
(defn dev-system
"Constructs a system map suitable for interactive development."
[]
(component/system-map
;; TODO
:producer (producer)
:consumer (component/using
(batch/batch-consumer
(assoc (batch/default-config)
::batch/bootstrap-servers "localhost:9092"
::batch/topics ["firefly"]
::batch/group-id "serenity")
#'file-writer)
[:file])
:file (File. "./simon.txt")
))Experimental.
Functions are provided for composition in ./src/org/purefn/river/flush.clj, an example is provided in the dev namespace (copied below):
(require '[org.purefn.river.flush :as flush])
(defn batch-writer
[{:keys [file]} records]
(with-open [w (io/writer file :append true)]
(.write w (reduce
(fn [acc {:keys [key value]}]
(str acc key ":" value "\n"))
""
records))))
(def processor
(-> batch-writer
(flush/flush)
(flush/timed 1000)
(flush/max-records 10)
(flush/accumulate)))To introduce minimal latency while supporting clojure.lang.ILookup so that the message
behaves like a Clojure map.
Testing was done with criterium. The results for the java class below:
dev> (criterium/with-progress-reporting (criterium/bench (org.purefn.river.Message. cr) :verbose))
Estimating sampling overhead
Warming up for JIT optimisations 10000000000 ...
compilation occurred before 506096 iterations
compilation occurred before 64256552 iterations
compilation occurred before 122947448 iterations
compilation occurred before 128512964 iterations
compilation occurred before 135596348 iterations
compilation occurred before 258543656 iterations
Estimating execution count ...
Sampling ...
Final GC...
Checking GC...
Finding outliers ...
Bootstrapping ...
Checking outlier significance
Warming up for JIT optimisations 10000000000 ...
compilation occurred before 2857409 iterations
compilation occurred before 31839396 iterations
Estimating execution count ...
Sampling ...
Final GC...
Checking GC...
Finding outliers ...
Bootstrapping ...
Checking outlier significance
amd64 Linux 4.15.0-46-generic 8 cpu(s)
OpenJDK 64-Bit Server VM 25.191-b12
Runtime arguments: -Dfile.encoding=UTF-8 -XX:-OmitStackTraceInFastThrow -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -Dclojure.compile.path=/home/kocubinski/dev/org.purefn/river/target/classes -Driver.version=0.1.0-SNAPSHOT -Dclojure.debug=false
Evaluation count : 1939684080 in 60 samples of 32328068 calls.
Execution time sample mean : 22.397785 ns
Execution time mean : 22.401404 ns
Execution time sample std-deviation : 0.324397 ns
Execution time std-deviation : 0.329196 ns
Execution time lower quantile : 21.941269 ns ( 2.5%)
Execution time upper quantile : 23.316574 ns (97.5%)
Overhead used : 8.608025 ns
Found 3 outliers in 60 samples (5.0000 %)
low-severe 3 (5.0000 %)
Variance from outliers : 1.6389 % Variance is slightly inflated by outliers
Mean execution time 22.397785 ns.
And creating a new map:
dev> (criterium/with-progress-reporting (criterium/bench (to-map cr) :verbose))
Warming up for JIT optimisations 10000000000 ...
classes loaded before 38036 iterations
compilation occurred before 38036 iterations
compilation occurred before 228206 iterations
Estimating execution count ...
Sampling ...
Final GC...
Checking GC...
Finding outliers ...
Bootstrapping ...
Checking outlier significance
amd64 Linux 4.15.0-46-generic 8 cpu(s)
OpenJDK 64-Bit Server VM 25.191-b12
Runtime arguments: -Dfile.encoding=UTF-8 -XX:-OmitStackTraceInFastThrow -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -Dclojure.compile.path=/home/kocubinski/dev/org.purefn/river/target/classes -Driver.version=0.1.0-SNAPSHOT -Dclojure.debug=false
Evaluation count : 4229760 in 60 samples of 70496 calls.
Execution time sample mean : 14.556789 µs
Execution time mean : 14.554769 µs
Execution time sample std-deviation : 309.115975 ns
Execution time std-deviation : 315.070609 ns
Execution time lower quantile : 14.217770 µs ( 2.5%)
Execution time upper quantile : 15.282596 µs (97.5%)
Overhead used : 8.608025 ns
Found 1 outliers in 60 samples (1.6667 %)
low-severe 1 (1.6667 %)
Variance from outliers : 9.4443 % Variance is slightly inflated by outliers
Mean execution time 14.556789 µs. Under these conditions creating a Clojure map was 636x slower, almost 3 orders of magnitude.