Skip to content

Add barriers to turmoil #229

@mcches

Description

@mcches
//! `Barriers` allow tests to granularly observe and control the execution of
//! source code by injecting observability and control hooks in to source code.
//! Barriers allow construction of complex tests which otherwise may rely on
//! timing conditions (which are difficult to write, flaky, and hard to
//! maintain) or monitoring and control of the network layer.
//!
//! Barriers are designed for Turmoil simulation tests. Integrations with
//! turmoil allow for test code to step the simulation until a barrier is
//! triggered.
//!
//! # Architecture
//! ┌──────────────┐                              ┌─────────────────────┐
//! │ Source Code  │                              │ Test Code           │
//! │              │                              │                     │
//! │  ┌─────────┐ │      ┌──────────────────┐    │                     │
//! │  │ Trigger ┼─┼─────►│   Barrier Repo   │◄───┼── Barrier::build()  │
//! │  └─────────┘ │      │  (Thread Local)  │    │                     │
//! │  ┌─────────┐ │  ┌───┼                  ├────┼─► Barrier::await    │
//! │  │ Resumer │◄┼──┘   └──────────────────┘    │                     │
//! │  └─────────┘ │                              │                     │
//! │              │                              │                     │
//! └──────────────┘                              └─────────────────────┘
//!
//! A barrier consists of two halves, a `Trigger` which defines the condition
//! a barrier is waiting for and a `Resumer` which controls when the src code
//! in a barrier is released. Interesting points of source code may be annotated
//! with Triggers; these triggers will no-op if test code is not interested and
//! are conditionally compiled out of non-test code. When test code creates a
//! barrier, the condition and resumer is registered in the barrier repo. Most
//! barriers are 'observe-only' and do not control execution (typically test
//! code is simply driving simulation forward until a Barrier is triggered).
//! However, test code may cause a future hitting a barrier to suspend until
//! the test code resumes it.
//!
//! Triggers are type safe Rust structs. Source code may define triggers as any
//! type desired. Barrier conditions are defined as match statements against a
//! trigger. Reactions are built as an enum of well defined actions; arbitrary
//! reaction code is not allowed to curtail insane usage.
//!
//! Note: Each trigger event wakes at most one barrier and processes in order
//! of registration. Avoid registering multiple barriers for the same triggers
//! to avoid confusion.

And some code as well!

use std::{any::Any, cell::RefCell, marker::PhantomData, ops::Deref};

use uuid::Uuid;
use tokio::sync::{
    mpsc::{self, UnboundedReceiver, UnboundedSender},
    oneshot,
};

thread_local! {
    static BARRIERS: BarrierRepo = BarrierRepo::new();
}

pub struct BarrierRepo {
    barriers: RefCell<Vec<BarrierState>>,
}

impl BarrierRepo {
    pub fn new() -> Self {
        Self {
            barriers: RefCell::new(vec![]),
        }
    }

    pub fn insert(&self, barrier: BarrierState) {
        self.barriers.borrow_mut().push(barrier);
    }

    pub fn drop(&self, id: Uuid) {
        self.barriers.borrow_mut().retain(|t| t.id != id);
    }

    fn barrier<T: Any + Send>(&self, t: &T) -> Option<(Reaction, mpsc::UnboundedSender<Waker>)> {
        let guard = self.barriers.borrow();
        for barrier in guard.iter() {
            if (barrier.condition)(t) {
                return Some((barrier.reaction.clone(), barrier.to_test.clone()));
            }
        }
        None
    }
}

pub async fn trigger<T: Any + Send>(t: T) {
    let Some((reaction, to_test)) = BARRIERS.with(|barriers| barriers.barrier(&t)) else {
        return;
    };

    let (tx, rx) = oneshot::channel();
    let waker = match reaction {
        Reaction::Noop => {
            tx.send(()).expect("Receiver is owned");
            None
        }
        Reaction::Suspend => Some(tx),
    };

    let _ = to_test.send((Box::new(t), waker));
    let _ = rx.await;
}

pub struct BarrierState {
    /// For dropping, we match equality of barriers based on this randomly
    /// generated id.
    id: Uuid,
    condition: Box<Condition>,
    reaction: Reaction,
    to_test: UnboundedSender<Waker>,
}

pub type Condition = dyn Fn(&dyn Any) -> bool;

pub struct Barrier<T> {
    id: Uuid,
    from_src: UnboundedReceiver<Waker>,
    _t: PhantomData<T>,
}

impl<T: Any + Send> Barrier<T> {
    /// Create a new barrier that matches the given `condition`
    pub fn new(condition: impl Fn(&T) -> bool + 'static) -> Self {
        Self::build(Reaction::Noop, condition)
    }

    pub fn build(reaction: Reaction, condition: impl Fn(&T) -> bool + 'static) -> Self {
        let condition = Box::new(move |t: &dyn Any| match t.downcast_ref::<T>() {
            Some(t) => condition(t),
            None => false,
        });

        let (tx, rx) = mpsc::unbounded_channel();
        let id = Uuid:: new_v4();
        let state = BarrierState {
            id,
            condition,
            reaction,
            to_test: tx,
        };
        BARRIERS.with(|barriers| barriers.insert(state));
        Self {
            id,
            from_src: rx,
            _t: PhantomData,
        }
    }

    pub async fn wait(&mut self) -> Option<Triggered<T>> {
        let (data, release) = self.from_src.recv().await?;
        let data = *data.downcast::<T>().unwrap();
        Some(Triggered { data, release })
    }
}

impl<T> Drop for Barrier<T> {
    fn drop(&mut self) {
        BARRIERS.with(|barriers| barriers.drop(self.id));
    }
}

pub struct Triggered<T> {
    data: T,
    release: Option<oneshot::Sender<()>>,
}

impl<T> Deref for Triggered<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl<T> Drop for Triggered<T> {
    fn drop(&mut self) {
        if let Some(release) = self.release.take() {
            let _ = release.send(());
        }
    }
}

#[derive(Debug, Clone)]
pub enum Reaction {
    Noop,
    Suspend,
}

pub type Waker = (Box<dyn Any + Send>, Option<oneshot::Sender<()>>);

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions