Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ jobs:
sdk
security
server
shard
test
web
consensus
Expand Down
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ members = [
"core/partitions",
"core/sdk",
"core/server",
"core/shard",
"core/simulator",
"core/tools",
"examples/rust",
Expand Down
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ server: 0.7.0, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
shard: 0.1.0, "N/A",
sharded-slab: 0.1.7, "MIT",
shlex: 1.3.0, "Apache-2.0 OR MIT",
signal-hook-registry: 1.4.8, "Apache-2.0 OR MIT",
Expand Down
29 changes: 29 additions & 0 deletions core/shard/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "shard"
version = "0.1.0"
edition = "2024"

[dependencies]
consensus = { path = "../consensus" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
metadata = { path = "../metadata" }
partitions = { path = "../partitions" }
152 changes: 152 additions & 0 deletions core/shard/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use consensus::{MuxPlane, NamespacedPipeline, Plane, PlaneIdentity, VsrConsensus};
use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader};
use iggy_common::message::{Message, MessageBag};
use iggy_common::sharding::IggyNamespace;
use iggy_common::variadic;
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use metadata::IggyMetadata;
use metadata::stm::StateMachine;
use partitions::IggyPartitions;

// variadic!(Metadata, Partitions) = (Metadata, (Partitions, ()))
type PlaneInner<B, J, S, M> = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could import the variadic! macro in there and use it ?

IggyMetadata<VsrConsensus<B>, J, S, M>,
(IggyPartitions<VsrConsensus<B, NamespacedPipeline>>, ()),
);

pub type ShardPlane<B, J, S, M> = MuxPlane<PlaneInner<B, J, S, M>>;

pub struct IggyShard<B, J, S, M>
where
B: MessageBus,
{
pub id: u8,
pub name: String,
pub plane: ShardPlane<B, J, S, M>,
}

impl<B, J, S, M> IggyShard<B, J, S, M>
where
B: MessageBus,
{
/// Create a new shard from pre-built metadata and partition planes.
pub fn new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets pass in already constructed consensus, metadata instances in the constructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've realized my mistake and refactored the code. I've pushed the refactor, please take a look now.

id: u8,
name: String,
metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
) -> Self {
let plane = MuxPlane::new(variadic!(metadata, partitions));
Self { id, name, plane }
}

/// Dispatch an incoming network message to the appropriate consensus plane.
///
/// Routes requests, replication messages, and acks to either the metadata
/// plane or the partitions plane based on `PlaneIdentity::is_applicable`.
pub async fn on_message(&self, message: Message<GenericHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
<J as JournalHandle>::Target: Journal<
<J as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
match MessageBag::from(message) {
MessageBag::Request(request) => self.on_request(request).await,
MessageBag::Prepare(prepare) => self.on_replicate(prepare).await,
MessageBag::PrepareOk(prepare_ok) => self.on_ack(prepare_ok).await,
}
}

pub async fn on_request(&self, request: Message<RequestHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
<J as JournalHandle>::Target: Journal<
<J as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
let planes = self.plane.inner();
if planes.0.is_applicable(&request) {
planes.0.on_request(request).await;
} else {
planes.1.0.on_request(request).await;
}
}

pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
<J as JournalHandle>::Target: Journal<
<J as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
let planes = self.plane.inner();
if planes.0.is_applicable(&prepare) {
planes.0.on_replicate(prepare).await;
} else {
planes.1.0.on_replicate(prepare).await;
}
}

pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
<J as JournalHandle>::Target: Journal<
<J as JournalHandle>::Storage,
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
let planes = self.plane.inner();
if planes.0.is_applicable(&prepare_ok) {
planes.0.on_ack(prepare_ok).await;
} else {
planes.1.0.on_ack(prepare_ok).await;
}
}

pub fn init_partition(&mut self, namespace: IggyNamespace)
where
B: MessageBus<
Replica = u8,
Data = iggy_common::message::Message<iggy_common::header::GenericHeader>,
Client = u128,
>,
{
let partitions = &mut self.plane.inner_mut().1.0;
partitions.init_partition_in_memory(namespace);
partitions.register_namespace_in_pipeline(namespace.inner());
}
}
1 change: 1 addition & 0 deletions core/simulator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
metadata = { path = "../metadata" }
partitions = { path = "../partitions" }
shard = { path = "../shard" }
19 changes: 2 additions & 17 deletions core/simulator/src/deps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::bus::SharedMemBus;
use bytes::Bytes;
use consensus::{
MuxPlane, {NamespacedPipeline, VsrConsensus},
};
use iggy_common::header::PrepareHeader;
use iggy_common::message::Message;
use iggy_common::variadic;
use journal::{Journal, JournalHandle, Storage};
use metadata::MuxStateMachine;
use metadata::stm::consumer_group::ConsumerGroups;
use metadata::stm::stream::Streams;
use metadata::stm::user::Users;
use metadata::{IggyMetadata, MuxStateMachine};
use std::cell::{Cell, RefCell, UnsafeCell};
use std::collections::HashMap;

Expand Down Expand Up @@ -151,16 +147,5 @@ impl JournalHandle for SimJournal<MemStorage> {
#[derive(Debug, Default)]
pub struct SimSnapshot {}

/// Type aliases for simulator metadata
/// Type alias for simulator state machine
pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>;
pub type SimMetadata = IggyMetadata<
VsrConsensus<SharedMemBus>,
SimJournal<MemStorage>,
SimSnapshot,
SimMuxStateMachine,
>;

/// Type alias for simulator partitions
pub type ReplicaPartitions =
partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>;
42 changes: 11 additions & 31 deletions core/simulator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ pub mod deps;
pub mod replica;

use bus::MemBus;
use consensus::{Plane, PlaneIdentity};
use iggy_common::header::{GenericHeader, ReplyHeader};
use iggy_common::message::{Message, MessageBag};
use iggy_common::header::ReplyHeader;
use iggy_common::message::Message;
use message_bus::MessageBus;
use replica::Replica;
use replica::{Replica, new_replica};
use std::sync::Arc;

pub struct Simulator {
Expand Down Expand Up @@ -54,7 +53,7 @@ impl Simulator {
let message_bus = Arc::new(message_bus);
let replicas = (0..replica_count)
.map(|i| {
Replica::new(
new_replica(
i as u8,
format!("replica-{}", i),
Arc::clone(&message_bus),
Expand All @@ -77,7 +76,7 @@ impl Simulator {
let message_bus = Arc::new(message_bus);
let replicas = (0..replica_count)
.map(|i| {
Replica::new(
new_replica(
i as u8,
format!("replica-{}", i),
Arc::clone(&message_bus),
Expand Down Expand Up @@ -114,31 +113,12 @@ impl Simulator {
None
}

async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) {
let planes = replica.plane.inner();
match MessageBag::from(message) {
MessageBag::Request(request) => {
if planes.0.is_applicable(&request) {
planes.0.on_request(request).await;
} else {
planes.1.0.on_request(request).await;
}
}
MessageBag::Prepare(prepare) => {
if planes.0.is_applicable(&prepare) {
planes.0.on_replicate(prepare).await;
} else {
planes.1.0.on_replicate(prepare).await;
}
}
MessageBag::PrepareOk(prepare_ok) => {
if planes.0.is_applicable(&prepare_ok) {
planes.0.on_ack(prepare_ok).await;
} else {
planes.1.0.on_ack(prepare_ok).await;
}
}
}
async fn dispatch_to_replica(
&self,
replica: &Replica,
message: Message<iggy_common::header::GenericHeader>,
) {
replica.on_message(message).await;
}
}

Expand Down
Loading
Loading