Skip to content

Commit 440cb90

Browse files
committed
enter acknowledgement
1 parent 1295ef3 commit 440cb90

File tree

3 files changed

+163
-66
lines changed

3 files changed

+163
-66
lines changed

src/capture.rs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
cell::Cell,
3+
future,
34
time::{Duration, Instant},
45
};
56

@@ -66,9 +67,9 @@ impl Capture {
6667
.expect("channel closed");
6768
}
6869

69-
async fn run(server: Server, mut rx: Receiver<CaptureRequest>, conn: LanMouseConnection) {
70+
async fn run(server: Server, mut rx: Receiver<CaptureRequest>, mut conn: LanMouseConnection) {
7071
loop {
71-
if let Err(e) = do_capture(&server, &conn, &mut rx).await {
72+
if let Err(e) = do_capture(&server, &mut conn, &mut rx).await {
7273
log::warn!("input capture exited: {e}");
7374
}
7475
server.set_capture_status(Status::Disabled);
@@ -88,7 +89,7 @@ impl Capture {
8889

8990
async fn do_capture(
9091
server: &Server,
91-
conn: &LanMouseConnection,
92+
conn: &mut LanMouseConnection,
9293
rx: &mut Receiver<CaptureRequest>,
9394
) -> Result<(), InputCaptureError> {
9495
let backend = server.config.capture_backend.map(|b| b.into());
@@ -111,16 +112,33 @@ async fn do_capture(
111112
capture.create(handle, to_capture_pos(pos)).await?;
112113
}
113114

115+
let mut state = State::Idle;
116+
114117
loop {
115118
tokio::select! {
116119
event = capture.next() => match event {
117-
Some(event) => handle_capture_event(server, &mut capture, conn, event?).await?,
120+
Some(event) => handle_capture_event(server, &mut capture, conn, event?, &mut state).await?,
118121
None => return Ok(()),
119122
},
123+
(handle, event) = conn.recv() => if let Some(active) = server.get_active() {
124+
if handle != active {
125+
// we only care about events coming from the client we are currently connected to
126+
// only `Ack` and `Leave` are relevant
127+
continue
128+
}
129+
130+
match event {
131+
// connection acknowlegded => set state to Sending
132+
ProtoEvent::Ack(_) => state = State::Sending,
133+
// client disconnected
134+
ProtoEvent::Leave(_) => release_capture(&mut capture, server, &mut state).await?,
135+
_ => {}
136+
}
137+
},
120138
e = rx.recv() => {
121139
match e {
122140
Some(e) => match e {
123-
CaptureRequest::Release => capture.release().await?,
141+
CaptureRequest::Release => release_capture(&mut capture, server, &mut state).await?,
124142
CaptureRequest::Create(h, p) => capture.create(h, p).await?,
125143
CaptureRequest::Destroy(h) => capture.destroy(h).await?,
126144
},
@@ -156,28 +174,41 @@ macro_rules! debounce {
156174
};
157175
}
158176

177+
enum State {
178+
Idle,
179+
WaitingForAck,
180+
Sending,
181+
}
182+
159183
async fn handle_capture_event(
160184
server: &Server,
161185
capture: &mut InputCapture,
162186
conn: &LanMouseConnection,
163187
event: (CaptureHandle, CaptureEvent),
188+
state: &mut State,
164189
) -> Result<(), CaptureError> {
165190
let (handle, event) = event;
166191
log::trace!("({handle}): {event:?}");
167192

168193
if server.should_release.borrow_mut().take().is_some()
169194
|| capture.keys_pressed(&server.release_bind)
170195
{
171-
return capture.release().await;
196+
return release_capture(capture, server, state).await;
172197
}
173198

174199
if event == CaptureEvent::Begin {
200+
*state = State::WaitingForAck;
201+
server.set_active(Some(handle));
175202
spawn_hook_command(server, handle);
176203
}
177204

178205
let event = match event {
179206
CaptureEvent::Begin => ProtoEvent::Enter(lan_mouse_proto::Position::Left),
180-
CaptureEvent::Input(e) => ProtoEvent::Input(e),
207+
CaptureEvent::Input(e) => match state {
208+
State::Sending => ProtoEvent::Input(e),
209+
// connection not acknowledged, repeat `Enter` event
210+
_ => ProtoEvent::Enter(lan_mouse_proto::Position::Left),
211+
},
181212
};
182213

183214
if let Err(e) = conn.send(event, handle).await {
@@ -188,6 +219,16 @@ async fn handle_capture_event(
188219
Ok(())
189220
}
190221

222+
async fn release_capture(
223+
capture: &mut InputCapture,
224+
server: &Server,
225+
state: &mut State,
226+
) -> Result<(), CaptureError> {
227+
*state = State::Idle;
228+
server.set_active(None);
229+
capture.release().await
230+
}
231+
191232
fn to_capture_pos(pos: lan_mouse_ipc::Position) -> input_capture::Position {
192233
match pos {
193234
lan_mouse_ipc::Position::Left => input_capture::Position::Left,

src/connect.rs

Lines changed: 106 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::server::Server;
22
use lan_mouse_ipc::{ClientHandle, DEFAULT_PORT};
33
use lan_mouse_proto::{ProtoEvent, MAX_EVENT_SIZE};
4+
use local_channel::mpsc::{channel, Receiver, Sender};
45
use std::{
56
collections::{HashMap, HashSet},
67
io,
@@ -67,17 +68,26 @@ pub(crate) struct LanMouseConnection {
6768
server: Server,
6869
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
6970
connecting: Rc<Mutex<HashSet<ClientHandle>>>,
71+
recv_rx: Receiver<(ClientHandle, ProtoEvent)>,
72+
recv_tx: Sender<(ClientHandle, ProtoEvent)>,
7073
}
7174

7275
impl LanMouseConnection {
7376
pub(crate) fn new(server: Server) -> Self {
77+
let (recv_tx, recv_rx) = channel();
7478
Self {
7579
server,
7680
conns: Default::default(),
7781
connecting: Default::default(),
82+
recv_rx,
83+
recv_tx,
7884
}
7985
}
8086

87+
pub(crate) async fn recv(&mut self) -> (ClientHandle, ProtoEvent) {
88+
self.recv_rx.recv().await.expect("channel closed")
89+
}
90+
8191
pub(crate) async fn send(
8292
&self,
8393
event: ProtoEvent,
@@ -103,68 +113,105 @@ impl LanMouseConnection {
103113
}
104114

105115
// check if we are already trying to connect
106-
{
107-
let mut connecting = self.connecting.lock().await;
108-
if connecting.contains(&handle) {
109-
return Err(LanMouseConnectionError::NotConnected);
110-
} else {
111-
connecting.insert(handle);
112-
}
116+
let mut connecting = self.connecting.lock().await;
117+
if !connecting.contains(&handle) {
118+
connecting.insert(handle);
119+
// connect in the background
120+
spawn_local(connect_to_handle(
121+
self.server.clone(),
122+
handle,
123+
self.conns.clone(),
124+
self.connecting.clone(),
125+
self.recv_tx.clone(),
126+
));
113127
}
114-
let server = self.server.clone();
115-
let conns = self.conns.clone();
116-
let connecting = self.connecting.clone();
128+
Err(LanMouseConnectionError::NotConnected)
129+
}
130+
}
117131

118-
// connect in the background
119-
spawn_local(async move {
120-
// sending did not work, figure out active conn.
121-
if let Some(addrs) = server.get_ips(handle) {
122-
let port = server.get_port(handle).unwrap_or(DEFAULT_PORT);
123-
let addrs = addrs
124-
.into_iter()
125-
.map(|a| SocketAddr::new(a, port))
126-
.collect::<Vec<_>>();
127-
log::info!("client ({handle}) connecting ... (ips: {addrs:?})");
128-
let res = connect_any(&addrs).await;
129-
let (conn, addr) = match res {
130-
Ok(c) => c,
131-
Err(e) => {
132-
connecting.lock().await.remove(&handle);
133-
return Err(e);
134-
}
135-
};
136-
log::info!("client ({handle}) connected @ {addr}");
137-
server.set_active_addr(handle, Some(addr));
138-
conns.lock().await.insert(addr, conn.clone());
132+
async fn connect_to_handle(
133+
server: Server,
134+
handle: ClientHandle,
135+
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
136+
connecting: Rc<Mutex<HashSet<ClientHandle>>>,
137+
tx: Sender<(ClientHandle, ProtoEvent)>,
138+
) -> Result<(), LanMouseConnectionError> {
139+
// sending did not work, figure out active conn.
140+
if let Some(addrs) = server.get_ips(handle) {
141+
let port = server.get_port(handle).unwrap_or(DEFAULT_PORT);
142+
let addrs = addrs
143+
.into_iter()
144+
.map(|a| SocketAddr::new(a, port))
145+
.collect::<Vec<_>>();
146+
log::info!("client ({handle}) connecting ... (ips: {addrs:?})");
147+
let res = connect_any(&addrs).await;
148+
let (conn, addr) = match res {
149+
Ok(c) => c,
150+
Err(e) => {
139151
connecting.lock().await.remove(&handle);
140-
spawn_local(async move {
141-
loop {
142-
let (buf, len) = ProtoEvent::Ping.into();
143-
if let Err(e) = conn.send(&buf[..len]).await {
144-
log::warn!("client ({handle}) @ {addr} connection closed: {e}");
145-
conns.lock().await.remove(&addr);
146-
server.set_active_addr(handle, None);
147-
let active: Vec<SocketAddr> =
148-
conns.lock().await.keys().copied().collect();
149-
log::info!("active connections: {active:?}");
150-
break;
151-
}
152-
tokio::time::sleep(Duration::from_millis(500)).await;
153-
let mut buf = [0u8; MAX_EVENT_SIZE];
154-
if let Err(e) = conn.recv(&mut buf).await {
155-
log::warn!("recv(): client ({handle}) @ {addr} connection closed: {e}");
156-
conns.lock().await.remove(&addr);
157-
server.set_active_addr(handle, None);
158-
let active: Vec<SocketAddr> =
159-
conns.lock().await.keys().copied().collect();
160-
log::info!("active connections: {active:?}");
161-
break;
162-
}
163-
}
164-
});
152+
return Err(e);
165153
}
166-
Result::<(), LanMouseConnectionError>::Ok(())
167-
});
168-
Err(LanMouseConnectionError::NotConnected)
154+
};
155+
log::info!("client ({handle}) connected @ {addr}");
156+
server.set_active_addr(handle, Some(addr));
157+
conns.lock().await.insert(addr, conn.clone());
158+
connecting.lock().await.remove(&handle);
159+
160+
// poll connection for active
161+
spawn_local(ping_pong(
162+
server.clone(),
163+
handle,
164+
addr,
165+
conn.clone(),
166+
conns.clone(),
167+
));
168+
169+
// receiver
170+
spawn_local(receive_loop(handle, conn, tx));
171+
return Ok(());
172+
}
173+
Err(LanMouseConnectionError::NotConnected)
174+
}
175+
176+
async fn ping_pong(
177+
server: Server,
178+
handle: ClientHandle,
179+
addr: SocketAddr,
180+
conn: Arc<dyn Conn + Send + Sync>,
181+
conns: Rc<Mutex<HashMap<SocketAddr, Arc<dyn Conn + Send + Sync>>>>,
182+
) {
183+
loop {
184+
let (buf, len) = ProtoEvent::Ping.into();
185+
if let Err(e) = conn.send(&buf[..len]).await {
186+
log::warn!("client ({handle}) @ {addr} connection closed: {e}");
187+
conns.lock().await.remove(&addr);
188+
server.set_active_addr(handle, None);
189+
let active: Vec<SocketAddr> = conns.lock().await.keys().copied().collect();
190+
log::info!("active connections: {active:?}");
191+
break;
192+
}
193+
tokio::time::sleep(Duration::from_millis(500)).await;
194+
let mut buf = [0u8; MAX_EVENT_SIZE];
195+
if let Err(e) = conn.recv(&mut buf).await {
196+
log::warn!("recv(): client ({handle}) @ {addr} connection closed: {e}");
197+
conns.lock().await.remove(&addr);
198+
server.set_active_addr(handle, None);
199+
let active: Vec<SocketAddr> = conns.lock().await.keys().copied().collect();
200+
log::info!("active connections: {active:?}");
201+
break;
202+
}
203+
}
204+
}
205+
206+
async fn receive_loop(
207+
handle: ClientHandle,
208+
conn: Arc<dyn Conn + Send + Sync>,
209+
tx: Sender<(ClientHandle, ProtoEvent)>,
210+
) {
211+
let mut buf = [0u8; MAX_EVENT_SIZE];
212+
while let Ok(_) = conn.recv(&mut buf).await {
213+
if let Ok(event) = buf.try_into() {
214+
tx.send((handle, event));
215+
}
169216
}
170217
}

src/server.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct ReleaseToken;
4141

4242
#[derive(Clone)]
4343
pub struct Server {
44+
active: Rc<Cell<Option<ClientHandle>>>,
4445
pub(crate) client_manager: Rc<RefCell<ClientManager>>,
4546
port: Rc<Cell<u16>>,
4647
pub(crate) release_bind: Vec<input_event::scancode::Linux>,
@@ -493,6 +494,14 @@ impl Server {
493494
pub(crate) fn release_capture(&self) {
494495
self.should_release.replace(Some(ReleaseToken));
495496
}
497+
498+
pub(crate) fn set_active(&self, handle: Option<ClientHandle>) {
499+
self.active.replace(handle);
500+
}
501+
502+
pub(crate) fn get_active(&self) -> Option<ClientHandle> {
503+
self.active.get()
504+
}
496505
}
497506

498507
fn to_capture_pos(pos: Position) -> input_capture::Position {

0 commit comments

Comments
 (0)