Skip to content

Commit 90fe2c9

Browse files
committed
Implemented Message trait
1 parent 9db03d1 commit 90fe2c9

File tree

7 files changed

+125
-115
lines changed

7 files changed

+125
-115
lines changed

src/bastion/examples/ping/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ impl Actor for Ping {
3939

4040
while (pong_struct.counter != 3) {
4141
let message = ctx.recv().await?;
42+
message.handle(ctx).await;
4243

43-
// Do something with the message ...
44-
45-
message.ack().await;
4644
self.send(message.path(), "pong", MessageType::Tell).await?;
4745
pong_struct.counter += 1;
46+
47+
message.ack().await;
4848
}
4949

5050
System::stop();

src/bastion/src/actor/context.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::sync::Arc;
22

3-
use async_channel::unbounded;
3+
use async_channel::{unbounded, Sender};
44

55
use crate::actor::local_state::LocalState;
66
use crate::actor::state::ActorState;
7-
use crate::mailbox::traits::TypedMessage;
7+
use crate::mailbox::envelope::Envelope;
8+
use crate::mailbox::message::Message;
89
use crate::mailbox::Mailbox;
910
use crate::routing::path::ActorPath;
1011

@@ -19,28 +20,28 @@ pub struct Context {
1920
/// Path to the actor in the system
2021
path: Arc<ActorPath>,
2122
/// Mailbox of the actor
22-
//mailbox: Mailbox<TypedMessage>,
23+
mailbox: Mailbox<Box<dyn Message>>,
2324
/// Local storage for actor's data
2425
local_state: LocalState,
2526
/// Current execution state of the actor
2627
internal_state: ActorState,
2728
}
2829

2930
impl Context {
30-
// FIXME: Pass the correct system_rx instead of the fake one
31-
pub(crate) fn new(path: ActorPath) -> Self {
32-
//let (_system_tx, system_rx) = unbounded();
33-
// let mailbox = Mailbox::new(system_rx);
31+
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope<impl Message>>) {
32+
let (system_tx, system_rx) = unbounded();
3433

3534
let path = Arc::new(path);
35+
let mailbox = Mailbox::new(system_rx);
3636
let local_state = LocalState::new();
3737
let internal_state = ActorState::new();
3838

39-
Context {
39+
let instance = Context {
4040
path,
41-
//mailbox,
41+
mailbox,
4242
local_state,
4343
internal_state,
44-
}
44+
};
45+
(instance, system_tx)
4546
}
4647
}
Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,34 @@
1+
use std::cell::RefCell;
12
use std::fmt::{self, Debug, Formatter};
23

34
use crate::actor::actor_ref::ActorRef;
4-
use crate::mailbox::message::MessageType;
5-
use crate::mailbox::traits::TypedMessage;
5+
use crate::mailbox::message::{Message, MessageType};
66

77
/// Struct that represents an incoming message in the actor's mailbox.
88
#[derive(Clone)]
99
pub struct Envelope<T>
1010
where
11-
T: TypedMessage,
11+
T: Message,
1212
{
1313
/// The sending side of a channel. In actor's world
1414
/// represented is a message sender. Can be used
1515
/// for acking message when it possible.
1616
sender: Option<ActorRef>,
1717
/// An actual data sent by the channel
18-
message: T,
18+
message: RefCell<Option<T>>,
1919
/// Message type that helps to figure out how to deliver message
2020
/// and how to ack it after the processing.
2121
message_type: MessageType,
2222
}
2323

2424
impl<T> Envelope<T>
2525
where
26-
T: TypedMessage,
26+
T: Message,
2727
{
2828
/// Create a message with the given sender and inner data.
29-
pub fn new(sender: Option<ActorRef>, message: T, message_type: MessageType) -> Self {
29+
pub fn new(sender: Option<ActorRef>, data: T, message_type: MessageType) -> Self {
30+
let message = RefCell::new(Some(data));
31+
3032
Envelope {
3133
sender,
3234
message,
@@ -40,10 +42,17 @@ where
4042
self.message_type.clone()
4143
}
4244

45+
/// Extracts the message data and returns it to the caller. Each further
46+
/// method call will return `None`.
47+
pub fn read(&self) -> Option<T> {
48+
self.message.replace(None)
49+
}
50+
51+
// TODO: Return a boolean flag once operation has finished?
4352
/// Sends a confirmation to the message sender.
44-
pub(crate) async fn ack(&self) {
53+
pub async fn ack(&self) {
4554
match self.message_type {
46-
MessageType::Ack => unimplemented!(),
55+
MessageType::Ask => unimplemented!(),
4756
MessageType::Broadcast => unimplemented!(),
4857
MessageType::Tell => unimplemented!(),
4958
}
@@ -52,7 +61,7 @@ where
5261

5362
impl<T> Debug for Envelope<T>
5463
where
55-
T: TypedMessage,
64+
T: Message,
5665
{
5766
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
5867
fmt.debug_struct("Message")
@@ -61,3 +70,54 @@ where
6170
.finish()
6271
}
6372
}
73+
74+
#[cfg(test)]
75+
mod message_target_tests {
76+
use crate::mailbox::envelope::Envelope;
77+
use crate::mailbox::message::{Message, MessageType};
78+
79+
#[derive(Debug)]
80+
struct FakeMessage;
81+
82+
impl FakeMessage {
83+
pub fn new() -> Self {
84+
return FakeMessage {};
85+
}
86+
}
87+
88+
#[test]
89+
fn test_message_read() {
90+
let message_data = FakeMessage::new();
91+
let instance = Envelope::new(None, message_data, MessageType::Tell);
92+
93+
let expected_data = instance.read();
94+
assert_eq!(expected_data.is_some(), true);
95+
96+
let another_read_attempt_data = instance.read();
97+
assert_eq!(another_read_attempt_data.is_none(), true);
98+
}
99+
100+
#[test]
101+
fn test_match_against_ask_message_type() {
102+
let message_data = FakeMessage::new();
103+
let instance = Envelope::new(None, message_data, MessageType::Ask);
104+
105+
assert_eq!(instance.message_type, MessageType::Ask);
106+
}
107+
108+
#[test]
109+
fn test_match_against_broadcast_message_type() {
110+
let message_data = FakeMessage::new();
111+
let instance = Envelope::new(None, message_data, MessageType::Broadcast);
112+
113+
assert_eq!(instance.message_type, MessageType::Broadcast);
114+
}
115+
116+
#[test]
117+
fn test_match_against_tell_message_type() {
118+
let message_data = FakeMessage::new();
119+
let instance = Envelope::new(None, message_data, MessageType::Tell);
120+
121+
assert_eq!(instance.message_type, MessageType::Tell);
122+
}
123+
}

src/bastion/src/mailbox/message.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1+
use std::fmt::Debug;
2+
3+
/// A trait that message needs to implement for typed actors (it
4+
/// forces message to implement the following traits: [`Any`],
5+
/// [`Send`] and [`Debug`]).
6+
///
7+
/// [`Any`]: https://doc.rust-lang.org/std/any/trait.Any.html
8+
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
9+
/// [`Debug`]: https://doc.rust-lang.org/std/fmt/trait.Debug.html
10+
pub trait Message: Send + Debug + 'static {}
11+
impl<T> Message for T where T: ?Sized + Send + Debug + 'static {}
12+
113
/// Enum that provides information what type of the message
214
/// being sent through the channel.
315
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
416
pub enum MessageType {
517
/// A message type that requires sending a confirmation to the
618
/// sender after begin the processing stage.
7-
Ack,
19+
Ask,
820
/// A message that can be broadcasted (e.g. via system dispatchers). This
921
/// message type doesn't require to be acked from the receiver's side.
1022
Broadcast,

src/bastion/src/mailbox/mod.rs

Lines changed: 29 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
mod envelope;
2-
mod state;
3-
1+
pub mod envelope;
42
pub mod message;
5-
pub mod traits;
3+
pub(crate) mod state;
64

75
use std::sync::atomic::AtomicBool;
86
use std::sync::Arc;
@@ -11,14 +9,14 @@ use async_channel::{unbounded, Receiver, Sender};
119

1210
use crate::error::{BastionError, Result};
1311
use crate::mailbox::envelope::Envelope;
12+
use crate::mailbox::message::Message;
1413
use crate::mailbox::state::MailboxState;
15-
use crate::mailbox::traits::TypedMessage;
1614

1715
/// Struct that represents a message sender.
1816
#[derive(Clone)]
1917
pub struct MailboxTx<T>
2018
where
21-
T: TypedMessage,
19+
T: Message,
2220
{
2321
/// Indicated the transmitter part of the actor's channel
2422
/// which is using for passing messages.
@@ -30,7 +28,7 @@ where
3028

3129
impl<T> MailboxTx<T>
3230
where
33-
T: TypedMessage,
31+
T: Message,
3432
{
3533
/// Return a new instance of MailboxTx that indicates sender.
3634
pub(crate) fn new(tx: Sender<Envelope<T>>) -> Self {
@@ -57,110 +55,66 @@ where
5755
#[derive(Clone)]
5856
pub struct Mailbox<T>
5957
where
60-
T: TypedMessage,
58+
T: Message,
6159
{
62-
/// User guardian sender
63-
user_tx: MailboxTx<T>,
64-
/// User guardian receiver
65-
user_rx: Receiver<Envelope<T>>,
60+
/// Actor guardian sender
61+
actor_tx: MailboxTx<T>,
62+
/// Actor guardian receiver
63+
actor_rx: Receiver<Envelope<T>>,
6664
/// System guardian receiver
6765
system_rx: Receiver<Envelope<T>>,
68-
/// The current processing message, received from the
69-
/// latest call to the user's queue
70-
last_user_message: Option<Envelope<T>>,
71-
/// The current processing message, received from the
72-
/// latest call to the system's queue
73-
last_system_message: Option<Envelope<T>>,
7466
/// Mailbox state machine
7567
state: Arc<MailboxState>,
7668
}
7769

7870
// TODO: Add calls with recv with timeout
7971
impl<T> Mailbox<T>
8072
where
81-
T: TypedMessage,
73+
T: Message,
8274
{
8375
/// Creates a new mailbox for the actor.
8476
pub(crate) fn new(system_rx: Receiver<Envelope<T>>) -> Self {
85-
let (tx, user_rx) = unbounded();
86-
let user_tx = MailboxTx::new(tx);
87-
let last_user_message = None;
88-
let last_system_message = None;
77+
let (tx, actor_rx) = unbounded();
78+
let actor_tx = MailboxTx::new(tx);
8979
let state = Arc::new(MailboxState::new());
9080

9181
Mailbox {
92-
user_tx,
93-
user_rx,
82+
actor_tx,
83+
actor_rx,
9484
system_rx,
95-
last_user_message,
96-
last_system_message,
9785
state,
9886
}
9987
}
10088

101-
/// Forced receive message from user queue
89+
/// Forced receive message from the actor's queue.
10290
pub async fn recv(&mut self) -> Envelope<T> {
103-
let message = self
104-
.user_rx
91+
self.actor_rx
10592
.recv()
10693
.await
10794
.map_err(|e| BastionError::ChanRecv(e.to_string()))
108-
.unwrap();
109-
110-
self.last_user_message = Some(message);
111-
self.last_user_message.clone().unwrap()
95+
.unwrap()
11296
}
11397

114-
/// Try receiving message from user queue
98+
/// Try receiving message from the actor's queue.
11599
pub async fn try_recv(&mut self) -> Result<Envelope<T>> {
116-
if self.last_user_message.is_some() {
117-
return Err(BastionError::UnackedMessage);
118-
}
119-
120-
match self.user_rx.try_recv() {
121-
Ok(message) => {
122-
self.last_user_message = Some(message);
123-
Ok(self.last_user_message.clone().unwrap())
124-
}
125-
Err(e) => Err(BastionError::ChanRecv(e.to_string())),
126-
}
100+
self.actor_rx
101+
.try_recv()
102+
.map_err(|e| BastionError::ChanRecv(e.to_string()))
127103
}
128104

129-
/// Forced receive message from system queue
105+
/// Forced receive message from the internal system queue.
130106
pub async fn sys_recv(&mut self) -> Envelope<T> {
131-
let message = self
132-
.system_rx
107+
self.system_rx
133108
.recv()
134109
.await
135110
.map_err(|e| BastionError::ChanRecv(e.to_string()))
136-
.unwrap();
137-
138-
self.last_system_message = Some(message);
139-
self.last_system_message.clone().unwrap()
111+
.unwrap()
140112
}
141113

142-
/// Try receiving message from system queue
114+
/// Try receiving message from the internal system queue.
143115
pub async fn try_sys_recv(&mut self) -> Result<Envelope<T>> {
144-
if self.last_system_message.is_some() {
145-
return Err(BastionError::UnackedMessage);
146-
}
147-
148-
match self.system_rx.try_recv() {
149-
Ok(message) => {
150-
self.last_system_message = Some(message);
151-
Ok(self.last_system_message.clone().unwrap())
152-
}
153-
Err(e) => Err(BastionError::ChanRecv(e.to_string())),
154-
}
155-
}
156-
157-
/// Returns the last retrieved message from the user channel
158-
pub async fn get_last_user_message(&self) -> Option<Envelope<T>> {
159-
self.last_user_message.clone()
160-
}
161-
162-
/// Returns the last retrieved message from the system channel
163-
pub async fn get_last_system_message(&self) -> Option<Envelope<T>> {
164-
self.last_system_message.clone()
116+
self.system_rx
117+
.try_recv()
118+
.map_err(|e| BastionError::ChanRecv(e.to_string()))
165119
}
166120
}

0 commit comments

Comments
 (0)