Trace import in an iterative context appears to create downgrade loop #598

oli-w opened this issue Apr 21, 2025 · 3 comments
@oli-w

oli-w commented Apr 21, 2025

Hi,

I'm a bit stuck trying to use a shared trace in an iterative scope. I'm trying to emulate a database transaction, where you have some existing data, updates in a transaction, and if these updates pass some validation they should become part of the "existing data". I have run into an issue where the trace import operator seems to get stuck in a downgrade loop.

To create a minimal repro, I have a program where I have a set of pre-existing values (simple string values "a", "b", "c"), and I want to be able to submit updates to these values. I submit 2 transactions:

  • Transaction 1 adds "c" - invalid because "c" already exists
  • Transaction 2 adds "d" - valid

The transaction updates enter an iterative context, semi-join with the existing committed values to find "violations", and then set a validated_tx_var variable to the valid transaction IDs (2 in this case). The "committed" values are defined as the values input collection, plus the updates semi-joined with validated_tx_var.
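For readers who want the intended result without the dataflow machinery, here is a minimal sketch of the same uniqueness check in plain Rust. It is a single-pass approximation and not the differential dataflow program: the function name `validate` is hypothetical, and it only compares updates against already committed values, ignoring conflicts between updates in the same batch.

```rust
use std::collections::BTreeSet;

type TxId = u32;

/// Hypothetical helper: returns the valid transaction IDs and the committed
/// values that result from applying only those transactions.
fn validate(
    committed: &BTreeSet<String>,
    updates: &[(TxId, String)],
) -> (BTreeSet<TxId>, BTreeSet<String>) {
    // "Semi-join" updates with committed values: any match is a violation.
    let invalid: BTreeSet<TxId> = updates
        .iter()
        .filter(|(_, value)| committed.contains(value))
        .map(|(tx, _)| *tx)
        .collect();

    // "Anti-join" all transaction IDs with the invalid ones.
    let valid: BTreeSet<TxId> = updates
        .iter()
        .map(|(tx, _)| *tx)
        .filter(|tx| !invalid.contains(tx))
        .collect();

    // Committed values = existing values plus updates from valid transactions.
    let mut next = committed.clone();
    for (tx, value) in updates {
        if valid.contains(tx) {
            next.insert(value.clone());
        }
    }
    (valid, next)
}
```

With committed values "a", "b", "c" and updates (1, "c") and (2, "d"), this returns transaction 2 as the only valid one and adds "d" to the committed set, which is the outcome the dataflow is expected to reach.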

If I turn the committed values into a trace and then use that to find violations, it appears to create a downgrade loop. I added a no-op inspect_frontier operator that prints out the input frontier (without holding onto any capabilities) and it prints out (0,1), (0,2), ... (0,n) etc. The code inside TraceAgent::import_core appears to be repeatedly calling capabilities.downgrade(&frontier.borrow()[..]); with TraceReplayInstruction::Frontier as the matched instruction.

Here is the code: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=26347c68458ecf8c175f45cf77a313c6. You can toggle the bool on line 29 to see broken vs expected behaviour. Hopefully I've just missed something in the rules for using traces :)

@frankmcsherry
Member

frankmcsherry commented Apr 21, 2025

I'll take a peek in the coming 1-2 weeks, if that's ok (I have some train time next week). If it's at all helpful, one of the things I'll be doing is writing a (non-interactive) transaction processor in recursive SQL, and it might connect here. One thing is that I couldn't write it as one dataflow, and instead have a combination of "transaction commit/rollback determination", and then a separate effectuator (table writes).

If it helps at all, the logic (from .. one year ago; oof blog backlog) is:

-- Transaction intents.
-- Each transaction consists of multiple read values and written values.
-- If all read values of the transaction hold, the written values can be committed.
-- If any read value of the transaction does not hold, none of its written values should be committed.
CREATE TABLE intents (id INT, is_read BOOL, k TEXT, v TEXT);

-- We will develop a set of identifiers that should be rolled back, initially empty, and then assess
-- each transaction's read set against the prior writes that were not rolled back.
-- By iteration `i` we will have locked down at least the first `i` transaction ids, which ensures that
-- we certainly terminate.
WITH MUTUALLY RECURSIVE 

    -- Develop an initially empty set of identifiers to roll back.
    -- Only roll back transactions with a read that does not find its value.
    -- This uses `reads`, which is not yet defined.
    rollback(id INT) AS (
        SELECT reads.id
        FROM reads
        WHERE reads.v IS DISTINCT FROM reads.r
    ),

    -- Tentative writes as a function of `rollback`.
    writes(id INT, k TEXT, v TEXT) AS (
        SELECT intents.id, k, v
        FROM intents
        WHERE intents.id NOT IN (SELECT * FROM rollback)
            AND NOT intents.is_read 
    ),

    -- Reads, and the corresponding read value.
    -- These may not match, which `rollback` will react to.
    reads(id INT, v TEXT, r TEXT) AS (
        SELECT id, v, (
            SELECT DISTINCT ON (k) v 
            FROM writes 
            WHERE writes.id < intents.id
            AND writes.k = intents.k
            ORDER BY k, writes.id DESC
        )
        FROM intents
        WHERE intents.is_read
    )

SELECT * FROM writes;
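
As a way to check the reasoning in the query above, here is a rough sketch of the same fixed-point logic in plain Rust. It is an assumption-laden illustration and not how the SQL is actually evaluated: the names `Intent`, `tentative_writes`, `next_rollback`, and `resolve` are hypothetical, SQL NULL is modeled as `None`, and each round recomputes `rollback` from scratch until it stops changing.

```rust
use std::collections::BTreeSet;

// Hypothetical mirror of one row of the `intents` table; NULL is `None`.
#[derive(Clone, Debug, PartialEq)]
struct Intent {
    id: u32,
    is_read: bool,
    k: String,
    v: Option<String>,
}

type Write = (u32, String, Option<String>);

// `writes`: non-read intents whose transaction is not in `rollback`.
fn tentative_writes(intents: &[Intent], rollback: &BTreeSet<u32>) -> Vec<Write> {
    intents
        .iter()
        .filter(|i| !i.is_read && !rollback.contains(&i.id))
        .map(|i| (i.id, i.k.clone(), i.v.clone()))
        .collect()
}

// `reads` + `rollback`: a transaction rolls back if any of its reads sees a
// value different from the latest earlier write to the same key.
fn next_rollback(intents: &[Intent], rollback: &BTreeSet<u32>) -> BTreeSet<u32> {
    let writes = tentative_writes(intents, rollback);
    intents
        .iter()
        .filter(|i| i.is_read)
        .filter(|read| {
            let observed = writes
                .iter()
                .filter(|(id, k, _)| *id < read.id && *k == read.k)
                .max_by_key(|(id, _, _)| *id)
                .and_then(|(_, _, v)| v.clone());
            observed != read.v // IS DISTINCT FROM
        })
        .map(|read| read.id)
        .collect()
}

// Iterate from an empty rollback set until it stops changing.
fn resolve(intents: &[Intent]) -> (BTreeSet<u32>, Vec<Write>) {
    let mut rollback = BTreeSet::new();
    loop {
        let next = next_rollback(intents, &rollback);
        if next == rollback {
            break;
        }
        rollback = next;
    }
    let writes = tentative_writes(intents, &rollback);
    (rollback, writes)
}
```

The loop relies on the termination argument stated in the SQL comments: whether a transaction rolls back depends only on transactions with smaller ids, so each round settles at least one more id.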

@frankmcsherry
Member

Reposting the linked code, in case it vanishes by then (no clue how long playground links live):

use differential_dataflow::operators::arrange::{ArrangeBySelf, TraceAgent};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::{Join, Threshold};
use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use std::fmt::Debug;
use timely::communication::allocator::Generic;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::unordered_input::UnorderedHandle;
use timely::dataflow::operators::{ActivateCapability, Operator, UnorderedInput};
use timely::dataflow::{ProbeHandle, Scope, Stream};
use timely::order::Product;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
use timely::worker::Worker;

type Time = u32;
type IterTime = Product<Time, u32>;
type TxId = u32;
type Diff = isize;

type Value = String;

type StreamEntry<D> = (D, Time, Diff);
type ValueTrace = TraceAgent<OrdKeySpine<Value, IterTime, Diff>>;

// Toggle to see broken/expected behaviour
const BROKEN: bool = true;

pub fn traces_hang() {
    timely::execute_from_args(std::env::args(), |worker| {
        let (inputs, entity_value_trace, probe) = worker.dataflow::<Time, _, _>(|scope| {
            let probe = ProbeHandle::new();
            let (values_input_cap, values) = {
                let (values_input_cap, values_stream) = scope.new_unordered_input::<StreamEntry<Value>>();
                (values_input_cap, values_stream.as_collection())
            };
            let (updates_input_cap, updates) = {
                let (updates_input_cap, updates_stream) = scope.new_unordered_input::<StreamEntry<(TxId, Value)>>();
                (updates_input_cap, updates_stream.as_collection())
            };

            let value_trace = scope.iterative::<u32, _, _>(|scope| {
                let values = values.enter(scope);
                let updates = updates.enter(scope);
                let validated_tx_var = Variable::new(scope, Product::new(0, 1));

                let new_updates = updates.antijoin(&validated_tx_var).print("new_updates");
                let validated_updates = updates.semijoin(&validated_tx_var).print("validated_updates");

                let committed_values = values
                    .concat(&validated_updates.map(|(_, data)| data))
                    .consolidate()
                    .print("committed_values");
                let mut value_arranged = committed_values.arrange_by_self();
                let committed_values = {
                    if BROKEN {
                        value_arranged
                            .trace
                            .import(scope)
                            .as_collection(|entity_value, _| entity_value.clone())
                    } else {
                        committed_values
                    }
                };
                // Simple uniqueness check - find violations by semi-joining new updates with existing values
                let invalid_tx = new_updates
                    .map(|(tx, value)| (value, tx))
                    .semijoin(&committed_values)
                    .map(|(_value, tx)| tx)
                    .distinct()
                    .print("invalid_tx");
                let valid_tx = updates
                    .map(|(tx, _)| (tx, ()))
                    .distinct()
                    .antijoin(&invalid_tx)
                    .map(|(tx, _)| tx)
                    .print("valid_tx");

                let validated_tx_next = valid_tx.consolidate().print("validated_tx_next");
                inspect_frontier(&validated_tx_next.inner);
                validated_tx_var.set(&validated_tx_next).leave().probe_with(&probe);

                value_arranged.trace
            });

            (
                Inputs {
                    values: InputCap(values_input_cap.0, values_input_cap.1),
                    updates: InputCap(updates_input_cap.0, updates_input_cap.1),
                },
                value_trace,
                probe,
            )
        });

        let mut controller = DataflowController::new(worker.clone(), inputs, probe, entity_value_trace);
        // Populate initial set of values
        {
            let insert_time = *controller.time();
            let values = &mut controller.inputs.values;
            values.update("a".to_string(), insert_time, 1);
            values.update("b".to_string(), insert_time, 1);
            values.update("c".to_string(), insert_time, 1);
        }
        controller.progress_to_next_time();

        // Insert 2 updates at transaction 1 and 2 respectively
        {
            let insert_time = *controller.time();
            let updates = &mut controller.inputs.updates;
            // tx 1 is invalid because "c" already exists
            // tx 2 is valid, so we see a +1 output for "d" in committed_values
            updates.update((1, "c".to_string()), insert_time, 1);
            updates.update((2, "d".to_string()), insert_time, 1);
        }
        controller.progress_to_next_time();
    })
    .expect("Computation terminated abnormally");
}

struct Inputs {
    values: InputCap<Value>,
    updates: InputCap<(TxId, Value)>,
}
impl Inputs {
    fn downgrade(&mut self, time: &Time) {
        self.values.downgrade(time);
        self.updates.downgrade(time);
    }
}
struct DataflowController {
    worker: Worker<Generic>,
    time: Time,
    inputs: Inputs,
    probe: ProbeHandle<Time>,
    value_trace: ValueTrace,
}

impl DataflowController {
    fn new(worker: Worker<Generic>, inputs: Inputs, probe: ProbeHandle<Time>, shared_traces: ValueTrace) -> Self {
        Self {
            worker,
            time: Time::minimum(),
            inputs,
            probe,
            value_trace: shared_traces,
        }
    }
    fn progress_to_next_time(&mut self) {
        self.time += 1;

        // Downgrade inputs and the value trace
        self.inputs.downgrade(&self.time);
        let frontier = Antichain::from_elem(Product::new(self.time, 0));
        self.value_trace.set_logical_compaction(frontier.borrow());
        self.value_trace.set_physical_compaction(AntichainRef::new(&[]));

        let mut steps = 0;
        while self.probe.less_than(&self.time) {
            self.worker.step();
            steps += 1;
            if steps > 200 {
                panic!("Infinite loop");
            }
        }
        println!(
            "========= Advanced to time {:?} in {} steps =========\n",
            self.time, steps
        )
    }
    fn time(&self) -> &Time {
        &self.time
    }
}

struct InputCap<D: Data>(UnorderedHandle<Time, (D, Time, Diff)>, ActivateCapability<Time>);
impl<D: Data> InputCap<D> {
    fn update(&mut self, value: D, time: Time, diff: Diff) {
        let mut input_session = self.0.session(self.1.clone());
        input_session.give((value, time, diff));
    }
    fn downgrade(&mut self, timestamp: &Time) {
        self.1.downgrade(timestamp);
    }
}

pub trait PrintCollection {
    fn print(&self, prefix: &str) -> Self;
}
impl<G: Scope, D: timely::Data + Clone + Debug> PrintCollection for Collection<G, D> {
    fn print(&self, prefix: &str) -> Self {
        self.inspect({
            let prefix = prefix.to_string();
            move |(data, time, diff)| {
                let diff = pad_right(
                    if diff > &0 {
                        format!(" {diff}")
                    } else {
                        diff.to_string()
                    },
                    5,
                );
                let time = pad_right(format!("{:?}", time), 30);
                let prefix = pad_right(prefix.clone(), 30);

                println!("{diff} at {time} {prefix} {:?}", data);
            }
        })
    }
}

fn pad_right(mut value: String, length: usize) -> String {
    while value.len() < length {
        value.push(' ');
    }
    value
}

fn inspect_frontier<G, D>(input_stream: &Stream<G, D>) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    input_stream.unary_frontier(Pipeline, "inspect_frontier", |_initial_capability, _info| {
        move |input, output| {
            println!("inspect_frontier called at {:?}", input.frontier().frontier());
            input.for_each(|cap, data| {
                for datum in data.drain(..) {
                    println!("Forwarding datum {:?} at {:?}", datum, cap.time());
                    let mut session = output.session(&cap);
                    session.give(datum);
                }
            });
        }
    })
}

@oli-w
Author

oli-w commented Apr 21, 2025

> in the coming 1-2 weeks

That would be great, thank you.

Thanks for sharing your thoughts too. I had read your blog post "Dataflow as Database" and was looking to do something similar and keep extending the ideas. The additional things I'm trying to add:

  • Let reads be any query.
  • Let you install "constraints" as dataflow queries, where constraints output violation rows and no output = valid. For example, I could create a constraint SELECT * FROM foo WHERE foo.bar = "" to disallow blank values of column bar, but of course this can be something much more powerful.
  • "Interactive" transactions. Use special timestamps to separate between data that is committed (visible to all), pending (only visible to the current transaction) and validating (visible between transactions that are trying to commit).
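
To make the "constraints as queries" item in the list above concrete, here is a small sketch outside of any dataflow. The `Foo` row type, the `Constraint` alias, and both function names are hypothetical; the only idea taken from the list is that a constraint returns violation rows and that an empty result means valid.

```rust
// Hypothetical row type for a table `foo` with a text column `bar`.
#[derive(Clone, Debug, PartialEq)]
struct Foo {
    id: u32,
    bar: String,
}

/// A constraint maps the current rows to the rows that violate it.
/// An empty result means the data is valid.
type Constraint = fn(&[Foo]) -> Vec<Foo>;

/// Counterpart of `SELECT * FROM foo WHERE foo.bar = ""`.
fn bar_not_blank(rows: &[Foo]) -> Vec<Foo> {
    rows.iter().filter(|row| row.bar.is_empty()).cloned().collect()
}

/// Collects the violation rows reported by every installed constraint.
fn violations(rows: &[Foo], constraints: &[Constraint]) -> Vec<Foo> {
    constraints.iter().flat_map(|check| check(rows)).collect()
}
```

In a dataflow setting each constraint would be an installed query rather than a function pointer; the sketch only shows the "violation rows out, empty means valid" contract.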

My timestamp looks like this:

type TxId = u32;
type TxStep = u32;
struct TxTimestamp {
    system: u64,
    region: TimestampRegion,
}
enum TimestampRegion {
    Committed,
    Pending { tx: TxId, step: TxStep },
    Validating { tx: TxId, step: TxStep },
    Never,
}

The Lattice::join and ::meet methods are quite interesting: joining a Pending time with a Validating time, or two Pending times with different TxIds, results in the Never (AKA "top") time, which I have to regularly filter out after various dataflow operators. Two Validating times do interact. The TxStep allows a pending transaction to do some writes at step 1, then add a query at step 2 (which can see that pending write), and so on. In the Validating region, tx is used to determine which transaction "wins" if there are conflicts: the larger tx will be aborted (which I believe corresponds to the WHERE writes.id < intents.id clause in your reads).
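
The join behaviour described above can be sketched on the region alone. This is a guess at the shape and not the actual implementation: it assumes Committed sits below every other region, that two Pending times of the same transaction join to the larger step, and that two Validating times are ordered lexicographically by (tx, step). Only the "Pending with Validating" and "Pending with a different TxId" cases going to Never come from the description; the function name `join_region` is hypothetical, and the system component is left out.

```rust
type TxId = u32;
type TxStep = u32;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TimestampRegion {
    Committed,
    Pending { tx: TxId, step: TxStep },
    Validating { tx: TxId, step: TxStep },
    Never,
}

/// Least upper bound of two regions under the assumptions listed above.
fn join_region(a: TimestampRegion, b: TimestampRegion) -> TimestampRegion {
    use TimestampRegion::*;
    match (a, b) {
        // Never is the top element.
        (Never, _) | (_, Never) => Never,
        // Assumption: Committed is the bottom element.
        (Committed, other) | (other, Committed) => other,
        // Assumption: same transaction, later step wins.
        (Pending { tx: t1, step: s1 }, Pending { tx: t2, step: s2 }) if t1 == t2 => {
            Pending { tx: t1, step: s1.max(s2) }
        }
        // Assumption: Validating times are totally ordered by (tx, step).
        (Validating { tx: t1, step: s1 }, Validating { tx: t2, step: s2 }) => {
            let (tx, step) = (t1, s1).max((t2, s2));
            Validating { tx, step }
        }
        // Pending vs Validating, or Pending for different transactions.
        _ => Never,
    }
}
```

Under these assumptions the only upper bound shared by two unrelated Pending times is Never, which is why such joins need filtering after operators that combine times.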

The logic to determine if a read is valid requires ensuring that there is at most 1 distinct system time reported for any query output during Pending. Then when a transaction is promoted to Validating I check that the query emits the exact same output as it did in Pending (not 100% sure yet if this is completely correct, still exploring).

When a transaction wants to commit, it sends its transaction ID into a commit_requested input, and is then promoted through successive timestamp regions, as long as it stays valid, via a Variable. One tricky requirement is mapping times... sideways. For example, the dataflow might report that tx 1 is valid at time Pending { tx: 1 }, so I then have to re-inject 1 at (Validating, iteration + 1), an incomparable time. Likewise, if a transaction is found to be valid in the Validating region, it has to be mapped to (Committed, iteration + 1), also an incomparable time. I was writing a custom timely operator to do this (carefully hanging onto the initial capability for just long enough) and thought that might be the source of the problem, but I managed to isolate it to the use of the trace.

I had managed to get this working nicely, with one exception: tracking query read invalidations may require state to be handled "outside" the dataflow. Specifically, if an ad-hoc query is installed after the initial setup and it detects a read invalidation, I have to manually feed that back into the original transaction dataflow.

Keen to hear if I'm missing anything, or if this approach has some fundamental flaw. Subscribed for when that blog eventually comes out :)
