diff --git a/README.md b/README.md index a8d61a4..548b36b 100644 --- a/README.md +++ b/README.md @@ -9,15 +9,13 @@ A collection of primitives for event-driven programming. ## Installation -1. Add the dependency to your `shard.yml`: +Add the dependency to your `shard.yml` and run `shards install`: - ```yaml - dependencies: - concur: - github: your-github-user/concur - ``` - -2. Run `shards install` + ```yaml + dependencies: + concur: + github: lbarasti/concur + ``` ## Usage @@ -25,15 +23,71 @@ A collection of primitives for event-driven programming. require "concur" ``` -TODO: Write usage instructions here +### Using Future +You can use `Future` to wrap asynchronous computations that might fail. +```crystal +f = Future.new { + sleep 2 # a placeholder for some expensive computation or for a lengthy IO operation + "Success!" +} + +f.await # => "Success!" +``` +If you want to keep on manipulating the result of a future in a separate fiber, then you can rely on `Future`'s instance methods. + +For example, given a future `f`, you can apply a function to the wrapped value with `#map`, filter it with `#select` and recover from failure with `#recover` + +```crystal +f.map { |v| v.downcase } + .select { |v| v.size < 3 } + .recover { "default_key" } +``` + +Here is a contrived example to showcase some other common methods. + +You can combine the result of two Futures into one with `#zip`: + +```crystal +author_f : Future(User) = db.user_by_id(1) +reviewer_f : Future(User) = db.user_by_id(2) + +permission_f : Future(Bool) = author_f.zip(reviewer_f) { |author, reviewer| + author.has_reviewer?(reviewer) +} +``` + +You can use `#flat_map` to avoid nesting futures: + +```crystal +content_f : Future(Content) = permission_f + .flat_map { |reviewer_is_allowed| + if reviewer_is_allowed + db.content_by_user(1) # => Future(Content) + else + raise NotAllowedError.new + end + } +``` + +And perform side effects with `#on_success` and `#on_error`. + +```crystal +content_f + .on_success { |content| + reviewer_f.await!.email(content) + } + .on_error { |ex| log_error(ex) } +``` + +Check out the [API docs](https://lbarasti.com/concur/) for more details. ## Development -TODO: Write development instructions here +Run `shards install` to install the project dependencies. You can then run `crystal spec` to verify that all the tests are passing. ## Contributing -1. Fork it () +1. Fork it () 2. Create your feature branch (`git checkout -b my-new-feature`) 3. Commit your changes (`git commit -am 'Add some feature'`) 4. Push to the branch (`git push origin my-new-feature`) @@ -41,4 +95,4 @@ TODO: Write development instructions here ## Contributors -- [lorenzo.barasti](https://github.com/your-github-user) - creator and maintainer +- [lbarasti](https://github.com/lbarasti) - creator and maintainer diff --git a/shard.yml b/shard.yml index cb7459c..9fc046b 100644 --- a/shard.yml +++ b/shard.yml @@ -1,10 +1,10 @@ name: concur -version: 0.1.0 +version: 0.2.0 authors: - lbarasti -crystal: 1.0.0 +crystal: ">=1.0.0" dependencies: rate_limiter: diff --git a/spec/future_spec.cr b/spec/future_spec.cr new file mode 100644 index 0000000..d25ed35 --- /dev/null +++ b/spec/future_spec.cr @@ -0,0 +1,259 @@ +require "./spec_helper" + +describe Future do + it "runs a computation asyncronously" do + ch = Channel(Nil).new + + Future.new { ch.send nil } + + ch.receive + end + + it "can be awaited on" do + f = Future.new { 2** 10 } + res = f.await + res.should eq 1024 + typeof(res).should eq (Int32 | Exception) + end + + it "can be awaited on multiple times" do + f = Future.new { 2** 10 } + 10.times { + f.await.should eq 1024 + } + end + + it "can be awaited on by multiple fibers" do + n_fibers = 100 + results = Channel(Int32 | Exception).new(n_fibers) + f = Future.new { sleep rand; 2** 10 } + n_fibers.times { + spawn { + results.send f.await + } + } + n_fibers.times { + results.receive.should eq 1024 + } + end + + it "can be awaited on and raise on exception" do + f = Future.new { sleep 0.4; raise CustomError.new("error"); 42 } + expect_raises(CustomError) { + f.await! + } + end + + it "can be awaited on with timeout" do + f = Future.new { sleep 0.6; 42 } + f.await(0.2.seconds) + .should be_a Concur::Timeout + f.await + .should eq 42 + f.await(0.1.seconds) + .should eq 42 + end + + it "can be awaited on with timeout and raise on exception" do + f = Future.new { sleep 0.4; raise CustomError.new("error"); 42 } + expect_raises(Concur::Timeout) { + f.await!(0.2.seconds) + } + expect_raises(CustomError) { + f.await!(0.3.seconds) + } + end + + it "can be queried for completion" do + f = Future.new { sleep 0.3; :done } + f.done?.should be_false + f.await + f.done?.should be_true + end + + it "supports running callbacks, in order, on completion" do + results = [] of Int32 + f = Future.new { 5 } + f.on_complete { |r| + results << r.as(Int32) + 1 + raise Exception.new("runtime exception") + }.on_complete { |r| + case r + when Int32 + results << r + end + }.await.should eq 5 + + results.should eq [6, 5] + end + + it "supports running callbacks in order on success" do + results = [] of Int32 + f = Future.new { 5 } + f.on_success { |r| + results << r.as(Int32) + 1 + raise Exception.new("runtime exception") + }.on_success { |r| + case r + when Int32 + results << r + end + }.await.should eq 5 + + results.should eq [6, 5] + end + + it "will not run #on_success callbacks if the future fails" do + results = [] of Int32 + f = Future.new { raise CustomError.new; 5 } + f.on_success { |r| + results << 1 + }.on_success { |r| + raise Exception.new + results << 2 + }.await.should be_a CustomError + + results.should be_empty + end + + it "supports running callbacks in order on error" do + results = [] of Exception + f = Future.new { raise CustomError.new; 5 } + + f.on_error { |ex| + results << ex.as(CustomError) + raise Exception.new("runtime exception") + }.on_error { |ex| + results << Timeout.new + }.await.should be_a CustomError + + results.first.should be_a CustomError + results.last.should be_a Timeout + end + + it "will not run #on_error callbacks if the future succeeds" do + results = [] of Int32 + f = Future.new { 5 } + f.on_error { |r| + results << 1 + }.on_error { |r| + raise Exception.new + results << 2 + }.await.should eq 5 + + results.should be_empty +end + + it "supports function composition" do + c_1 = Future.new { rand ** 2 } + c_2 = c_1.map { |x| 1 - x } + + (c_1.await! + c_2.await!).should eq 1 + c_2.done?.should be_true + end + + it "doesn't propagate errors backward when composing functions" do + c_1 = Future.new { 2 } + c_2 = c_1.map { |x| raise CustomError.new("error"); 1 - x } + + c_2.await.should be_a CustomError + c_1.await.should eq 2 + end + + it "propagates errors when composing functions" do + c_1 = Future.new { raise CustomError.new("error"); rand ** 2 } + c_2 = c_1.map { |x| raise Exception.new("generic error"); 1 - x } + + c_2.await.should be_a CustomError + end + + it "can be filtered" do + f = Future.new { 5 } + g = f.select { |x| x % 2 == 1 } + h = f.select { |x| x % 2 == 0 } + g.await.should eq 5 + h.await.should be_a Concur::EmptyError + end + + it "supports combining two futures with #flat_map" do + f = Future.new { 5 } + g = Future.new { 3 } + + f.flat_map { |x| g.map { |y| x + y } } + .await.should eq 8 + end + + it "will return the first encountered error on #flat_map" do + f = Future.new { raise CustomError.new; 5 } + g = Future.new { 3 } + + f.flat_map { |x| g.map { |y| x + y } } + .await.should be_a CustomError + end + + it "can be flattened in case of nested futures" do + f = Future.new { Future.new { 5 } } + f.await.should be_a Future(Int32) + + f.flatten + .await.should eq 5 + end + + it "supports mapping its underlying value to a specific type" do + f = Future.new { rand < 0.5 ? "hello" : 5 } + + typeof(f.await).should eq String | Int32 | Exception + v = f.map_to(Int32).await + typeof(v).should eq Int32 | Exception + end + + it "raises a TypeCastError when mapping to an incompatible type" do + f = Future.new { rand < 0.5 ? "hello" : 5 } + g = f.map { |v| v.class == Int32 ? "hello 2" : 6 } + + expect_raises(TypeCastError) { + fv = f.map_to(Int32).await! + gv = g.map_to(Int32).await! + } + end + + it "can be recovered if it fails" do + f = Future.new { raise CustomError.new; 42 } + f.recover { |e| + case e + when CustomError + 100 + else + raise Exception.new("Unexpected failure") + end + }.await.should eq 100 + end + + it "does not recover successful futures" do + f = Future.new { 42 } + f.recover { |ex| 43 } + .await.should eq 42 + end + + it "can be transformed" do + f = Future.new { 42 } + g = Future.new { raise CustomError.new; 42 } + t = -> (res : Int32 | Exception) { + case res + when Int32 + "#{res + 1}" + else + "0" + end + } + f.transform(&t).await.should eq "43" + g.transform(&t).await.should eq "0" + end + + it "supports zipping futures together" do + f = Future.new { 1 } + g = Future.new { 2 } + f.zip(g) { |v1, v2| v1 + v2 } + .await.should eq 3 + end +end diff --git a/spec/readme_spec.cr b/spec/readme_spec.cr new file mode 100644 index 0000000..4067183 --- /dev/null +++ b/spec/readme_spec.cr @@ -0,0 +1,69 @@ +require "./spec_helper" + +describe "Future example" do + it "showcases #map, #select and #recover" do + f = Future.new { + sleep 2 # a placeholder for some expensive computation or for a lengthy IO operation + "Success!" + } + + f.map { |v| v.downcase } + .select { |v| v.size < 3 } + .recover { |ex| ex.message || ex.class.to_s } + .await.should eq "Concur::EmptyError" + end + + it "showcases #zip and #flat_map" do + db = DB.new + author_f : Future(User) = db.user_by_id(1) + reviewer_f : Future(User) = db.user_by_id(2) + + content_f = author_f.zip(reviewer_f) { |auth, rev| + rev.groups.any? { |g| auth.reviewer_groups.includes? g } + }.flat_map { |reviewer_is_allowed| + if reviewer_is_allowed + db.content_by_user(1) + else + raise Exception.new + end + } + + content_f.on_success { |content| + reviewer_f.await!.email(content) + }.on_error { |ex| log_error(ex) } + + content_f.await.should eq ({id: 1, content: ["some content"]}) + end +end + +class DB + @users = [ + User.new(1, [1, 3, 42] of Int32, [] of Int32), + User.new(2, [] of Int32, [42]) + ] + @content = [ + {id: 1, content: ["some content"]} + ] + + def user_by_id(n) + Future.new { + @users.find { |u| u.id == n }.not_nil! + } + end + + def content_by_user(n) + Future.new { + @content.find { |c| c[:id] == n }.not_nil! + } + end +end + +def log_error(ex) + puts "error #{ex.class}: #{ex.message}" +end + +record User, id : Int32, reviewer_groups : Array(Int32), groups : Array(Int32) do + def email(content) + # Send the user an email with the given `content` + end +end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 7735a0f..456f143 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,5 +1,5 @@ require "spec" -require "../src/concur" +require "../src/**" extend Concur Spec.override_default_formatter(Spec::VerboseFormatter.new) @@ -10,4 +10,6 @@ end def take(stream, n) (1..n).map { stream.receive } -end \ No newline at end of file +end + +class CustomError < Exception; end diff --git a/src/future.cr b/src/future.cr new file mode 100644 index 0000000..dffa0b5 --- /dev/null +++ b/src/future.cr @@ -0,0 +1,210 @@ +module Concur + class EmptyError < Exception + end + + class Timeout < Exception + end + + # A Future represents a value which may or may not *currently* be available, but will be + # available at some point, or an exception if that value could not be made available. + class Future(T) + @value : T? = nil + @error : Exception? = nil + + def initialize(&block : -> T) + @value_ch = Channel(T).new(1) + @error_ch = Channel(Exception).new(1) + + spawn do + result = block.call + @value_ch.send result + rescue exception + @error_ch.send exception + end + end + + # Awaits the completion of the future and returns either a value or an exception. + def await : T | Exception + unless done? + select + when res = @value_ch.receive? + @value = res if res + when err = @error_ch.receive? + @error = err if err + end + @value_ch.close + @error_ch.close + end + @error ? @error.not_nil! : @value.not_nil! + end + + # Awaits the completion of the future and either returns the computed value + # or raises an exception. + def await! : T + case res = await + when T + res + else + raise res + end + end + + # Same as `#await`, but returns a `Timeout` exception if the given `t` expires. + def await(t : Time::Span) : T | Exception + unless done? + select + when timeout(t) + return Timeout.new + when res = @value_ch.receive? + @value = res if res + when err = @error_ch.receive? + @error = err if err + end + @value_ch.close + @error_ch.close + end + @error ? @error.not_nil! : @value.not_nil! + end + + # Same as `#await!`, but raises a `Timeout` exception if the given `t` expires. + def await!(t : Time::Span) : T + case res = await(t) + when T + res + else + raise res + end + end + + # Returns `true` if the future has completed - either with a value or an exception. + # Returns `false` otherwise. + def done? + @value_ch.closed? + end + + # Creates a new future by applying a function to the successful result of this future, + # and returns the result of the function as the new future. + # + # If this future is completed with an exception then the new future will also contain this exception. + def flat_map(&block : T -> Future(K)) : Future(K) forall K + map { |value| + block.call(value).await! + } + end + + # Creates a new future with one level of nesting flattened. + def flatten + flat_map { |fut| fut } + end + + # Creates a new future by applying a function to the successful result of this future. + # + # If this future is completed with an exception then the new future will also contain this exception. + def map(&block : T -> K) : Future(K) forall K + transform { |res| + case res + when T + block.call(res) + else + raise res + end + } + end + + # Creates a new Future which is completed with this Future's result if + # that conforms to type `typ` or a `TypeCastError` otherwise. + def map_to(typ : Object.class) + map { |value| typ.cast(value) } + end + + # Applies the side-effecting function to the result of this future, and returns a new future + # with the result of this future. + # + # This method allows one to enforce that the callbacks are executed in a specified order. + # Note: if one of the chained `on_complete` callbacks throws an exception, that exception is not + # propagated to the subsequent `on_complete` callbacks. Instead, the subsequent `on_complete` callbacks + # are given the original value of this future. + def on_complete(&block : (T | Exception) -> _) : Future(T) + transform { |res| + begin + block.call(res) + rescue + # no-op + end + case res + when T; res + else raise res + end + } + end + + # Applies the side-effecting function to the result of this future if it was successful, and + # returns a new future with the result of this future. + # + # WARNING: Will not be called if this future is never completed or if it is completed with an error. + def on_success(&block : T -> _) : Future(T) + on_complete { |res| + case res + when T + block.call(res) + end + } + end + + # Applies the side-effecting function to the result of this future if it raised an error, and + # returns a new future with the result of this future. + # + # WARNING: Will not be called if this future is never completed or if it is completed with success. + def on_error(&block : Exception -> _) : Future(T) + on_complete { |res| + case res + when Exception + block.call(res) + end + } + end + + # Creates a new future that will handle any matching throwable that this future might contain. + # If there is no match, or if this future contains a valid result then the new future will contain the same. + def recover(&block : Exception -> T) : Future(T) + transform { |res| + case res + when T + res + else + block.call(res) + end + } + end + + # Creates a new future by filtering the value of the current future with a predicate. + # + # If the current future contains a value which satisfies the predicate, the new future will also hold that value. + # Otherwise, the resulting future will fail with a `EmptyError`. + # If the current future fails, then the resulting future also fails. + def select(&block : T -> Bool) : Future(T) + map { |value| + if block.call(value) + value + else + raise EmptyError.new + end + } + end + + # Creates a new Future by applying the specified function to the result of this Future. + # + # If there is any non-fatal exception thrown when 'block' is applied then that exception + # will be propagated to the resulting future. + def transform(&block : (T | Exception) -> K) : Future(K) forall K + Future.new { + block.call(self.await) + } + end + + # Creates a new future holding the result of `block` applied to the tuple of values from two futures. + def zip(other : Future(K), &block : (T, K) -> W) : Future(W) forall K, W + flat_map { |t| other.map { |k| block.call(t, k) } } + end + end +end