-
Notifications
You must be signed in to change notification settings - Fork 557
Add Async#blockingCancelable
#3459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Async#blockingCancelable
#3459
Conversation
map( | ||
race( | ||
productR(gate.get)( // ensure cancelation fiber started before blocking | ||
blocking { | ||
inProgress.set(true) | ||
try thunk | ||
finally inProgress.set(false) | ||
}), | ||
onCancel( | ||
productR(gate.complete(()))(never[A]), | ||
ifM(delay(inProgress.get()))(cancel, unit) | ||
) | ||
))(_.merge) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For those reading the code, could you extract each of the two sides as a variable and add a 1-line comment on each, explaining what each side does, if you please?
map( | |
race( | |
productR(gate.get)( // ensure cancelation fiber started before blocking | |
blocking { | |
inProgress.set(true) | |
try thunk | |
finally inProgress.set(false) | |
}), | |
onCancel( | |
productR(gate.complete(()))(never[A]), | |
ifM(delay(inProgress.get()))(cancel, unit) | |
) | |
))(_.merge) | |
} | |
// LHS: Once cancellation fiber started, do blocking operation? | |
val lhs = | |
productR(gate.get)( // ensure cancelation fiber started before blocking | |
blocking { | |
inProgress.set(true) | |
try thunk | |
finally inProgress.set(false) | |
}) | |
// RHS: wait for a cancellation, if received cancel ?? | |
val rhs = | |
onCancel( | |
productR(gate.complete(()))(never[A]), | |
ifM(delay(inProgress.get()))(cancel, unit) | |
) | |
map(race(lhs, rhs), _.merge) | |
} |
I've thought about this case a couple times, but your PR brought it back to mind, and I have an alternative suggestion! def cancelable[A](fa: F[A], fin: F[Unit]): F[A] =
uncancelable { poll =>
start(fa) flatMap { fiber =>
poll(fa.join)
.onCancel(fin *> fiber.cancel)
.flatMap(_.embed(poll(F.canceled *> F.never))) // TODO gross
}
} This could actually go into def blockingCancelable[A](cancel: F[Unit])(thunk: => A): F[A] =
cancelable(blocking(thunk), cancel) I'm not super pleased with the self-cancelation handling above. Feels like there should be something better we can do there. But overall I think this is a bit more general. The idea here is that you can take an uncancelable effect (any uncancelable effect) and make it cancelable provided you have some way of making that happen. The semantics in the limit end up being the same. For example, canceling Poking @SystemFw because this seems fun from a calculus standpoint. |
Yes! Besides it is much nicer, thanks! 😁 |
@armanbilge Want me to open up a new PR? Also does this need to be 3.5.0? I'm reticent to scope-creep that API since we're already in RCs. |
No, it doesn't need to be. But then I'll need to inline it in FS2, and remember to remove it when this ships in v3.6.0. I'll live 😇 but complain all the way there!
Agreed. For some reason I thought that #3453 was going to go in 3.5.0 but now I realize that was never the plan at all? 😁 |
Well, that's the plan. :-) I think the |
Aha, so you are! I don't disagree with its importance, but that's a non-trivial change, that I'd say deserves an RC of its own. We already know it breaks some code I wrote like literally the day before you proposed it 😛 Suffice to say, relative to that, this PR is a nothing. |
@armanbilge Alrighty, I'll open a 3.5-targeting PR for |
This is a common idiom I am encountering with JDK I/O APIs: the operation is not interruptible, but it is possible to "cancel" it by closing the underlying resource. While this is not the "ideal" form of cancelation, it is much better than a completely uncancelable op that hangs your program (and indefinitely squats a thread). Besides, usually when you are canceling it you are about to close the resource anyway.
I've encoded this as an
Async#blockingCancelable
combinator. The logic is non-trivial, and requires concurrent state to ensure that the cancelation fiber is in place before starting the blocking op, and that cancelation happens only when in the blocking region. This is important since cancelation is presumed to be destructive (i.e. closing a resource), so it should be performed only conservatively.