Skip to content

Commit

Permalink
Bump to event-listener v3.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed May 20, 2023
1 parent f738cfd commit 6cd1ef4
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 104 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ categories = ["asynchronous", "concurrency"]
exclude = ["/.*"]

[dependencies]
event-listener = "2.5.1"
event-listener = "2"
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener.git" }

[dev-dependencies]
async-channel = "1.5.0"
Expand All @@ -24,3 +25,6 @@ futures-lite = "1.12.0"

[target.'cfg(any(target_arch = "wasm32", target_arch = "wasm64"))'.dev-dependencies]
wasm-bindgen-test = "0.3"

[patch.crates-io]
event-listener = { git = "https://github.com/smol-rs/event-listener.git" }
6 changes: 5 additions & 1 deletion src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ enum WaitState {
Initial,

/// We are waiting for the listener to complete.
Waiting { evl: EventListener, local_gen: u64 },
Waiting {
/// TODO: At the next breaking release, remove the `Pin<Box<>>` and make this type `!Unpin`.
evl: Pin<Box<EventListener>>,
local_gen: u64,
},

/// Waiting to re-acquire the lock to check the state again.
Reacquiring(u64),
Expand Down
112 changes: 61 additions & 51 deletions src/mutex.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::borrow::Borrow;
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::Poll;

// Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54.
#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
Expand All @@ -18,6 +17,7 @@ use std::time::{Duration, Instant};
use std::usize;

use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture};

/// An async mutex.
///
Expand Down Expand Up @@ -109,10 +109,10 @@ impl<T: ?Sized> Mutex<T> {
/// ```
#[inline]
pub fn lock(&self) -> Lock<'_, T> {
Lock {
Lock::_new(LockInner {
mutex: self,
acquire_slow: None,
}
})
}

/// Attempts to acquire the mutex.
Expand Down Expand Up @@ -184,7 +184,7 @@ impl<T: ?Sized> Mutex<T> {
/// ```
#[inline]
pub fn lock_arc(self: &Arc<Self>) -> LockArc<T> {
LockArc(LockArcInnards::Unpolled(self.clone()))
LockArc::_new(LockArcInnards::Unpolled(self.clone()))
}

/// Attempts to acquire the mutex and clone a reference to it.
Expand Down Expand Up @@ -246,107 +246,111 @@ impl<T: Default + ?Sized> Default for Mutex<T> {
}
}

/// The future returned by [`Mutex::lock`].
pub struct Lock<'a, T: ?Sized> {
easy_wrapper! {
/// The future returned by [`Mutex::lock`].
pub struct Lock<'a, T: ?Sized>(LockInner<'a, T> => MutexGuard<'a, T>);
pub(crate) wait();
}

struct LockInner<'a, T: ?Sized> {
/// Reference to the mutex.
mutex: &'a Mutex<T>,

/// The future that waits for the mutex to become available.
acquire_slow: Option<AcquireSlow<&'a Mutex<T>, T>>,
}

impl<'a, T: ?Sized> Unpin for Lock<'a, T> {}
impl<'a, T: ?Sized> Unpin for LockInner<'a, T> {}

impl<T: ?Sized> fmt::Debug for Lock<'_, T> {
impl<T: ?Sized> fmt::Debug for LockInner<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Lock { .. }")
}
}

impl<'a, T: ?Sized> Future for Lock<'a, T> {
impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> {
type Output = MutexGuard<'a, T>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
match this.acquire_slow.as_mut() {
fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let LockInner {
mutex,
acquire_slow,
} = self.get_mut();

// This may seem weird, but the borrow checker complains otherwise.
if acquire_slow.is_none() {
match mutex.try_lock() {
Some(guard) => return Poll::Ready(guard),
None => {
// Try the fast path before trying to register slowly.
match this.mutex.try_lock() {
Some(guard) => return Poll::Ready(guard),
None => {
this.acquire_slow = Some(AcquireSlow::new(this.mutex));
}
}
}

Some(acquire_slow) => {
// Continue registering slowly.
let value = ready!(Pin::new(acquire_slow).poll(cx));
return Poll::Ready(MutexGuard(value));
*acquire_slow = Some(AcquireSlow::new(mutex));
}
}
}

ready!(Pin::new(acquire_slow.as_mut().unwrap()).poll_with_strategy(strategy, context));
Poll::Ready(MutexGuard(mutex))
}
}

/// The future returned by [`Mutex::lock_arc`].
pub struct LockArc<T: ?Sized>(LockArcInnards<T>);
easy_wrapper! {
/// The future returned by [`Mutex::lock_arc`].
pub struct LockArc<T: ?Sized>(LockArcInnards<T> => MutexGuardArc<T>);
pub(crate) wait();
}

enum LockArcInnards<T: ?Sized> {
/// We have not tried to poll the fast path yet.
Unpolled(Arc<Mutex<T>>),

/// We are acquiring the mutex through the slow path.
AcquireSlow(AcquireSlow<Arc<Mutex<T>>, T>),

/// Empty hole to make taking easier.
Empty,
}

impl<T: ?Sized> Unpin for LockArc<T> {}
impl<T: ?Sized> Unpin for LockArcInnards<T> {}

impl<T: ?Sized> fmt::Debug for LockArc<T> {
impl<T: ?Sized> fmt::Debug for LockArcInnards<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("LockArc { .. }")
}
}

impl<T: ?Sized> Future for LockArc<T> {
impl<T: ?Sized> EventListenerFuture for LockArcInnards<T> {
type Output = MutexGuardArc<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
self: Pin<&'a mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
match mem::replace(&mut this.0, LockArcInnards::Empty) {
match this {
LockArcInnards::Unpolled(mutex) => {
// Try the fast path before trying to register slowly.
match mutex.try_lock_arc() {
Some(guard) => return Poll::Ready(guard),
None => {
*this = LockArc(LockArcInnards::AcquireSlow(AcquireSlow::new(
mutex.clone(),
)));
*this = LockArcInnards::AcquireSlow(AcquireSlow::new(mutex.clone()));
}
}
}

LockArcInnards::AcquireSlow(mut acquire_slow) => {
LockArcInnards::AcquireSlow(ref mut acquire_slow) => {
// Continue registering slowly.
let value = match Pin::new(&mut acquire_slow).poll(cx) {
let value = match Pin::new(acquire_slow).poll_with_strategy(strategy, context) {
Poll::Pending => {
*this = LockArc(LockArcInnards::AcquireSlow(acquire_slow));
return Poll::Pending;
}
Poll::Ready(value) => value,
};
return Poll::Ready(MutexGuardArc(value));
}

LockArcInnards::Empty => panic!("future polled after completion"),
}
}
}
Expand All @@ -358,7 +362,9 @@ struct AcquireSlow<B: Borrow<Mutex<T>>, T: ?Sized> {
mutex: Option<B>,

/// The event listener waiting on the mutex.
listener: Option<EventListener>,
///
/// TODO: At the next breaking release, remove the `Pin<Box<>>` and make this type `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,

/// The point at which the mutex lock was started.
#[cfg(not(any(target_arch = "wasm32", target_os = "wasm64")))]
Expand Down Expand Up @@ -402,11 +408,15 @@ impl<T: ?Sized, B: Borrow<Mutex<T>>> AcquireSlow<B, T> {
}
}

impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> Future for AcquireSlow<B, T> {
impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow<B, T> {
type Output = B;

#[cold]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&'a mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let this = &mut *self;
#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
let start = *this.start.get_or_insert_with(Instant::now);
Expand Down Expand Up @@ -443,7 +453,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> Future for AcquireSlow<B, T> {
}
Some(ref mut listener) => {
// Wait for a notification.
ready!(Pin::new(listener).poll(cx));
ready!(strategy.poll(listener.as_mut(), context));
this.listener = None;

// Try locking if nobody is being starved.
Expand Down Expand Up @@ -515,7 +525,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> Future for AcquireSlow<B, T> {
}
Some(ref mut listener) => {
// Wait for a notification.
ready!(Pin::new(listener).poll(cx));
ready!(strategy.poll(listener.as_mut(), context));
this.listener = None;

// Try acquiring the lock without waiting for others.
Expand Down
Loading

0 comments on commit 6cd1ef4

Please sign in to comment.