Skip to content

Commit

Permalink
Fix: let Concur extend self
Browse files Browse the repository at this point in the history
  • Loading branch information
lbarasti committed Jan 19, 2022
1 parent cda6f55 commit c79157d
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
5 changes: 2 additions & 3 deletions examples/parallel_processing.cr
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
require "../src/concur"
include Concur

# How many iterations does it take us to get 10 consecutive
# estimates of PI with small relative error?
# Reference: https://en.wikipedia.org/wiki/Approximations_of_%CF%80#Summing_a_circle's_area
pp source(Random.new) { |gen| {gen, {gen.rand, gen.rand}} }
pp Concur.source(Random.new) { |gen| {gen, {gen.rand, gen.rand}} }
.map(workers: 4) { |(x,y)| x**2 + y**2 }
.scan({0, 0}) { |acc, v|
v <= 1 ? {acc[0] + 1, acc[1]} : {acc[0], acc[1] + 1}
}.map { |(inner, outer)| 4 * inner / (inner + outer)}
.zip(source(1..)) { |estimate, iteration| {iteration, estimate} }
.zip(Concur.source(1..)) { |estimate, iteration| {iteration, estimate} }
.batch(10, 1.second)
.select { |estimates|
estimates.all? { |(i, e)|
Expand Down
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: concur
version: 1.0.0
version: 1.0.1

authors:
- lbarasti <[email protected]>
Expand Down
6 changes: 4 additions & 2 deletions src/concur.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require "rate_limiter"

module Concur
extend self

# Returns a channel that will receive each element in the given enumerable,
# and then close.
def source(input : Enumerable(T), name = nil, buffer_size = 0) : Channel(T) forall T
Expand Down Expand Up @@ -210,7 +212,7 @@ abstract class ::Channel(T)
) : Channel(V) forall V
enum_stream = map(workers: workers, name: name.try{ |s| "#{s}.map" },
on_error: on_error, &block)
flatten(enum_stream, name, buffer_size)
Concur.flatten(enum_stream, name, buffer_size)
end

# Returns a channel that receives values from `self` transformed via *block* and based
Expand Down Expand Up @@ -396,7 +398,7 @@ abstract class ::Channel(T)
def batch(size : Int32, interval : Time::Span, name = nil, buffer_size = 0) : Channel(Enumerable(T))
Channel(Enumerable(T)).new(buffer_size).tap { |stream|
memory = Array(T).new(size)
tick = every(interval) { nil }
tick = Concur.every(interval) { nil }
sent = false
spawn(name: name) do
loop do
Expand Down

0 comments on commit c79157d

Please sign in to comment.