11package gears .async
22
3+ import language .experimental .captureChecking
4+
35import java .util .concurrent .CancellationException
46import java .util .concurrent .atomic .AtomicBoolean
57import scala .annotation .tailrec
6- import scala .annotation .unchecked .uncheckedCaptures
78import scala .annotation .unchecked .uncheckedVariance
89import scala .collection .mutable
910import scala .compiletime .uninitialized
1011import scala .util
1112import scala .util .control .NonFatal
1213import scala .util .{Failure , Success , Try }
13-
14- import language .experimental .captureChecking
14+ import gears .async .Async .SourceSymbol
1515
1616/** Futures are [[Async.Source Source ]]s that has the following properties:
1717 * - They represent a single value: Once resolved, [[Async.await await ]]-ing on a [[Future ]] should always return the
@@ -51,10 +51,11 @@ object Future:
5151 * - withResolver: Completion is done by external request set up from a block of code.
5252 */
5353 private class CoreFuture [+ T ] extends Future [T ]:
54+
5455 @ volatile protected var hasCompleted : Boolean = false
5556 protected var cancelRequest = AtomicBoolean (false )
5657 private var result : Try [T ] = uninitialized // guaranteed to be set if hasCompleted = true
57- private val waiting = mutable.Set [( Listener [Try [T ]]^ ) @ uncheckedCaptures] ()
58+ private val waiting : mutable.Set [Listener [Try [T ]]^ ] = mutable. Set ()
5859
5960 // Async.Source method implementations
6061
@@ -107,49 +108,11 @@ object Future:
107108
108109 end CoreFuture
109110
110- private class CancelSuspension [U ](val src : Async .Source [U ]^ )(val ac : Async , val suspension : ac.support.Suspension [Try [U ], Unit ]) extends Cancellable :
111- self : CancelSuspension [U ]^ {src, ac} =>
112- val listener : Listener [U ]^ {ac} = Listener .acceptingListener[U ]: (x, _) =>
113- val completedBefore = complete()
114- if ! completedBefore then
115- ac.support.resumeAsync(suspension)(Success (x))
116- unlink()
117- var completed = false
118-
119- def complete () = synchronized :
120- val completedBefore = completed
121- completed = true
122- completedBefore
123-
124- override def cancel () =
125- val completedBefore = complete()
126- if ! completedBefore then
127- src.dropListener(listener)
128- ac.support.resumeAsync(suspension)(Failure (new CancellationException ()))
129-
130- private class FutureAsync (val group : CompletionGroup )(using ac : Async , label : ac.support.Label [Unit ]) extends Async (using ac.support):
131- /** Await a source first by polling it, and, if that fails, by suspending in a onComplete call.
132- */
133- override def await [U ](src : Async .Source [U ]^ ): U =
134- if group.isCancelled then throw new CancellationException ()
135- src
136- .poll()
137- .getOrElse:
138- val res = ac.support.suspend[Try [U ], Unit ](k =>
139- val cancellable : CancelSuspension [U ]^ {src, ac} = CancelSuspension (src)(ac, k)
140- // val listener: Listener[U] = Listener.acceptingListener[U]: (x, _) => ???
141- // val completedBefore = cancellable.complete()
142- // if !completedBefore then ac.support.resumeAsync(k)(Success(x))
143- cancellable.link(group) // may resume + remove listener immediately
144- src.onComplete(cancellable.listener)
145- )(using label)
146- res.get
147-
148- override def withGroup (group : CompletionGroup ): Async = FutureAsync (group)
149-
150111 /** A future that is completed by evaluating `body` as a separate asynchronous operation in the given `scheduler`
151112 */
152113 private class RunnableFuture [+ T ](body : Async .Spawn ?=> T )(using ac : Async ) extends CoreFuture [T ]:
114+ private given acSupport : ac.support.type = ac.support
115+ private given acScheduler : ac.support.Scheduler = ac.scheduler
153116 /** RunnableFuture maintains its own inner [[CompletionGroup ]], that is separated from the provided Async
154117 * instance's. When the future is cancelled, we only cancel this CompletionGroup. This effectively means any
155118 * `.await` operations within the future is cancelled *only if they link into this group*. The future body run with
@@ -160,6 +123,47 @@ object Future:
160123 private def checkCancellation (): Unit =
161124 if cancelRequest.get() then throw new CancellationException ()
162125
126+ private class FutureAsync (val group : CompletionGroup )(using label : acSupport.Label [Unit ])
127+ extends Async (using acSupport, acScheduler):
128+ /** Await a source first by polling it, and, if that fails, by suspending in a onComplete call.
129+ */
130+ override def await [U ](src : Async .Source [U ]^ ): U =
131+ class CancelSuspension extends Cancellable :
132+ var suspension : acSupport.Suspension [Try [U ], Unit ] = uninitialized
133+ var listener : Listener [U ]^ {this } = uninitialized
134+ var completed = false
135+
136+ def complete () = synchronized :
137+ val completedBefore = completed
138+ completed = true
139+ completedBefore
140+
141+ override def cancel () =
142+ val completedBefore = complete()
143+ if ! completedBefore then
144+ src.dropListener(listener)
145+ acSupport.resumeAsync(suspension)(Failure (new CancellationException ()))
146+
147+ if group.isCancelled then throw new CancellationException ()
148+
149+ src
150+ .poll()
151+ .getOrElse:
152+ val cancellable = CancelSuspension ()
153+ val res = acSupport.suspend[Try [U ], Unit ](k =>
154+ val listener = Listener .acceptingListener[U ]: (x, _) =>
155+ val completedBefore = cancellable.complete()
156+ if ! completedBefore then acSupport.resumeAsync(k)(Success (x))
157+ cancellable.suspension = k
158+ cancellable.listener = listener
159+ cancellable.link(group) // may resume + remove listener immediately
160+ src.onComplete(listener)
161+ )
162+ cancellable.unlink()
163+ res.get
164+
165+ override def withGroup (group : CompletionGroup ): Async = FutureAsync (group)
166+
163167 override def cancel (): Unit = if setCancelled() then this .innerGroup.cancel()
164168
165169 link()
@@ -178,8 +182,9 @@ object Future:
178182 /** Create a future that asynchronously executes `body` that wraps its execution in a [[scala.util.Try ]]. The returned
179183 * future is linked to the given [[Async.Spawn ]] scope by default, i.e. it is cancelled when this scope ends.
180184 */
181- def apply [T ](body : Async .Spawn ?=> T )(using async : Async , spawnable : Async .Spawn )
182- (using async.type =:= spawnable.type ): Future [T ]^ {body, spawnable} =
185+ def apply [T ](body : Async .Spawn ?=> T )(using async : Async , spawnable : Async .Spawn )(
186+ using async.type =:= spawnable.type
187+ ): Future [T ]^ {body, spawnable} =
183188 RunnableFuture (body)(using spawnable)
184189
185190 /** A future that is immediately completed with the given result. */
@@ -197,11 +202,11 @@ object Future:
197202 /** A future that immediately rejects with the given exception. Similar to `Future.now(Failure(exception))`. */
198203 inline def rejected (exception : Throwable ): Future [Nothing ] = now(Failure (exception))
199204
200- extension [T ](f1 : Future [T ]^ )
205+ extension [T ](f1 : Future [T ])
201206 /** Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise,
202207 * fail with the failure that was returned first.
203208 */
204- def zip [U ](f2 : Future [U ]^ ): Future [(T , U )]^ {f1, f2} =
209+ def zip [U ](f2 : Future [U ]): Future [(T , U )] =
205210 Future .withResolver: r =>
206211 Async
207212 .either(f1, f2)
@@ -234,20 +239,20 @@ object Future:
234239 * @see
235240 * [[orWithCancel ]] for an alternative version where the slower future is cancelled.
236241 */
237- def or (f2 : Future [T ]^ ): Future [T ]^ {f1, f2} = orImpl(false )(f2)
242+ def or (f2 : Future [T ]): Future [T ] = orImpl(false )(f2)
238243
239244 /** Like `or` but the slower future is cancelled. If either task succeeds, succeed with the success that was
240245 * returned first and the other is cancelled. Otherwise, fail with the failure that was returned last.
241246 */
242- def orWithCancel (f2 : Future [T ]^ ): Future [T ]^ {f1, f2} = orImpl(true )(f2)
247+ def orWithCancel (f2 : Future [T ]): Future [T ] = orImpl(true )(f2)
243248
244- inline def orImpl (inline withCancel : Boolean )(f2 : Future [T ]^ ): Future [T ]^ {f1, f2} = Future .withResolver: r =>
249+ inline def orImpl (inline withCancel : Boolean )(f2 : Future [T ]): Future [T ] = Future .withResolver: r =>
245250 Async
246251 .raceWithOrigin(f1, f2)
247252 .onComplete(Listener { case ((v, which), _) =>
248253 v match
249254 case Success (value) =>
250- inline if withCancel then (if which == f1.symbol then f2 else f1).cancel()
255+ inline if withCancel then (if which == f1 then f2 else f1).cancel()
251256 r.resolve(value)
252257 case Failure (_) =>
253258 (if which == f1.symbol then f2 else f1).onComplete(Listener ((v, _) => r.complete(v)))
@@ -300,7 +305,7 @@ object Future:
300305 * may be used. The handler should eventually complete the Future using one of complete/resolve/reject*. The
301306 * default handler is set up to [[rejectAsCancelled ]] immediately.
302307 */
303- def onCancel (handler : () - > Unit ): Unit
308+ def onCancel (handler : () = > Unit ): Unit
304309 end Resolver
305310
306311 /** Create a promise that may be completed asynchronously using external means.
@@ -310,16 +315,16 @@ object Future:
310315 *
311316 * If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel ]].
312317 */
313- def withResolver [T ](body : Resolver [T ]^ => Unit ): Future [T ] =
314- val future = new CoreFuture [T ] with Resolver [T ] with Promise [T ] {
315- @ volatile var cancelHandle : (( ) -> Unit ) = () => rejectAsCancelled()
316- override def onCancel (handler : () - > Unit ): Unit = cancelHandle = handler
318+ def withResolver [T ](body : Resolver [T ] => Unit ): Future [T ] =
319+ val future = new CoreFuture [T ] with Resolver [T ] with Promise [T ]:
320+ @ volatile var cancelHandle : () -> Unit = () => rejectAsCancelled()
321+ override def onCancel (handler : () = > Unit ): Unit = cancelHandle = caps.unsafe.unsafeAssumePure( handler)
317322 override def complete (result : Try [T ]): Unit = super .complete(result)
318323
319324 override def cancel (): Unit =
320325 if setCancelled() then cancelHandle()
321- }
322- body(future)
326+ end future
327+ body(future : Resolver [ T ] )
323328 future
324329 end withResolver
325330
@@ -338,51 +343,46 @@ object Future:
338343 * [[Future.awaitAll ]] and [[Future.awaitFirst ]] for simple usage of the collectors to get all results or the first
339344 * succeeding one.
340345 */
341- class Collector [T ](val futures : (Future [T ]^ )* ):
346+ class Collector [T ](futures : (Future [T ]^ )* ):
342347 private val ch = UnboundedChannel [Future [T ]^ {futures* }]()
343348
344- private val futureRefs = mutable.Map [Async . SourceSymbol [Try [T ]], Future [T ]^ {futures* }]()
349+ private val futMap = mutable.Map [SourceSymbol [Try [T ]], Future [T ]^ {futures* }]()
345350
346351 /** Output channels of all finished futures. */
347352 final def results : ReadableChannel [Future [T ]^ {futures* }] = ch.asReadable
348353
349- private val listener = Listener ((_, futRef ) =>
354+ private val listener = Listener ((_, fut ) =>
350355 // safe, as we only attach this listener to Future[T]
351- val ref = futRef.asInstanceOf [Async .SourceSymbol [Try [T ]]]
352- val fut = futureRefs.synchronized :
353- // futureRefs.remove(ref).get
354- futureRefs(ref)
355- ch.sendImmediately(futureRefs(fut.symbol))
356+ val future = futMap.synchronized :
357+ futMap.remove(fut.asInstanceOf [SourceSymbol [Try [T ]]]).get
358+ ch.sendImmediately(future)
356359 )
357360
358361 protected final def addFuture (future : Future [T ]^ {futures* }) =
359- futureRefs.synchronized :
360- futureRefs += (future.symbol -> future)
362+ futMap.synchronized { futMap += (future.symbol -> future) }
361363 future.onComplete(listener)
362364
363365 futures.foreach(addFuture)
364366 end Collector
365367
366368 /** Like [[Collector ]], but exposes the ability to add futures after creation. */
367- class MutableCollector [T ](futures : ( Future [T ]^ ) * ) extends Collector [T ](futures* ):
369+ class MutableCollector [T ](futures : Future [T ]* ) extends Collector [T ](futures* ):
368370 /** Add a new [[Future ]] into the collector. */
369- def add (future : Future [T ]^ {futures * }) : Unit = addFuture(future)
370- def += (future : Future [T ]^ {futures * } ) = add(future)
371+ inline def add (future : Future [T ]^ ) = addFuture(future)
372+ inline def += (future : Future [T ]^ ) = add(future)
371373
372- extension [T ](@ caps.unbox fs : Seq [Future [T ]^ ])
374+ extension [T ](fs : Seq [Future [T ]])
373375 /** `.await` for all futures in the sequence, returns the results in a sequence, or throws if any futures fail. */
374376 def awaitAll (using Async ) =
375377 val collector = Collector (fs* )
376- for _ <- fs do
377- val fut : Future [T ]^ {fs* } = collector.results.read().right.get
378- fut.await
378+ for _ <- fs do collector.results.read().right.get.await
379379 fs.map(_.await)
380380
381381 /** Like [[awaitAll ]], but cancels all futures as soon as one of them fails. */
382382 def awaitAllOrCancel (using Async ) =
383- val collector = Collector [ T ] (fs* )
383+ val collector = Collector (fs* )
384384 try
385- for _ <- fs do ??? // collector.results.read().right.get.await
385+ for _ <- fs do collector.results.read().right.get.await
386386 fs.map(_.await)
387387 catch
388388 case NonFatal (e) =>
@@ -391,22 +391,20 @@ object Future:
391391
392392 /** Race all futures, returning the first successful value. Throws the last exception received, if everything fails.
393393 */
394- def awaitFirst (using Async ): T = impl. awaitFirstImpl[ T ](fs, false )
394+ def awaitFirst (using Async ): T = awaitFirstImpl( false )
395395
396396 /** Like [[awaitFirst ]], but cancels all other futures as soon as the first future succeeds. */
397- def awaitFirstWithCancel (using Async ): T = impl. awaitFirstImpl[ T ](fs, true )
397+ def awaitFirstWithCancel (using Async ): T = awaitFirstImpl( true )
398398
399- private object impl :
400- def awaitFirstImpl [T ](@ caps.unbox fs : Seq [Future [T ]^ ], withCancel : Boolean )(using Async ): T =
401- val collector = Collector [T ](fs* )
399+ private inline def awaitFirstImpl (withCancel : Boolean )(using Async ): T =
400+ val collector = Collector (fs* )
402401 @ scala.annotation.tailrec
403402 def loop (attempt : Int ): T =
404- val fut : Future [T ]^ {fs* } = collector.results.read().right.get
405- fut.awaitResult match
403+ collector.results.read().right.get.awaitResult match
406404 case Failure (exception) =>
407405 if attempt == fs.length then /* everything failed */ throw exception else loop(attempt + 1 )
408406 case Success (value) =>
409- if withCancel then fs.foreach(_.cancel())
407+ inline if withCancel then fs.foreach(_.cancel())
410408 value
411409 loop(1 )
412410end Future
@@ -432,11 +430,10 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
432430 def run ()(using Async , AsyncOperations ): T = body
433431
434432 /** Start a future computed from the `body` of this task */
435- def start ()(using async : Async , spawn : Async .Spawn , asyncOps : AsyncOperations )
436- (using async.type =:= spawn.type ): Future [T ]^ {this , spawn} =
433+ def start ()(using async : Async , spawn : Async .Spawn )(using asyncOps : AsyncOperations )(using async.type =:= spawn.type ): Future [T ]^ {body, spawn} =
437434 Future (body)(using async, spawn)
438435
439- def schedule (s : TaskSchedule ): Task [T ]^ {this } =
436+ def schedule (s : TaskSchedule ): Task [T ]^ {body } =
440437 s match {
441438 case TaskSchedule .Every (millis, maxRepetitions) =>
442439 assert(millis >= 1 )
0 commit comments