Feat/refactor wait handler #4

Open · wants to merge 40 commits into base: main

Commits (40)
4fa711c
feat: TryRecovery broadcast and replica impls
lfbrehm Oct 11, 2024
652e932
feat: Prepared Store for unknown event recovery
lfbrehm Oct 14, 2024
7e8bfa6
chore: Fix naming add docs for recovery
St4NNi Oct 14, 2024
c23310c
feat: Added/Update latest_time handling
St4NNi Oct 14, 2024
2460650
feat: Added wait_handler forget feature
St4NNi Oct 14, 2024
52b75ef
feat: Draft for new wait handler
St4NNi Oct 15, 2024
7e009df
feat: Waithandler refactor boilerplate traits for Store
lfbrehm Oct 15, 2024
71d44c5
feat: Refactor waithandler
St4NNi Oct 15, 2024
9da1ba8
feat: Updated waithandler to keep less state
St4NNi Oct 15, 2024
708a4f8
chore: Remove unwraps in wait_handler
St4NNi Oct 15, 2024
bbc9f53
fix: Empty deps in waithandler and already applied / commited ones
St4NNi Oct 16, 2024
87617d8
feat: started removing the reconfig buffer
St4NNi Oct 17, 2024
b546bdb
feat: Update type signature for SyneviResult / Transaction
St4NNi Oct 17, 2024
35d6396
refactor: Reconfiguration
St4NNi Oct 17, 2024
def9e81
refactor: Refactor status/members handling to network
St4NNi Oct 17, 2024
7059cb2
fix: Wait handler incorrect state
St4NNi Oct 17, 2024
d4eada5
chore: Debug reconfiguration
St4NNi Oct 17, 2024
b80b324
chore: Added debug for mismatched hashes
St4NNi Oct 17, 2024
fecbc5c
feat: Execution & Apply as one operation
lfbrehm Oct 18, 2024
2ba8e37
fix: Get or update transaction hashes
lfbrehm Oct 18, 2024
a43279e
fix: Fixed wait handler dependency handling
lfbrehm Oct 18, 2024
faa1703
chore: Removed dbg prints
lfbrehm Oct 18, 2024
e65f4a1
feat: Performance improvements
lfbrehm Oct 18, 2024
c9a2db9
feat: Dynamic recovery timeout
lfbrehm Oct 18, 2024
07036eb
test: Added redb_store for testing
St4NNi Oct 19, 2024
22a158d
chore: Rename stores
St4NNi Oct 21, 2024
d57bfb9
chore: coordinator put blocking IO ops into spawn blocking
St4NNi Oct 26, 2024
614066e
chore: replica put potential blocking tasks into spawn_blocking
St4NNi Oct 26, 2024
c28354a
chore: Remove unnecessary prints
St4NNi Oct 26, 2024
af96266
fix: Wrong path in tests
St4NNi Oct 26, 2024
2b4151f
chore: Update README.md add github actions workflows
St4NNi Oct 26, 2024
4001790
chore: Made tests smaller
St4NNi Oct 28, 2024
6741322
chore: Clippy fixes, fmt
St4NNi Oct 28, 2024
6e38a07
chore: Added retry backoff for add_member
St4NNi Oct 28, 2024
7fa169b
feat: Added id to execute function, added get_event_by_id function
St4NNi Oct 28, 2024
1a7a09e
chore: Re-export lmdb store in synevi
St4NNi Oct 30, 2024
1a46b33
fix: Fixed add_members to ignore self config
lfbrehm Nov 18, 2024
35078fe
feat: Added decoding function for events
St4NNi Nov 27, 2024
85e6caf
feat: Higher map size for lmdb
lfbrehm Feb 6, 2025
c22be6e
fix: Added missing read commits
lfbrehm Feb 11, 2025
2 changes: 2 additions & 0 deletions .github/actions-rs/grcov.yml
@@ -0,0 +1,2 @@
output-type: lcov
output-file: ./lcov.info
30 changes: 30 additions & 0 deletions .github/workflows/codecov.yml
@@ -0,0 +1,30 @@
on:
  push:
    branches: [main]
  pull_request:
name: coverage
env:
  CARGO_NET_GIT_FETCH_WITH_CLI: true
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: arduino/setup-protoc@v1
      - uses: actions-rs/toolchain@v1
        with:
          toolchain: nightly
          override: true
      - uses: actions/checkout@v2
        with:
          submodules: recursive
      - uses: actions-rs/cargo@v1
        with:
          command: test
          args: --all-features --no-fail-fast -- --include-ignored --test-threads=1
        env:
          CARGO_INCREMENTAL: '0'
          RUSTFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Cpanic=abort -Zpanic_abort_tests'
          RUSTDOCFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Cpanic=abort -Zpanic_abort_tests'
      - uses: actions-rs/grcov@v0.1 # version obscured in the scraped source; v0.1 assumed
      - name: Upload to codecov.io
        uses: codecov/codecov-action@v3
36 changes: 36 additions & 0 deletions .github/workflows/lint.yml
@@ -0,0 +1,36 @@
on:
  push:
    branches: [main, dev]
  pull_request:
name: lint
env:
  CARGO_NET_GIT_FETCH_WITH_CLI: true
jobs:
  style:
    runs-on: ubuntu-latest
    steps:
      - uses: arduino/setup-protoc@v1
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          components: rustfmt, clippy
      - uses: actions/checkout@v2
        with:
          submodules: recursive
      - name: cargo fmt --check
        uses: actions-rs/cargo@v1
        with:
          command: fmt
          args: --check
      - name: cargo doc
        uses: actions-rs/cargo@v1
        if: always()
        with:
          command: doc
          args: --no-deps --all-features
      - name: cargo clippy
        uses: actions-rs/clippy-check@v1
        if: always()
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
24 changes: 24 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,24 @@
on:
  - push
  - pull_request
name: test
jobs:
  test:
    runs-on: ubuntu-latest
    name: test
    env:
      CARGO_NET_GIT_FETCH_WITH_CLI: true
    steps:
      - uses: arduino/setup-protoc@v1
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
      - uses: actions/checkout@v2
        with:
          submodules: recursive
      - name: cargo test
        uses: actions-rs/cargo@v1
        with:
          command: test
          args: --all-features -- --include-ignored
72 changes: 69 additions & 3 deletions README.md
@@ -1,3 +1,9 @@
[![Rust](https://img.shields.io/badge/built_with-Rust-dca282.svg)](https://www.rust-lang.org/)
[![License](https://img.shields.io/badge/License-Apache_2.0-brightgreen.svg)](https://github.com/ArunaStorage/synevi/blob/main/LICENSE-APACHE)
[![License](https://img.shields.io/badge/License-MIT-brightgreen.svg)](https://github.com/ArunaStorage/synevi/blob/main/LICENSE-MIT)
![CI](https://github.com/ArunaStorage/synevi/actions/workflows/test.yml/badge.svg)
[![Codecov](https://codecov.io/github/ArunaStorage/synevi/coverage.svg?branch=main)](https://codecov.io/gh/ArunaStorage/synevi)

# Synevi
Synevi (Greek: συνέβει - "it happened") is a leaderless, strict serializable, embeddable event store for event sourcing.

@@ -16,11 +22,71 @@ For a more detailed explanation of the underlying architecture please visit our

### Embedded

TBD
## Getting Started with Synevi

Synevi provides a distributed event storage system that ensures consistent event ordering across your network. You can get started with gRPC networking (the default) and your choice of persistent LMDB storage or in-memory storage, depending on your needs.

At its core, Synevi works by distributing events through coordinator nodes. When an event is sent to any coordinator, Synevi ensures all nodes execute it in the same order, maintaining consistency across your system. To work with Synevi, you'll need to implement two key traits for your events.

First, define your events by implementing the `Transaction` trait:

```rust
pub trait Transaction: Debug + Clone + Send {
    type TxErr: Send + Serialize;
    type TxOk: Send + Serialize;

    fn as_bytes(&self) -> Vec<u8>;
    fn from_bytes(bytes: Vec<u8>) -> Result<Self, SyneviError>
    where
        Self: Sized;
}
```
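
For illustration, here is a minimal sketch of a `Transaction` implementation for a hypothetical key-value command type, using serde/serde_json for encoding (the concrete `SyneviError` conversion in `from_bytes` is an assumption, not a confirmed API):

```rust
use serde::{Deserialize, Serialize};

// Hypothetical event type for this sketch: a simple key-value command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KvCommand {
    Put { key: String, value: String },
    Delete { key: String },
}

impl Transaction for KvCommand {
    type TxErr = String;
    // The previous value of the key, if any.
    type TxOk = Option<String>;

    fn as_bytes(&self) -> Vec<u8> {
        // Any stable encoding works; serde_json keeps the sketch simple.
        serde_json::to_vec(self).expect("encoding a plain enum cannot fail")
    }

    fn from_bytes(bytes: Vec<u8>) -> Result<Self, SyneviError> {
        // NOTE: mapping to a concrete SyneviError variant is assumed here; use
        // whichever conversion error the crate actually provides.
        serde_json::from_slice(&bytes).map_err(SyneviError::from)
    }
}
```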

Then, implement the `Executor` trait to handle how your store processes these events:

```rust
#[async_trait::async_trait]
pub trait Executor: Send + Sync + 'static {
    type Tx: Transaction + Serialize;

    async fn execute(&self, transaction: Self::Tx) -> SyneviResult<Self>;
}
```
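
Continuing that sketch, an executor could apply each command to shared state. The nested result shape shown below is an assumption, but it is consistent with the nested `unwrap` calls in this PR's benches:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical executor: applies KvCommands to an in-memory map.
pub struct KvExecutor {
    state: Mutex<HashMap<String, String>>,
}

#[async_trait::async_trait]
impl Executor for KvExecutor {
    type Tx = KvCommand;

    async fn execute(&self, transaction: Self::Tx) -> SyneviResult<Self> {
        let mut state = self.state.lock().expect("lock poisoned");
        let previous = match transaction {
            KvCommand::Put { key, value } => state.insert(key, value),
            KvCommand::Delete { key } => state.remove(&key),
        };
        // Outer Ok: no synevi-level error; inner Ok: the executor's TxOk value.
        Ok(Ok(previous))
    }
}
```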

Here's a complete example that shows how to set up a Synevi node:

```rust
// Set up the network address (this example uses port 13000 and node serial 0)
let socket_addr = SocketAddr::from_str("0.0.0.0:13000").unwrap();
let network = synevi::network::GrpcNetwork::new(
    socket_addr,
    "http://0.0.0.0:13000".to_string(),
    Ulid::new(),
    0u16,
);

// Choose your storage implementation.
// For persistent storage using LMDB:
let test_path = "/dev/shm/test/".to_string();
fs::create_dir(&test_path).await.unwrap();
let store = LmdbStore::new(test_path, 0u16).unwrap();

// Or for in-memory storage:
// let store = MemStore::new(0u16).unwrap();

// Create your node using your executor (or the DummyExecutor that does nothing);
// the first argument is the node's unique id, the second its serial number
let node = Node::new(Ulid::new(), 0u16, network, DummyExecutor, store)
    .await
    .unwrap();
```
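
Once the node is running, events are submitted through its `transaction` method with a unique id, mirroring how the benches in this PR drive their coordinators. A sketch, assuming a node built with the `KvExecutor` from above rather than the `DummyExecutor`:

```rust
// Submit an event and wait for its globally ordered execution result.
// The id (u128) must be unique per transaction; the benches simply count up.
let result = node
    .transaction(0u128, KvCommand::Put { key: "a".into(), value: "1".into() })
    .await;

// The outer Result carries synevi-level errors, the inner one the executor's
// TxOk/TxErr outcome (hence the nested unwraps in the benches).
match result {
    Ok(Ok(previous)) => println!("applied, previous value: {:?}", previous),
    Ok(Err(tx_err)) => eprintln!("rejected by executor: {:?}", tx_err),
    Err(err) => eprintln!("synevi error: {:?}", err),
}
```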

Synevi automatically handles event distribution and ensures proper execution order across all functioning nodes in your network. The system is designed to be resilient and continues to operate even if some nodes become unavailable. Whether you choose LMDB for persistence or in-memory storage for speed, the same consistency guarantees apply: all events are executed in the same order on every node.

For production deployments, make sure your `Transaction` implementations include proper error handling and that your `Executor` implementation is thread-safe. Consider your network topology when setting up nodes, as it can impact system performance and reliability.

Note: Currently each node can handle only one type of executor. This can get complicated when you have many different types of events; for this you can use the excellent [typetag](https://github.com/dtolnay/typetag) crate, which enables serialization of many types that implement shared behavior.
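
As a rough sketch of that pattern (the trait and type names are illustrative, not part of synevi's API; note that typetag requires a self-describing format such as JSON):

```rust
use serde::{Deserialize, Serialize};

// One shared trait for all event kinds; #[typetag::serde] makes
// `Box<dyn AppEvent>` (de)serializable, so a single executor can
// dispatch on many concrete event types.
#[typetag::serde]
pub trait AppEvent: std::fmt::Debug + Send + Sync {
    fn apply(&self) -> Result<(), String>;
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UserCreated {
    pub name: String,
}

#[typetag::serde]
impl AppEvent for UserCreated {
    fn apply(&self) -> Result<(), String> {
        println!("created user {}", self.name);
        Ok(())
    }
}

// A single Transaction wrapper around `Box<dyn AppEvent>` can then be
// encoded with serde_json and handled by one executor.
```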

### As standalone Application (Coming soon)

## Feedback & Contributions

27 changes: 3 additions & 24 deletions benches/performance.rs
@@ -57,14 +57,7 @@ async fn parallel_execution(coordinator: Arc<Node<GrpcNetwork, DummyExecutor, Me
         });
     }
     while let Some(res) = joinset.join_next().await {
-        match res.unwrap().unwrap() {
-            synevi_types::types::ExecutorResult::External(res) => {
-                res.unwrap();
-            }
-            synevi_types::types::ExecutorResult::Internal(res) => {
-                res.unwrap();
-            }
-        }
+        res.unwrap().unwrap().unwrap();
     }
 }

@@ -82,14 +75,7 @@ async fn contention_execution(coordinators: Vec<Arc<Node<GrpcNetwork, DummyExecu
         }
     }
     while let Some(res) = joinset.join_next().await {
-        match res.unwrap().unwrap() {
-            synevi_types::types::ExecutorResult::External(res) => {
-                res.unwrap();
-            }
-            synevi_types::types::ExecutorResult::Internal(res) => {
-                res.unwrap();
-            }
-        }
+        res.unwrap().unwrap().unwrap();
     }
 }

@@ -105,14 +91,7 @@ async fn _bigger_payloads_execution(
         joinset.spawn(async move { coordinator.transaction(i, payload).await });
     }
     while let Some(res) = joinset.join_next().await {
-        match res.unwrap().unwrap() {
-            synevi_types::types::ExecutorResult::External(res) => {
-                res.unwrap();
-            }
-            synevi_types::types::ExecutorResult::Internal(res) => {
-                res.unwrap();
-            }
-        };
+        res.unwrap().unwrap().unwrap();
     }
 }

131 changes: 0 additions & 131 deletions docs/reconfiguration.md
@@ -1,134 +1,3 @@
# Reconfiguration Protocol

Each node has an epoch (u32).

# Initialisation (0 Nodes)

The first node starts with `epoch = 0`. It adds itself to the config (address etc.) and initializes with serial 0.

# Join cluster as new node

TODO: Electorate Networkinterface pre-accept
Electorate @ pre-accept = Set of all members from current epoch
If pre-accept returns a new epoch... Fetch configuration.
Accept to all members of old epoch and all members of new epoch

In our impl:
When the coordinator receives info about a new epoch in PA responses:
-> Fetch config of epoch from appropriate node
Await fetch && PA to all (old + new)

1. Update Config request (UpdateConfigRequest) on node N1
2. Wait for the current epoch to be ready? (ReadyEpoch = e)
3. If node was part of e-1

--------------------------------

### Onboarding revision 2

ArunaCtx {
1. New node starts NewNodeCtx
2. New node requests config + all_applieds from ExistingNodeCtx
3. Existing node 1 sends special transaction (NewConfig + Epoch) to other nodes with new config
}

NewNodeCtx {
1. Init ReplicaBuffer and collect Commits/Applies
2. Apply all_applieds
3. Apply/Commit ReplicaBuffer
4. Send JoinElectorate && Init FullReplica
}

ExistingNodeCtx {
1. Receive UpdateConfig
2. Send all applieds
3. Update Config
4. Send NewConfig to all
}

ElectorateCtx {
1. Rcv & Apply NewConfig + Epoch
2. Broadcast to updated config
3. Keep old electorate until RcvJoinElectorate
}

OldElectorateCtx {
1. If new epoch, start ArunaCtx?
}



--------------------------------

### Onboarding Phase 1

- New node gets created -> Send Join request to an existing node
- Wait for the current epoch (e) to be ready
- Responds with current config (e)
- Existing node creates a transaction (to the old electorate) with the new epoch and asynchronously starts the reconfiguration protocol.
- Replicas notice the new epoch and initialize the reconfiguration protocol after fetching from node

X | | | X
New epoch +1 (ts)

- Reconfiguration protocol:

- Wait for the current epoch to be ready
- If node was part of e-1 (previous epoch)
- Wait until the ReadyElectorate is true
- Send join electorate to all new nodes
- If node is a "new" one (Is part of e+1 but not e)
- ReadyElectorate = false
- else: ReadyElectorate = true

- Increase epoch e = e+1
-








-------
## Old ideas
0. Accept all consensus requests and put them to a buffer.

1. Ask all nodes for last applied + hash + current configuration
2. Ask all other nodes if max last applied + hash is correct?
3. If yes: OK; else ask another node (back to 1.)
4. Download from any node that responded OK all data from your last_applied to the last_applied from 1. (Check if your last applied is correct?!)

If your node was existing:

1. No consensus requests received --> Check if you are

Existing nodes can check if they are already a member of the current epoch and/or whether their state is correct (though possibly outdated).
- If state is correct: Recover the latest states and go ahead
- You are still member of an epoch and will receive Consensus requests. -> Go to buffer
- The first Apply Consensus you will receive is the last that needs to be synced out of consensus.
- Sync !
- If state is incorrect: Create a new epoch that replaces the old node with a new one

### Onboarding Phase 2

Fetch all applied of node-x based on my last applied + last applied hash;

Node sends a `FetchConfiguration` request to a running node (possibly the initial one). It receives the current member configuration and the current epoch e.

Response: Current config from the node (possibly outdated)

Node sends a `JoinCluster` request to all current members with e == e+1 and its own configuration and optionally its last applied t0.

Response:
Majority(Old) -> OK (Last applied / Hash)
-> Replica start


NoMajority because someone else already got majority for this epoch.


# Re-join cluster as existing node


30 changes: 30 additions & 0 deletions docs/recovery.md
@@ -0,0 +1,30 @@
# Recovery

Transactions are recovered after a specific time has passed.

If a node has data about the transaction (it is either the coordinator or has witnessed a previous request), it sends a recover request to all replicas. This functions as an explicit pre-accept request, and all replicas answer with their current state of the transaction (Tx).

## Cases

- No other node has witnessed Tx -> the recover functions as a Pre-Accept; continue with Accept
- If others have only witnessed a Pre-Accept:
  - Do superseding transactions exist? -> Continue with Accept
  - Are there "waiting" transactions that should execute before? -> Wait and restart recovery
  - Otherwise continue with Accept
- For Accept: enforce the fast path if the original request got a majority (original T0 as T), otherwise use max(T)
- Accepted / Committed / Applied -> Continue with the last known state (see the sketch after this list)
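
To make the branching concrete, here is a minimal sketch of the case analysis in Rust. All names, types, and parameters below are illustrative assumptions for this document, not synevi's actual API:

```rust
// Illustrative only: the real implementation tracks far more state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum TxState {
    NotWitnessed,
    PreAccepted,
    Accepted,
    Committed,
    Applied,
}

enum RecoveryStep {
    /// Continue with Accept; on a fast-path majority, the original T0 is kept as T.
    Accept { enforce_original_t0: bool },
    /// Other transactions should execute first: wait, then restart recovery.
    WaitAndRestart,
    /// A later phase was already reached: resume from that state.
    ContinueFrom(TxState),
}

fn decide_recovery(
    replies: &[TxState],
    original_had_majority: bool,
    superseding_tx_exists: bool,
    waiting_txs_exist: bool,
) -> RecoveryStep {
    // Recovery continues from the most advanced state any replica reports.
    let highest = replies.iter().copied().max().unwrap_or(TxState::NotWitnessed);
    match highest {
        // Nobody witnessed the Tx: the recover request doubles as a Pre-Accept.
        TxState::NotWitnessed => RecoveryStep::Accept {
            enforce_original_t0: original_had_majority,
        },
        // Only Pre-Accepts were witnessed: check superseding and waiting Txs.
        TxState::PreAccepted if !superseding_tx_exists && waiting_txs_exist => {
            RecoveryStep::WaitAndRestart
        }
        TxState::PreAccepted => RecoveryStep::Accept {
            enforce_original_t0: original_had_majority,
        },
        // Accepted / Committed / Applied: continue with the last known state.
        state => RecoveryStep::ContinueFrom(state),
    }
}
```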

## Synevi-specific implementation details

Unlike the original Accord implementation, synevi only collects the T0s of dependencies. This was done to mitigate the effects of larger transaction bodies, which would otherwise result in drastically increased message sizes.

One downside of this is that nodes can witness the existence of transactions via dependencies without having the actual transaction body. If such a transaction must be recovered to satisfy the dependencies, this creates the problem of "unknown recoveries".

To fix this, the waiting node can request a recovery from other nodes. These nodes respond with status information indicating whether they have witnessed the Tx and can recover it. As long as at least one node has witnessed the request and is able to recover the Tx, the Tx will be recovered.

If a majority responds with an "unable to recover" status and the rest do not respond, the dependency can be removed. The reason is that, because the original transaction has not yet received a majority, it cannot have taken the fast path and must be recovered using the slow path. This also implies that a node that responds with a negative state cannot allow the transaction to take the fast path.
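
A minimal sketch of this removal rule (a hypothetical helper, not synevi's API):

```rust
/// A dependency may be dropped only if no responder can recover the Tx and a
/// strict majority of the cluster explicitly answered "unable to recover".
/// Nodes that did not respond carry no information and do not count.
fn can_remove_dependency(unable: usize, recoverable: usize, cluster_size: usize) -> bool {
    recoverable == 0 && unable > cluster_size / 2
}
```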
Empty file added docs/waithandler_refactor.md