
Upserts with more selective update functionality #593

Open
mmottl opened this issue Apr 8, 2025 · 3 comments

Comments

@mmottl

mmottl commented Apr 8, 2025

The existing arrange_from_upsert function has the following signature:

```rust
pub fn arrange_from_upsert<G, K, V, Bu, Tr>(
    stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where ...
```

The stream of updates only allows either deletion of an existing entry (None) or complete replacement of the value associated with a key (Some(new_value)). Sometimes, however, we want to update only part of an existing value.
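For reference, these upsert semantics can be modeled with a plain BTreeMap standing in for the current contents of the arrangement. This is a std-only sketch; apply_upsert is a hypothetical helper, not part of differential dataflow.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one upsert step: `None` deletes the key,
/// `Some(value)` inserts or replaces the *whole* value.
fn apply_upsert<K: Ord, V>(state: &mut BTreeMap<K, V>, key: K, update: Option<V>) {
    match update {
        Some(value) => {
            state.insert(key, value);
        }
        None => {
            state.remove(&key);
        }
    }
}
```

Changing a single field this way requires the sender to already know the rest of the value, which is the limitation described above.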

We were thinking of a function with a signature along these lines:

```rust
pub fn arrange_from_update<G, K, V, U, F, Bu, Tr>(
    stream: &Stream<G, (K, U, G::Timestamp)>,
    name: &str,
    update_func: F,
) -> Arranged<G, TraceAgent<Tr>>
where
    U: ExchangeData, // Added U
    F: Fn(&K, Option<&V>, &U) -> Option<V> + Clone + 'static, // Added F
    ...
```

This adds two generic type parameters: U, the type of the update information, and F, the type of an additional closure argument update_func that combines the key, the existing value (if any), and the update to produce an optional replacement value.
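As an illustration of a partial update, here is a plain function with the proposed update_func shape, Fn(&K, Option<&V>, &U) -> Option<V>. The Account and AccountUpdate types are hypothetical, and treating None as "delete the key" is an assumption carried over from upsert.

```rust
/// Hypothetical value type stored per key.
#[derive(Clone, Debug, PartialEq)]
struct Account {
    owner: String,
    balance: i64,
}

/// Hypothetical update type: each variant touches only part of the value.
#[derive(Clone, Debug)]
enum AccountUpdate {
    Deposit(i64),
    Rename(String),
    Close,
}

/// Combines key, existing value, and update into the new value (`None` = delete).
fn account_update(_key: &u64, old: Option<&Account>, update: &AccountUpdate) -> Option<Account> {
    match (old, update) {
        // Closing removes the entry whether or not it existed.
        (_, AccountUpdate::Close) => None,
        // Partial updates: change one field, keep the rest of the existing value.
        (Some(acc), AccountUpdate::Deposit(amount)) => Some(Account {
            balance: acc.balance + amount,
            ..acc.clone()
        }),
        (Some(acc), AccountUpdate::Rename(name)) => Some(Account {
            owner: name.clone(),
            ..acc.clone()
        }),
        // No existing value to modify: leave the key absent.
        (None, _) => None,
    }
}
```

A fn item like this is Clone and 'static, so it would satisfy the proposed bound on F.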

Note that the current arrange_from_upsert function could easily be implemented on top of the new function: U would be Option<V>, and update_func would ignore the existing value and return the update unchanged, i.e. Some(new_value) to insert or replace, or None to delete.
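A std-only model of that reduction, with a BTreeMap standing in for the arrangement. apply_update and upsert_func are hypothetical names; the sketch only shows that upsert falls out of the general shape when U = Option<V>.

```rust
use std::collections::BTreeMap;

/// Applies one update using the proposed `update_func` shape.
/// A returned `None` removes the key; `Some(v)` inserts or replaces it.
fn apply_update<K, V, U, F>(state: &mut BTreeMap<K, V>, key: K, update: &U, update_func: &F)
where
    K: Ord,
    F: Fn(&K, Option<&V>, &U) -> Option<V>,
{
    let new_value = update_func(&key, state.get(&key), update);
    match new_value {
        Some(value) => {
            state.insert(key, value);
        }
        None => {
            state.remove(&key);
        }
    }
}

/// Upsert as a special case: `U = Option<V>`, and the existing value is ignored.
fn upsert_func<K, V: Clone>(_key: &K, _old: Option<&V>, update: &Option<V>) -> Option<V> {
    update.clone()
}
```

Ignoring the old value matters: an upsert must also insert keys that have no existing value.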

There are at least two advantages offered by the new function:

  • Updating parts of existing values becomes possible, or at least much easier.
  • update_func can detect and act on a missing key (None passed as the existing value), e.g. by emitting a warning if that indicates a problem.
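The second point can be sketched as an update function that refuses to create entries. The add_to_existing name and the counter semantics are hypothetical, and eprintln! stands in for whatever warning mechanism an application actually uses.

```rust
/// Hypothetical update: add `delta` to a counter that must already exist.
/// `key` is `&String` because the proposed signature passes `&K` with `K = String`.
fn add_to_existing(key: &String, old: Option<&i64>, delta: &i64) -> Option<i64> {
    match old {
        Some(current) => Some(current + delta),
        None => {
            // Missing key: report it instead of silently creating an entry.
            eprintln!("warning: update for missing key {key:?} ignored");
            None
        }
    }
}
```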

It seems the suggested new function could easily be implemented by reusing the existing code for arrange_from_upsert. As long as the compiler can optimize away the extra abstraction, arrange_from_upsert could then be replaced with a suitable call to arrange_from_update.
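Roughly, the per-key work such an operator would do is: start from the key's value before the batch, apply the batch's updates in time order with update_func, and turn each change into a retraction of the old value and an assertion of the new one. The following is a std-only sketch under that assumption; updates_to_diffs is a hypothetical name and this is not the code in upsert.rs.

```rust
/// Hypothetical per-key step: fold time-ordered updates over `prior` with
/// `update_func`, emitting `(value, time, diff)` triples for every change.
/// Returns the final value together with the emitted differences.
fn updates_to_diffs<K, V, U, T, F>(
    key: &K,
    prior: Option<V>,
    mut updates: Vec<(T, U)>,
    update_func: &F,
) -> (Option<V>, Vec<(V, T, isize)>)
where
    V: Clone + PartialEq,
    T: Ord + Clone,
    F: Fn(&K, Option<&V>, &U) -> Option<V>,
{
    // Stable sort by time: updates at equal times keep their arrival order.
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    let mut current = prior;
    let mut output = Vec::new();
    for (time, update) in updates {
        let next = update_func(key, current.as_ref(), &update);
        // Skip no-op updates so no redundant retract/assert pairs are emitted.
        if next != current {
            if let Some(old) = current.take() {
                output.push((old, time.clone(), -1));
            }
            if let Some(new) = next.clone() {
                output.push((new, time, 1));
            }
            current = next;
        }
    }
    (current, output)
}
```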

Would it be possible to add this feature, or could you comment on any potential issues with the proposal?

@frankmcsherry
Member

It seems possible. It's perhaps related to this PR from a while back, which also attempts to deconstruct the arrangement-forming operators in a way that allows more generality.

I think the main caveat is that upsert.rs is mostly demo code at the moment: it isn't being used or tested, and is likely to break if relied on. There is some work underway reworking the arrangement batching and building traits, and I imagine that once those settle it will be easier to form a clear opinion about when to let these operators introduce opinions about type changes, logic, and such.

But also, in case this helps: the intent with all of DD is that it is à la carte, and if you'd like a different upsert.rs it should be pretty painless to copy/paste it, make the changes you want, and go from there. DD tries to keep a relatively minimal surface area, except where it really needs to be the one to write the code, or where the code is so universal that it makes sense for DD to provide the skeletal structure (e.g. the linked PR above might slot in below both upsert.rs and reduce.rs, in which case it seems smart to have a reference implementation). tl;dr: you should be able to implement this in user code and see whether it works!

@mmottl
Author

mmottl commented Apr 9, 2025

Thanks for the feedback! I'd hope that we could rely on upsert.rs, and we would carefully test it, or the new operator derived from it. If we run into problems, what would be the suggested solution using existing, presumably better-tested operators?

If I understand this correctly, this seems to require a cyclic dataflow, and I'm not sure what the most idiomatic solution would be. For example, to delete values by key from an arrangement, would I semijoin it with the collection of keys to be deleted, negate the result, and then concat it in a loop with the collection used to form the arrangement? And also concat the negated keys back into the collection of deleted keys? This seems a bit involved and likely not very efficient, but maybe there is a better way?
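The non-looping core of that idea can be modeled with collections as plain vectors of ((key, value), diff) updates. The helpers below are hypothetical std-only stand-ins named after the operators in the question, not differential dataflow's API, and the feedback of negated keys into the deletion collection is not modeled.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical model of a collection: `((key, value), diff)` updates.
type Updates<K, V> = Vec<((K, V), isize)>;

/// Keeps only records whose key is in `keys`.
fn semijoin<K: Ord + Clone, V: Clone>(data: &Updates<K, V>, keys: &BTreeSet<K>) -> Updates<K, V> {
    data.iter().filter(|((k, _), _)| keys.contains(k)).cloned().collect()
}

/// Flips the sign of every difference.
fn negate<K, V>(data: Updates<K, V>) -> Updates<K, V> {
    data.into_iter().map(|(record, diff)| (record, -diff)).collect()
}

/// Sums differences per record and drops records whose total is zero.
fn consolidate<K: Ord, V: Ord>(data: Updates<K, V>) -> Updates<K, V> {
    let mut sums: BTreeMap<(K, V), isize> = BTreeMap::new();
    for (record, diff) in data {
        *sums.entry(record).or_insert(0) += diff;
    }
    sums.into_iter().filter(|(_, diff)| *diff != 0).collect()
}
```

Concatenating a collection with its negated semijoin cancels exactly the records whose keys are being deleted; after consolidation, only the other keys remain.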

@frankmcsherry
Member

> If I understand this correctly, this seems to require a cyclic dataflow, ...

I think it doesn't. At least, the intent of the PR when written (or my memory thereof; perhaps it is totally at odds with the PR) is that no cycles are required, because the "feedback" is key-local. This is the same way that previous iterates inform the reduce.rs operator without needing cyclic feedback: the feedback comes in the form of revealing the per-key historical data within the operator, rather than cycling it around.
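A rough std-only sketch of what key-local feedback means, assuming a hypothetical UpdateOperator that receives batches in time order: it keeps each key's current value in its own state, so every update sees the result of earlier ones without any cycle in the dataflow. This illustrates the idea only and is not how reduce.rs or upsert.rs is implemented.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical operator holding per-key state between batches.
struct UpdateOperator<K, V, F> {
    state: HashMap<K, V>,
    update_func: F,
}

impl<K: Hash + Eq + Clone, V: Clone + PartialEq, F> UpdateOperator<K, V, F> {
    fn new(update_func: F) -> Self {
        UpdateOperator { state: HashMap::new(), update_func }
    }

    /// Applies a batch and returns `(key, value, diff)` changes. The "feedback"
    /// is just a lookup of the key's previous value in `self.state`.
    fn process<U>(&mut self, batch: Vec<(K, U)>) -> Vec<(K, V, isize)>
    where
        F: Fn(&K, Option<&V>, &U) -> Option<V>,
    {
        let mut output = Vec::new();
        for (key, update) in batch {
            let old = self.state.get(&key).cloned();
            let next = (self.update_func)(&key, old.as_ref(), &update);
            if next == old {
                continue; // no change, nothing to emit
            }
            if let Some(v) = old {
                output.push((key.clone(), v, -1));
            }
            match next {
                Some(v) => {
                    output.push((key.clone(), v.clone(), 1));
                    self.state.insert(key, v);
                }
                None => {
                    self.state.remove(&key);
                }
            }
        }
        output
    }
}
```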

I may also have misunderstood what "this" refers to (I answered as if it meant "that linked PR").

> I'd hope that we could rely on upsert.rs ...

Yup, all I mean to say is that the surface area there may be changing. For example, the trace batch building API is in flux at the moment, and will likely be changed to accumulate more updates before feeding them to a builder, so that the builder has more of a chance to plan for their insertion (e.g. right-sizing allocations, compression, etc.). The details of upsert.rs are not a fixed point in the plan for forward evolution, and are the most likely things to be run over.
