Skip to content

Commit 6ae220c

Browse files
committed
Add initial modbus plugin
1 parent 7ca1ebf commit 6ae220c

File tree

6 files changed

+146
-2
lines changed

6 files changed

+146
-2
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ members = [
33
"crates/msr",
44
"crates/msr-core",
55
"crates/msr-legacy",
6-
"crates/msr-plugin"
6+
"crates/msr-plugin",
7+
"crates/msr-plugin-fieldbus-modbus"
78
]
89

910
[patch.crates-io]
1011
msr = { path = "crates/msr" }
1112
msr-core = { path = "crates/msr-core" }
1213
msr-legacy = { path = "crates/msr-legacy" }
1314
msr-plugin = { path = "crates/msr-plugin" }
15+
msr-plugin-fieldbus-modbus = { path = "crates/msr-plugin-fieldbus-modbus" }

crates/msr-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ thiserror = "1"
1414
anyhow = { version = "1", optional = true }
1515
bs58 = { version = "0.4", optional = true }
1616
csv = { version = "1", optional = true }
17-
serde = { version = "1", optional = true }
17+
serde = { version = "1", optional = true, features = ["derive"] }
1818
serde_json = { version = "1", optional = true }
1919
uuid = { version = "0.8", optional = true }
2020

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "msr-plugin-fieldbus-modbus"
3+
version = "0.0.0"
4+
edition = "2018"
5+
6+
[dependencies]
7+
log = "0.4"
8+
msr-core = { path = "../msr-core" }
9+
msr-plugin = { path = "../msr-plugin" }
10+
thiserror = "1"
11+
tokio = { version = "1", default_features = false, features = ["sync", "macros"] }
12+
tokio-modbus = "0.5"
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use std::net::SocketAddr;
2+
3+
#[derive(Debug)]
4+
pub enum Message {
5+
Connect(ConnCfg),
6+
Shutdown,
7+
}
8+
9+
#[derive(Debug)]
10+
pub enum ConnCfg {
11+
Tcp(SocketAddr),
12+
}
13+
14+
#[derive(Debug, Clone)]
15+
pub enum Event {
16+
ConnectionError(String),
17+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use crate::api::{ConnCfg, Event, Message};
2+
use msr_plugin::MessageLoop;
3+
use tokio::sync::{broadcast, mpsc};
4+
5+
#[derive(Default)]
6+
pub struct Context {
7+
conn: Connection,
8+
}
9+
10+
enum Connection {
11+
Disconnected,
12+
Connecting,
13+
Connected(tokio_modbus::client::Context),
14+
}
15+
16+
impl Default for Connection {
17+
fn default() -> Self {
18+
Self::Disconnected
19+
}
20+
}
21+
22+
pub fn create_message_loop(
23+
event_tx: broadcast::Sender<Event>,
24+
) -> (MessageLoop, mpsc::UnboundedSender<Message>) {
25+
let mut ctx = Context::default();
26+
let (message_tx, mut message_rx) = mpsc::unbounded_channel();
27+
let message_loop = async move {
28+
log::debug!("Entering modbus plugin message loop");
29+
loop {
30+
tokio::select! {
31+
next_msg = message_rx.recv() => {
32+
if let Some(msg) = next_msg {
33+
log::debug!("Received message: {:?}", msg);
34+
match msg {
35+
Message::Connect(cfg) => {
36+
match cfg {
37+
ConnCfg::Tcp(addr) => {
38+
ctx.conn = Connection::Connecting;
39+
// TODO send event
40+
match tokio_modbus::client::tcp::connect(addr).await {
41+
Ok(mb_ctx) => {
42+
ctx.conn = Connection::Connected(mb_ctx);
43+
// TODO send event
44+
}
45+
Err(err) => {
46+
let ev = Event::ConnectionError(err.to_string());
47+
if let Err(ev) = event_tx.send(ev) {
48+
log::debug!("No subscribers, dropping event: {:?}", ev);
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}
55+
Message::Shutdown => {
56+
break;
57+
}
58+
}
59+
} else {
60+
log::debug!("All message senders have been dropped");
61+
break;
62+
}
63+
}
64+
}
65+
}
66+
log::debug!("Exiting modbus plugin message loop");
67+
};
68+
(Box::pin(message_loop), message_tx)
69+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use msr_plugin::MessageLoop;
2+
use thiserror::Error;
3+
use tokio::sync::{broadcast, mpsc};
4+
5+
mod api;
6+
mod internal;
7+
8+
pub use api::*;
9+
10+
pub struct Plugin {
11+
message_loop: MessageLoop,
12+
message_tx: mpsc::UnboundedSender<Message>,
13+
broadcast_tx: broadcast::Sender<Event>,
14+
}
15+
16+
#[derive(Debug, Error)]
17+
pub enum SetupError {}
18+
19+
impl Plugin {
20+
pub fn setup() -> Result<Self, SetupError> {
21+
let (broadcast_tx, _) = broadcast::channel(100);
22+
let event_tx = broadcast_tx.clone();
23+
let (message_loop, message_tx) = internal::create_message_loop(event_tx);
24+
Ok(Self {
25+
message_tx,
26+
message_loop,
27+
broadcast_tx,
28+
})
29+
}
30+
}
31+
32+
impl msr_plugin::Plugin for Plugin {
33+
type Message = Message;
34+
type Event = Event;
35+
fn message_sender(&self) -> mpsc::UnboundedSender<Self::Message> {
36+
self.message_tx.clone()
37+
}
38+
fn subscribe_events(&self) -> broadcast::Receiver<Self::Event> {
39+
self.broadcast_tx.subscribe()
40+
}
41+
fn run(self) -> MessageLoop {
42+
self.message_loop
43+
}
44+
}

0 commit comments

Comments
 (0)