Skip to content

Commit

Permalink
Clean up transport code
Browse files Browse the repository at this point in the history
  • Loading branch information
RemcoSmitsDev committed Jun 29, 2024
1 parent 01d384e commit 854ff68
Showing 1 changed file with 12 additions and 26 deletions.
38 changes: 12 additions & 26 deletions crates/dap/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,18 @@ impl Transport {
let (client_tx, server_rx) = unbounded::<Payload>();
let (server_tx, client_rx) = unbounded::<Payload>();

let transport = Self {
let transport = Arc::new(Self {
pending_requests: Mutex::new(HashMap::default()),
};

let transport = Arc::new(transport);
});

let _ = cx.update(|cx| {
cx.spawn(|_| Self::recv(transport.clone(), server_stdout, client_tx))
cx.spawn(|_| Self::receive(transport.clone(), server_stdout, client_tx))
.detach_and_log_err(cx);
cx.spawn(|_| Self::send(transport, server_stdin, client_rx))
.detach_and_log_err(cx);

if let Some(stderr) = server_stderr {
cx.spawn(|_| Self::err(stderr)).detach();
cx.spawn(|_| Self::err(stderr)).detach_and_log_err(cx);
}
});

Expand Down Expand Up @@ -167,8 +165,6 @@ impl Transport {
server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
request: String,
) -> Result<()> {
dbg!("Request {}", &request);

server_stdin
.write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes())
.await?;
Expand All @@ -195,15 +191,11 @@ impl Transport {
) -> Result<()> {
match msg {
Payload::Response(res) => {
match self.pending_requests.lock().remove(&res.request_seq) {
Some(mut tx) => match tx.send(Self::process_response(res)).await {
Ok(_) => (),
Err(_) => (),
},
None => {
client_tx.send(Payload::Response(res)).await.log_err();
}
}
if let Some(mut tx) = self.pending_requests.lock().remove(&res.request_seq) {
tx.send(Self::process_response(res)).await?;
} else {
client_tx.send(Payload::Response(res)).await?;
};

Ok(())
}
Expand All @@ -218,7 +210,7 @@ impl Transport {
}
}

async fn recv(
async fn receive(
transport: Arc<Self>,
mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
client_tx: UnboundedSender<Payload>,
Expand Down Expand Up @@ -248,16 +240,10 @@ impl Transport {
Ok(())
}

async fn err(mut server_stderr: Box<dyn AsyncBufRead + Unpin + Send>) {
async fn err(mut server_stderr: Box<dyn AsyncBufRead + Unpin + Send>) -> Result<()> {
let mut recv_buffer = String::new();
loop {
match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await {
Ok(_) => {}
Err(err) => {
dbg!("err: <- {:?}", err);
break;
}
}
Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await?;
}
}
}

0 comments on commit 854ff68

Please sign in to comment.