diff --git a/Cargo.lock b/Cargo.lock index c8fc1143d..e7858581b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -999,7 +999,7 @@ dependencies = [ "embed-resource", "futures", "hex", - "ironrdp", + "ironrdp 0.5.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "notify-debouncer-mini", "parking_lot", "rand", @@ -1060,7 +1060,8 @@ dependencies = [ "http-body-util", "hyper 1.5.0", "hyper-util", - "ironrdp-pdu", + "ironrdp-core", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "ironrdp-rdcleanpath", "jmux-proxy", "multibase", @@ -1231,16 +1232,25 @@ name = "devolutions-session" version = "2024.3.4" dependencies = [ "anyhow", + "async-trait", "camino", "cfg-if", "ctrlc", + "devolutions-gateway-task", "devolutions-log", "embed-resource", + "futures", + "ironrdp 0.5.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "now-proto-pdu", "parking_lot", "serde", "serde_json", "tap", + "tempfile", + "thiserror", + "tokio 1.41.0", "tracing", + "win-api-wrappers", "windows 0.58.0", ] @@ -2298,15 +2308,26 @@ dependencies = [ "ironrdp-server", ] +[[package]] +name = "ironrdp" +version = "0.5.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "ironrdp-connector 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-core", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-session", +] + [[package]] name = "ironrdp-acceptor" version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ "ironrdp-async", - "ironrdp-connector", - "ironrdp-pdu", - "ironrdp-svc", + "ironrdp-connector 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "tracing", ] @@ -2316,8 +2337,8 @@ version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ "bitflags 2.6.0", - "ironrdp-dvc", - "ironrdp-pdu", + "ironrdp-dvc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "num-derive", "num-traits", ] @@ -2328,8 +2349,8 @@ version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ "bytes 1.8.0", - "ironrdp-connector", - "ironrdp-pdu", + "ironrdp-connector 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "tracing", ] @@ -2339,8 +2360,8 @@ version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ "bitflags 2.6.0", - "ironrdp-pdu", - "ironrdp-svc", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "thiserror", "tracing", ] @@ -2350,24 +2371,63 @@ name = "ironrdp-connector" version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ - "ironrdp-error", - "ironrdp-pdu", - "ironrdp-svc", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "rand_core", + "sspi 0.11.1", + "tracing", + "url", + "winapi", +] + +[[package]] +name = "ironrdp-connector" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "ironrdp-core", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "picky", + "picky-asn1-der 0.5.0", + "picky-asn1-x509 0.13.0", "rand_core", - "sspi", + "sspi 0.13.0", "tracing", "url", "winapi", ] +[[package]] +name = "ironrdp-core" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", +] + [[package]] name = "ironrdp-displaycontrol" version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ - "ironrdp-dvc", - "ironrdp-pdu", - "ironrdp-svc", + "ironrdp-dvc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "tracing", +] + +[[package]] +name = "ironrdp-displaycontrol" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "ironrdp-core", + "ironrdp-dvc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "tracing", ] @@ -2376,8 +2436,20 @@ name = "ironrdp-dvc" version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ - "ironrdp-pdu", - "ironrdp-svc", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "slab", + "tracing", +] + +[[package]] +name = "ironrdp-dvc" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "ironrdp-core", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "slab", "tracing", ] @@ -2387,6 +2459,11 @@ name = "ironrdp-error" version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" +[[package]] +name = "ironrdp-error" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" + [[package]] name = "ironrdp-graphics" version = "0.1.0" @@ -2396,8 +2473,26 @@ dependencies = [ "bitflags 2.6.0", "bitvec", "byteorder", - "ironrdp-error", - "ironrdp-pdu", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "lazy_static", + "num-derive", + "num-traits", + "thiserror", +] + +[[package]] +name = "ironrdp-graphics" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "bit_field", + "bitflags 2.6.0", + "bitvec", + "byteorder", + "ironrdp-core", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "lazy_static", "num-derive", "num-traits", @@ -2413,7 +2508,30 @@ dependencies = [ "bitflags 2.6.0", "byteorder", "der-parser", - "ironrdp-error", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "md-5", + "num-bigint", + "num-derive", + "num-integer", + "num-traits", + "pkcs1", + "sha1", + "tap", + "thiserror", + "x509-cert", +] + +[[package]] +name = "ironrdp-pdu" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "bit_field", + "bitflags 2.6.0", + "byteorder", + "der-parser", + "ironrdp-core", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "md-5", "num-bigint", "num-derive", @@ -2429,7 +2547,7 @@ dependencies = [ [[package]] name = "ironrdp-rdcleanpath" version = "0.1.0" -source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" dependencies = [ "der", ] @@ -2440,8 +2558,8 @@ version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ "bitflags 2.6.0", - "ironrdp-pdu", - "ironrdp-svc", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "tracing", ] @@ -2455,25 +2573,51 @@ dependencies = [ "ironrdp-acceptor", "ironrdp-ainput", "ironrdp-cliprdr", - "ironrdp-displaycontrol", - "ironrdp-dvc", - "ironrdp-graphics", - "ironrdp-pdu", + "ironrdp-displaycontrol 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-dvc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-graphics 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "ironrdp-rdpsnd", - "ironrdp-svc", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "ironrdp-tokio", "tokio 1.41.0", "tokio-rustls", "tracing", ] +[[package]] +name = "ironrdp-session" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "ironrdp-connector 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-core", + "ironrdp-displaycontrol 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-dvc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-graphics 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "tracing", +] + [[package]] name = "ironrdp-svc" version = "0.1.0" source = "git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861#2e1a9ac88e38e7d92d893007bc25d0a05c365861" dependencies = [ "bitflags 2.6.0", - "ironrdp-pdu", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", +] + +[[package]] +name = "ironrdp-svc" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "bitflags 2.6.0", + "ironrdp-core", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", ] [[package]] @@ -3147,6 +3291,17 @@ dependencies = [ "notify", ] +[[package]] +name = "now-proto-pdu" +version = "0.1.0" +source = "git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf#7c268d863048d0a9182b3f7bf778668de8db4ccf" +dependencies = [ + "bitflags 2.6.0", + "ironrdp-core", + "ironrdp-error 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", + "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -3461,6 +3616,7 @@ dependencies = [ "ed25519-dalek", "hex", "hmac", + "http 1.1.0", "md-5", "num-bigint-dig", "p256", @@ -3505,6 +3661,7 @@ dependencies = [ "oid", "serde", "serde_bytes", + "time", "zeroize", ] @@ -3556,6 +3713,7 @@ dependencies = [ "picky-asn1 0.9.0", "picky-asn1-der 0.5.0", "serde", + "widestring 1.1.0", "zeroize", ] @@ -4260,6 +4418,7 @@ dependencies = [ "pkcs1", "pkcs8", "rand_core", + "sha1", "signature", "spki", "subtle", @@ -4900,6 +5059,50 @@ dependencies = [ "zeroize", ] +[[package]] +name = "sspi" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1734839082c6d33f8368b40a40d0d5cc1ed10fbde00dc8a404f64e70272ed3f6" +dependencies = [ + "async-dnssd", + "async-recursion", + "bitflags 2.6.0", + "byteorder", + "cfg-if", + "crypto-mac", + "futures", + "hmac", + "lazy_static", + "md-5", + "md4", + "num-bigint-dig", + "num-derive", + "num-traits", + "oid", + "picky", + "picky-asn1 0.9.0", + "picky-asn1-der 0.5.0", + "picky-asn1-x509 0.13.0", + "picky-krb 0.9.0", + "rand", + "rsa", + "rustls 0.23.15", + "serde", + "serde_derive", + "sha1", + "sha2", + "time", + "tokio 1.41.0", + "tracing", + "url", + "uuid", + "windows 0.58.0", + "windows-sys 0.52.0", + "winreg 0.52.0", + "zeroize", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -5960,6 +6163,7 @@ dependencies = [ "base16ct", "thiserror", "tracing", + "uuid", "windows 0.58.0", ] diff --git a/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/default_api.rs b/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/default_api.rs index 0c96f0c4d..84ab00e2e 100644 --- a/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/default_api.rs +++ b/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/default_api.rs @@ -17,8 +17,7 @@ use std::rc::Rc; use futures::Future; use hyper; -use super::request as __internal_request; -use super::{configuration, Error}; +use super::{configuration, request as __internal_request, Error}; use crate::models; pub struct DefaultApiClient diff --git a/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/mod.rs b/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/mod.rs index e3cddab41..e907d7d10 100644 --- a/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/mod.rs +++ b/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/mod.rs @@ -1,6 +1,4 @@ -use http; -use hyper; -use serde_json; +use {http, hyper, serde_json}; #[derive(Debug)] pub enum Error { diff --git a/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/request.rs b/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/request.rs index b511abd40..44d52c91b 100644 --- a/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/request.rs +++ b/crates/devolutions-pedm-shared/devolutions-pedm-client-http/src/apis/request.rs @@ -1,13 +1,10 @@ use std::collections::HashMap; use std::pin::Pin; -use futures; use futures::future::*; use futures::Future; -use hyper; use hyper::header::{HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT}; -use serde; -use serde_json; +use {futures, hyper, serde, serde_json}; use super::{configuration, Error}; diff --git a/crates/win-api-wrappers/Cargo.toml b/crates/win-api-wrappers/Cargo.toml index 98afc9a62..b05c45529 100644 --- a/crates/win-api-wrappers/Cargo.toml +++ b/crates/win-api-wrappers/Cargo.toml @@ -12,6 +12,7 @@ base16ct = { version = "0.2", features = ["alloc"] } anyhow = "1.0" thiserror = "1.0" tracing = "0.1" +uuid = { version = "1.10", features = ["v4"] } [dependencies.windows] version = "0.58.0" diff --git a/crates/win-api-wrappers/src/event.rs b/crates/win-api-wrappers/src/event.rs new file mode 100644 index 000000000..83cff9b61 --- /dev/null +++ b/crates/win-api-wrappers/src/event.rs @@ -0,0 +1,38 @@ +use std::sync::Arc; + +use windows::Win32::Foundation::HANDLE; +use windows::Win32::System::Threading::{CreateEventW, SetEvent}; + +use crate::handle::Handle; + +/// RAII wrapper for WinAPI event handle. +#[derive(Debug, Clone)] +pub struct Event { + handle: Arc, +} + +impl Event { + pub fn new_unnamed() -> anyhow::Result { + // SAFETY: No preconditions. + let raw_handle = unsafe { CreateEventW(None, false, false, None) }?; + + // SAFETY: `CreateEventW` always returns a valid handle on success. + let handle = unsafe { Handle::new_owned(raw_handle) }?; + + Ok(Self { + handle: Arc::new(handle), + }) + } + + pub fn raw(&self) -> HANDLE { + self.handle.raw() + } + + pub fn set(&self) -> anyhow::Result<()> { + // SAFETY: No preconditions. + unsafe { + SetEvent(self.handle.raw())?; + } + Ok(()) + } +} diff --git a/crates/win-api-wrappers/src/lib.rs b/crates/win-api-wrappers/src/lib.rs index a0e44bbe5..cee96c73b 100644 --- a/crates/win-api-wrappers/src/lib.rs +++ b/crates/win-api-wrappers/src/lib.rs @@ -7,6 +7,7 @@ mod lib_win { mod error; pub use error::Error; + pub mod event; pub mod handle; pub mod identity; pub mod process; @@ -15,6 +16,7 @@ mod lib_win { pub mod thread; pub mod token; pub mod utils; + pub mod wts; // Allowed since the goal is to replicate the Windows API crate so that it's familiar, which itself uses the raw names from the API. #[allow( diff --git a/crates/win-api-wrappers/src/process.rs b/crates/win-api-wrappers/src/process.rs index 483f2e14a..3351fb616 100644 --- a/crates/win-api-wrappers/src/process.rs +++ b/crates/win-api-wrappers/src/process.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::ffi::{c_void, OsString}; use std::fmt::Debug; -use std::mem::{self}; use std::os::windows::ffi::OsStringExt; use std::path::{Path, PathBuf}; use std::{ptr, slice}; @@ -30,8 +29,8 @@ use windows::Win32::System::LibraryLoader::{ GetModuleFileNameW, GetModuleHandleExW, GetProcAddress, GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS, }; use windows::Win32::System::Threading::{ - CreateProcessAsUserW, CreateRemoteThread, GetCurrentProcess, GetExitCodeProcess, OpenProcess, OpenProcessToken, - QueryFullProcessImageNameW, TerminateProcess, WaitForSingleObject, CREATE_UNICODE_ENVIRONMENT, + CreateProcessAsUserW, CreateRemoteThread, GetCurrentProcess, GetExitCodeProcess, GetProcessId, OpenProcess, + OpenProcessToken, QueryFullProcessImageNameW, TerminateProcess, WaitForSingleObject, CREATE_UNICODE_ENVIRONMENT, EXTENDED_STARTUPINFO_PRESENT, INFINITE, LPPROC_THREAD_ATTRIBUTE_LIST, LPTHREAD_START_ROUTINE, PEB, PROCESS_ACCESS_RIGHTS, PROCESS_BASIC_INFORMATION, PROCESS_CREATION_FLAGS, PROCESS_INFORMATION, PROCESS_NAME_WIN32, PROCESS_TERMINATE, STARTUPINFOEXW, STARTUPINFOW, STARTUPINFOW_FLAGS, @@ -122,7 +121,7 @@ impl Process { let thread = self.create_thread( // SAFETY: `LoadLibraryW` fits the type. It takes one argument that is the name of the library. Some(unsafe { - mem::transmute::<*const c_void, unsafe extern "system" fn(*mut c_void) -> u32>(load_library) + core::mem::transmute::<*const c_void, unsafe extern "system" fn(*mut c_void) -> u32>(load_library) }), Some(allocation.address), )?; @@ -152,7 +151,7 @@ impl Process { )? }; - // SAFETY: The handle is owned by us, we opened the ressource above. + // SAFETY: The handle is owned by us, we opened the resource above. let handle = unsafe { Handle::new_owned(handle) }?; Ok(Thread::from(handle)) @@ -242,6 +241,11 @@ impl Process { }) } + pub fn pid(&self) -> u32 { + // SAFETY: Safe to call with a valid process handle. + unsafe { GetProcessId(self.handle.raw()) } + } + /// Reads process memory at a specified address into a buffer. /// The buffer is not read. /// Returns the number of bytes read. @@ -266,13 +270,13 @@ impl Process { Ok(bytes_read) } - /// Reads a stucture from process memory at a specified address. + /// Reads a structure from process memory at a specified address. /// /// # Safety /// /// - `address` must point to a valid and correctly sized instance of the structure. pub unsafe fn read_struct(&self, address: *const c_void) -> Result { - let mut buf = vec![0; mem::size_of::()]; + let mut buf = vec![0; size_of::()]; // SAFETY: Based on the security requirements of the function, the `address` should // point to a valid and correctly sized instance of `T`. @@ -286,7 +290,7 @@ impl Process { } } - /// Reads a continous array of a structure from process memory at a specified address. + /// Reads a continuous array of a structure from process memory at a specified address. /// /// # Safety /// @@ -301,10 +305,10 @@ impl Process { // However, we assume that the data will be alined as `Vec` wants. let data = unsafe { data.align_to_mut::().1 }; - // SAFETY: `read_memory` does not read `data`, so we can safely pass an unitialized buffer. + // SAFETY: `read_memory` does not read `data`, so we can safely pass an uninitialized buffer. let read_bytes = unsafe { self.read_memory(address.cast(), data) }?; - if count * mem::size_of::() == read_bytes { + if count * size_of::() == read_bytes { // SAFETY: Buffer can hold `count` items and was filled up to that point. unsafe { buf.set_len(count) }; @@ -380,7 +384,7 @@ impl Peb<'_> { let image_path_name = unsafe { self.process.read_array( raw_params.ImagePathName.Buffer.as_ptr(), - raw_params.ImagePathName.Length as usize / mem::size_of::(), + raw_params.ImagePathName.Length as usize / size_of::(), )? }; @@ -388,7 +392,7 @@ impl Peb<'_> { let command_line = unsafe { self.process.read_array( raw_params.CommandLine.Buffer.as_ptr(), - raw_params.CommandLine.Length as usize / mem::size_of::(), + raw_params.CommandLine.Length as usize / size_of::(), )? }; @@ -396,7 +400,7 @@ impl Peb<'_> { let desktop = unsafe { self.process.read_array( raw_params.DesktopInfo.Buffer.as_ptr(), - raw_params.DesktopInfo.Length as usize / mem::size_of::(), + raw_params.DesktopInfo.Length as usize / size_of::(), )? }; @@ -404,7 +408,7 @@ impl Peb<'_> { let working_directory = unsafe { self.process.read_array( raw_params.CurrentDirectory.DosPath.Buffer.as_ptr(), - raw_params.CurrentDirectory.DosPath.Length as usize / mem::size_of::(), + raw_params.CurrentDirectory.DosPath.Length as usize / size_of::(), )? }; @@ -579,8 +583,8 @@ impl ProcessEntry32Iterator { }; // SAFETY: It is safe to zero out the structure as it is a simple POD type. - let mut process_entry: PROCESSENTRY32W = unsafe { mem::zeroed() }; - process_entry.dwSize = mem::size_of::() + let mut process_entry: PROCESSENTRY32W = unsafe { core::mem::zeroed() }; + process_entry.dwSize = size_of::() .try_into() .expect("BUG: PROCESSENTRY32W size always fits in u32"); @@ -700,7 +704,7 @@ pub fn create_process_as_user( // SAFETY: As per `CreateEnvironmentBlock` documentation: We must specify // `CREATE_UNICODE_ENVIRONMENT` and call `DestroyEnvironmentBlock` after // `CreateProcessAsUser` call. - // - `CREATE_UNICODE_ENVIRONMENT` is always set unconditionaly. + // - `CREATE_UNICODE_ENVIRONMENT` is always set unconditionally. // - `DestroyEnvironmentBlock` is called in the `ProcessEnvironment` destructor. // // Therefore, all preconditions are met to safely call `CreateEnvironmentBlock`. @@ -742,10 +746,10 @@ pub fn create_process_as_user( ) }?; - // SAFETY: The handle is owned by us, we opened the ressource above. + // SAFETY: The handle is owned by us, we opened the resource above. let process = unsafe { Handle::new_owned(raw_process_information.hProcess).map(Process::from)? }; - // SAFETY: The handle is owned by us, we opened the ressource above. + // SAFETY: The handle is owned by us, we opened the resource above. let thread = unsafe { Handle::new_owned(raw_process_information.hThread).map(Thread::from)? }; Ok(ProcessInformation { diff --git a/crates/win-api-wrappers/src/rpc.rs b/crates/win-api-wrappers/src/rpc.rs index 36d4e55be..08e07b10f 100644 --- a/crates/win-api-wrappers/src/rpc.rs +++ b/crates/win-api-wrappers/src/rpc.rs @@ -1,6 +1,6 @@ use std::ffi::c_void; use std::ptr::{self, NonNull}; -use std::{mem, slice}; +use std::slice; use windows::core::GUID; use windows::Win32::Foundation::{ERROR_MORE_DATA, E_INVALIDARG}; @@ -51,9 +51,9 @@ impl RpcBindingHandle { server_principal_name.resize((attribs.ServerPrincipalNameBufferLength / 2) as usize, 0); attribs.ClientPrincipalName = client_principal_name.as_mut_ptr(); - attribs.ClientPrincipalNameBufferLength = (client_principal_name.len() * mem::size_of::()).try_into()?; + attribs.ClientPrincipalNameBufferLength = (client_principal_name.len() * size_of::()).try_into()?; attribs.ServerPrincipalName = server_principal_name.as_mut_ptr(); - attribs.ServerPrincipalNameBufferLength = (server_principal_name.len() * mem::size_of::()).try_into()?; + attribs.ServerPrincipalNameBufferLength = (server_principal_name.len() * size_of::()).try_into()?; // SAFETY: No preconditions. let status = unsafe { RpcServerInqCallAttributesW(Some(self.0), &mut attribs as *mut _ as *mut c_void) }; @@ -161,14 +161,14 @@ impl RpcServerInterfacePointer { let addr = unsafe { raw_dispatch_table.add(i) }.cast_mut(); // SAFETY: Assume the address points to a valid handler which is not currently in use. - let old_prot = unsafe { set_memory_protection(addr.cast(), mem::size_of::<*const ()>(), PAGE_READWRITE) }?; + let old_prot = unsafe { set_memory_protection(addr.cast(), size_of::<*const ()>(), PAGE_READWRITE) }?; // TODO: See if it could be possible to freeze other threads during switch or to do an atomic switch. // SAFETY: Because of previous assumption and memory protection, this should succeed. unsafe { *addr = *new_handler }; // SAFETY: Address is already assumed to be valid. - let _ = unsafe { set_memory_protection(addr.cast(), mem::size_of::<*const ()>(), old_prot) }?; + let _ = unsafe { set_memory_protection(addr.cast(), size_of::<*const ()>(), old_prot) }?; } Ok(()) diff --git a/crates/win-api-wrappers/src/security/acl.rs b/crates/win-api-wrappers/src/security/acl.rs index c021909a9..d85fbf6cb 100644 --- a/crates/win-api-wrappers/src/security/acl.rs +++ b/crates/win-api-wrappers/src/security/acl.rs @@ -1,5 +1,4 @@ use std::alloc::Layout; -use std::mem::{self}; use std::{ptr, slice}; use anyhow::{bail, Result}; @@ -83,7 +82,7 @@ impl Ace { }; // SAFETY: We are adding to the pointer in byte aligned mode to access next field. - ptr = unsafe { ptr.byte_add(mem::size_of::()) }; + ptr = unsafe { ptr.byte_add(size_of::()) }; #[allow(clippy::cast_ptr_alignment)] // FIXME(DGW-221): Raw* hack is flawed. // SAFETY: Buffer is at least `size_of:: + size_of::` big. @@ -92,7 +91,7 @@ impl Ace { }; // SAFETY: We are adding to the pointer in byte aligned mode to access next field. - ptr = unsafe { ptr.byte_add(mem::size_of::()) }; + ptr = unsafe { ptr.byte_add(size_of::()) }; // SAFETY: Buffer is at least `size_of:: + size_of:: + body.len()` big. unsafe { ptr.copy_from(body.as_ptr(), body.len()) }; @@ -109,21 +108,21 @@ impl Ace { // SAFETY: Assume that the pointer points to a valid ACE_HEADER if not null. let header = unsafe { ptr.as_ref() }.ok_or_else(|| Error::NullPointer("ACE header"))?; - if (header.AceSize as usize) < mem::size_of::() + mem::size_of::() { + if (header.AceSize as usize) < size_of::() + size_of::() { bail!(Error::from_win32(ERROR_INVALID_DATA)); } // SAFETY: Assume that the header is followed by a 4 byte access mask. - ptr = unsafe { ptr.byte_add(mem::size_of::()) }; + ptr = unsafe { ptr.byte_add(size_of::()) }; // SAFETY: Assume that the header is followed by a 4 byte access mask. #[allow(clippy::cast_ptr_alignment)] // FIXME(DGW-221): Raw* hack is flawed. let access_mask = unsafe { ptr.cast::().read() }; // SAFETY: Assume buffer is big enough to fit Ace data. - ptr = unsafe { ptr.byte_add(mem::size_of::()) }; + ptr = unsafe { ptr.byte_add(size_of::()) }; - let body_size = header.AceSize as usize - mem::size_of::() - mem::size_of::(); + let body_size = header.AceSize as usize - size_of::() - size_of::(); // SAFETY: `body_size` must be >= 0 because of previous check. Pointer is valid. let body = unsafe { slice::from_raw_parts(ptr.cast::(), body_size) }; @@ -158,10 +157,10 @@ impl Acl { pub fn to_raw(&self) -> Result> { let raw_aces = self.aces.iter().map(Ace::to_raw).collect::>>()?; - let size = mem::size_of::() + raw_aces.iter().map(Vec::len).sum::(); + let size = size_of::() + raw_aces.iter().map(Vec::len).sum::(); // Align on u32 boundary - let size = (size + mem::size_of::() - 1) & !3; + let size = (size + size_of::() - 1) & !3; let mut buf = vec![0; size]; diff --git a/crates/win-api-wrappers/src/thread.rs b/crates/win-api-wrappers/src/thread.rs index cae17f537..430af9b3b 100644 --- a/crates/win-api-wrappers/src/thread.rs +++ b/crates/win-api-wrappers/src/thread.rs @@ -1,6 +1,5 @@ use std::ffi::c_void; use std::fmt::Debug; -use std::mem::{self}; use anyhow::{bail, Result}; @@ -32,7 +31,7 @@ impl Thread { pub fn get_by_id(id: u32, desired_access: THREAD_ACCESS_RIGHTS) -> Result { // SAFETY: No preconditions. let handle = unsafe { OpenThread(desired_access, false, id)? }; - // SAFETY: The handle is owned by us, we opened the ressource above. + // SAFETY: The handle is owned by us, we opened the resource above. let handle = unsafe { Handle::new_owned(handle)? }; Ok(Self::from(handle)) @@ -172,9 +171,9 @@ impl ThreadAttributeType<'_> { pub fn size(&self) -> usize { match self { - ThreadAttributeType::ParentProcess(_) => mem::size_of::(), - ThreadAttributeType::ExtendedFlags(_) => mem::size_of::(), - ThreadAttributeType::HandleList(h) => mem::size_of::() * h.len(), + ThreadAttributeType::ParentProcess(_) => size_of::(), + ThreadAttributeType::ExtendedFlags(_) => size_of::(), + ThreadAttributeType::HandleList(h) => size_of::() * h.len(), } } } diff --git a/crates/win-api-wrappers/src/utils.rs b/crates/win-api-wrappers/src/utils.rs index 2f190a930..bf36bd4f4 100644 --- a/crates/win-api-wrappers/src/utils.rs +++ b/crates/win-api-wrappers/src/utils.rs @@ -2,26 +2,26 @@ use std::collections::HashMap; use std::ffi::{c_void, OsStr, OsString}; use std::fmt::Debug; use std::io::{Read, Write}; -use std::mem::{self, MaybeUninit}; +use std::mem::MaybeUninit; use std::os::windows::ffi::{OsStrExt, OsStringExt}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::{ptr, slice}; use anyhow::{bail, Result}; - -use crate::handle::{Handle, HandleWrapper}; -use crate::process::Process; -use crate::security::acl::{RawSecurityAttributes, SecurityAttributes}; -use crate::thread::Thread; -use crate::token::Token; -use crate::Error; +use uuid::Uuid; use windows::core::{Interface, PCSTR, PCWSTR, PSTR, PWSTR}; -use windows::Win32::Foundation::{LocalFree, E_INVALIDARG, E_POINTER, HANDLE, HLOCAL, MAX_PATH, UNICODE_STRING}; +use windows::Win32::Foundation::{ + GetLastError, LocalFree, SetHandleInformation, E_INVALIDARG, E_POINTER, GENERIC_WRITE, HANDLE, HANDLE_FLAGS, + HANDLE_FLAG_INHERIT, HLOCAL, MAX_PATH, UNICODE_STRING, +}; use windows::Win32::Security::{ RevertToSelf, SecurityIdentification, TokenPrimary, TOKEN_ACCESS_MASK, TOKEN_ALL_ACCESS, }; -use windows::Win32::Storage::FileSystem::{CreateDirectoryW, FlushFileBuffers, ReadFile, WriteFile}; +use windows::Win32::Storage::FileSystem::{ + CreateDirectoryW, CreateFileW, FlushFileBuffers, ReadFile, WriteFile, FILE_FLAGS_AND_ATTRIBUTES, + FILE_FLAG_OVERLAPPED, FILE_SHARE_NONE, OPEN_EXISTING, PIPE_ACCESS_INBOUND, +}; use windows::Win32::System::Com::{ CoCreateInstance, CoInitializeEx, CoUninitialize, IPersistFile, CLSCTX_INPROC_SERVER, COINIT, COINIT_MULTITHREADED, STGM_READ, @@ -35,11 +35,19 @@ use windows::Win32::System::Memory::{ PAGE_READWRITE, }; use windows::Win32::System::Pipes::{ - CreatePipe, GetNamedPipeClientProcessId, ImpersonateNamedPipeClient, PeekNamedPipe, + CreateNamedPipeW, CreatePipe, GetNamedPipeClientProcessId, ImpersonateNamedPipeClient, PeekNamedPipe, + PIPE_READMODE_BYTE, PIPE_TYPE_BYTE, PIPE_WAIT, }; use windows::Win32::UI::Controls::INFOTIPSIZE; use windows::Win32::UI::Shell::{CommandLineToArgvW, IShellLinkW, ShellLink, SLGP_SHORTPATH, SLR_NO_UI}; +use crate::handle::{Handle, HandleWrapper}; +use crate::process::Process; +use crate::security::acl::{RawSecurityAttributes, SecurityAttributes}; +use crate::thread::Thread; +use crate::token::Token; +use crate::Error; + pub trait SafeWindowsString { fn to_string_safe(&self) -> Result; fn to_os_string_safe(&self) -> Result; @@ -150,13 +158,13 @@ impl WideString { .0 .as_ref() .and_then(|x| x.split_last()) - .map(|x| mem::size_of_val(x.1)) + .map(|x| size_of_val(x.1)) .unwrap_or(0) .try_into()?, MaximumLength: self .0 .as_ref() - .map(|x| mem::size_of_val(x.as_slice())) + .map(|x| size_of_val(x.as_slice())) .unwrap_or(0) .try_into()?, Buffer: PWSTR(self.as_pcwstr().0.cast_mut()), @@ -403,7 +411,7 @@ pub fn environment_block(token: Option<&Token>, inherit: bool) -> Result) -> String { let mut expanded = String::with_capacity(src.len()); - // For strings such as "%MyVar%MyVar%", only the first occurence should be replaced. + // For strings such as "%MyVar%MyVar%", only the first occurrence should be replaced. let mut last_replaced = false; let mut it = src.split('%').peekable(); @@ -628,15 +636,105 @@ impl Pipe { ) }?; - // SAFETY: We created the ressource above and are thus owning it. + // SAFETY: We created the resource above and are thus owning it. let rx = unsafe { Handle::new_owned(rx)? }; - // SAFETY: We created the ressource above and are thus owning it. + // SAFETY: We created the resource above and are thus owning it. let tx = unsafe { Handle::new_owned(tx)? }; Ok((Self { handle: rx }, Self { handle: tx })) } + /// Creates anonymous synchronous pipe for stdin. + pub fn new_sync_stdin_redirection_pipe() -> Result<(Self, Self)> { + let security_attributes = SecurityAttributes { + security_descriptor: None, + inherit_handle: true, + }; + + let (read, write) = Self::new_anonymous(Some(&security_attributes), 0)?; + + // SAFETY: Handle is ensured to be valid by the code above. + unsafe { + // Ensure the write handle to the pipe for STDIN is not inherited. + SetHandleInformation(write.handle.raw(), HANDLE_FLAG_INHERIT.0, HANDLE_FLAGS(0))?; + } + + Ok((read, write)) + } + + /// Create a new async(overlapped io) pipe for stdout/stderr redirection. + /// + /// NOTE: This method creates a **named** pipe with a random generated name. Named pipe is + /// required for async io, as anonymous pipes do not support async io. + pub fn new_async_stdout_redirection_pipe() -> Result<(Self, Self)> { + const PIPE_INSTANCES: u32 = 1; + const PIPE_BUFFER_SIZE_HINT: u32 = 4 * 1024; + const PIPE_PREFIX: &str = r"\\.\pipe\devolutions"; + + // Example pipe name: `\\.\pipe\devolutions-75993146-80c5-4c93-a2ea-1d5d5cd5de4a`. + let pipe_id = Uuid::new_v4().to_string(); + let pipe_name_str = format!("{PIPE_PREFIX}-{pipe_id}"); + let pipe_name = WideString::from(&pipe_name_str); + + // SAFETY: No preconditions. We are creating a named pipe with a random name. + let read_endpoint = unsafe { + CreateNamedPipeW( + pipe_name.as_pcwstr(), + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_INSTANCES, + PIPE_BUFFER_SIZE_HINT, + 0, + 0, + None, + ) + }; + + // SAFETY: We created the resource above and are thus owning it. + let handle = unsafe { Handle::new_owned(read_endpoint) }?; + + // For some reason, `windows` crate do not return `Result` here, we should check for invalid + // handle manually. + let read = if !read_endpoint.is_invalid() { + // Take ownership + Pipe { handle } + } else { + // SAFETY: No preconditions. + let error = unsafe { GetLastError() }; + return Err(anyhow::anyhow!( + "Failed to create named pipe `{pipe_name_str}`; Error: {error:?}" + )); + }; + + let security_attributes = RawSecurityAttributes::try_from(&SecurityAttributes { + security_descriptor: None, + inherit_handle: true, + })?; + + // SAFETY: Pipe is created above and is valid. + let write_endpoint = unsafe { + CreateFileW( + pipe_name.as_pcwstr(), + GENERIC_WRITE.0, + FILE_SHARE_NONE, + Some(security_attributes.as_raw() as *const _), + OPEN_EXISTING, + // Note that we are not setting FILE_FLAG_OVERLAPPED here, as we are not expecting async + // writes from target process stdout/stderr. + FILE_FLAGS_AND_ATTRIBUTES(0), + HANDLE::default(), + ) + }?; + + // SAFETY: We created the resource above and are thus owning it. + let handle = unsafe { Handle::new_owned(write_endpoint) }?; + + let write = Pipe { handle }; + + Ok((read, write)) + } + /// Peeks the contents of the pipe in `data`, while returning the amount of bytes available on the pipe. pub fn peek(&self, data: Option<&mut [u8]>) -> Result { let mut available = 0; @@ -789,5 +887,5 @@ pub fn nul_slice_wide_str(slice: &[u16]) -> &[u16] { /// Typically fine since we rarely work with structs whose size in memory is bigger than u32::MAX. #[expect(clippy::cast_possible_truncation)] pub(crate) const fn u32size_of() -> u32 { - mem::size_of::() as u32 + size_of::() as u32 } diff --git a/crates/win-api-wrappers/src/wts.rs b/crates/win-api-wrappers/src/wts.rs new file mode 100644 index 000000000..265745333 --- /dev/null +++ b/crates/win-api-wrappers/src/wts.rs @@ -0,0 +1,126 @@ +use windows::core::Owned; +use windows::Win32::Foundation::{DuplicateHandle, DUPLICATE_SAME_ACCESS, HANDLE}; +use windows::Win32::System::RemoteDesktop::{ + WTSFreeMemory, WTSVirtualChannelClose, WTSVirtualChannelOpenEx, WTSVirtualChannelQuery, WTSVirtualFileHandle, + WTS_CHANNEL_OPTION_DYNAMIC, WTS_CURRENT_SESSION, +}; +use windows::Win32::System::Threading::GetCurrentProcess; + +use crate::utils::AnsiString; + +/// RAII wrapper for WTS virtual channel handle. +pub struct WTSVirtualChannel(HANDLE); + +impl WTSVirtualChannel { + /// # Safety + /// `handle` must be a valid handle returned from `WTSVirtualChannelOpenEx`. + pub unsafe fn new(handle: HANDLE) -> Self { + Self(handle) + } + + pub fn open_dvc(name: &str) -> anyhow::Result { + let channel_name = AnsiString::from(name); + + // SAFETY: Channel name is always a valid pointer to a null-terminated string. + let raw_wts_handle = unsafe { + WTSVirtualChannelOpenEx(WTS_CURRENT_SESSION, channel_name.as_pcstr(), WTS_CHANNEL_OPTION_DYNAMIC) + }?; + + // SAFETY: `WTSVirtualChannelOpenEx` always returns a valid handle on success. + Ok(unsafe { Self::new(raw_wts_handle) }) + } + + pub fn query_file_handle(&self) -> anyhow::Result> { + let mut channel_file_handle_ptr: *mut core::ffi::c_void = std::ptr::null_mut(); + let mut len: u32 = 0; + + // SAFETY: It is safe to call `WTSVirtualChannelQuery` with valid channel and + // destination pointers. + unsafe { + WTSVirtualChannelQuery( + self.0, + WTSVirtualFileHandle, + &mut channel_file_handle_ptr as *mut _, + &mut len, + ) + }?; + + // SAFETY: `channel_file_handle_ptr` is always a valid pointer to a handle on success. + let channel_file_handle_ptr = unsafe { WTSMemory::new(channel_file_handle_ptr) }; + + if len != u32::try_from(size_of::()).expect("HANDLE always fits into u32") { + return Err(anyhow::anyhow!("Failed to query DVC channel file handle")); + } + + let mut raw_handle = HANDLE::default(); + + // SAFETY: `GetCurrentProcess` is always safe to call. + let current_process = unsafe { GetCurrentProcess() }; + + // SAFETY: `lptargetprocesshandle` is valid and points to `raw_handle` declared above, + // therefore it is safe to call. + unsafe { + DuplicateHandle( + current_process, + channel_file_handle_ptr.as_handle(), + current_process, + &mut raw_handle, + 0, + false, + DUPLICATE_SAME_ACCESS, + )?; + }; + + // SAFETY: Handle returned from `DuplicateHandle` is always valid if the function succeeds. + let owned_handle = unsafe { Owned::new(raw_handle) }; + + Ok(owned_handle) + } +} + +impl Drop for WTSVirtualChannel { + fn drop(&mut self) { + // SAFETY: `Ok` value returned from `WTSVirtualChannelOpenEx` is always a valid handle. + if let Err(error) = unsafe { WTSVirtualChannelClose(self.0) } { + error!(%error, "Failed to close WTS virtual channel handle"); + } + } +} + +/// RAII wrapper for WTS memory handle. +struct WTSMemory(*mut core::ffi::c_void); + +impl WTSMemory { + /// # Safety + /// `ptr` must be a valid pointer to a handle returned from `WTSVirtualChannelQuery`. + unsafe fn new(ptr: *mut core::ffi::c_void) -> Self { + Self(ptr) + } + + fn as_handle(&self) -> HANDLE { + if self.0.is_null() { + return HANDLE::default(); + } + + // SAFETY: `self.0` is always a valid pointer to a handle if constructed properly, + // therefore it is safe to dereference it. + HANDLE(unsafe { *(self.0 as *mut *mut std::ffi::c_void) }) + } +} + +impl Drop for WTSMemory { + fn drop(&mut self) { + if self.0.is_null() { + return; + } + + // SAFETY: No preconditions. + unsafe { WTSFreeMemory(self.0) } + } +} + +impl Default for WTSMemory { + fn default() -> Self { + Self(std::ptr::null_mut()) + } +} diff --git a/devolutions-agent/src/updater/package.rs b/devolutions-agent/src/updater/package.rs index 07aa7e8dd..1b54067db 100644 --- a/devolutions-agent/src/updater/package.rs +++ b/devolutions-agent/src/updater/package.rs @@ -88,7 +88,7 @@ fn ensure_enough_rights() -> Result<(), UpdaterError> { *token_handle, TokenElevation, Some(&mut token_elevation as *mut _ as _), - std::mem::size_of::().try_into().unwrap(), + size_of::().try_into().unwrap(), &mut return_size as _, ) }; diff --git a/devolutions-gateway/Cargo.toml b/devolutions-gateway/Cargo.toml index fd7e559b0..e62a2b596 100644 --- a/devolutions-gateway/Cargo.toml +++ b/devolutions-gateway/Cargo.toml @@ -24,8 +24,9 @@ jmux-proxy = { path = "../crates/jmux-proxy" } devolutions-agent-shared = { path = "../crates/devolutions-agent-shared" } devolutions-gateway-task = { path = "../crates/devolutions-gateway-task" } devolutions-log = { path = "../crates/devolutions-log" } -ironrdp-pdu = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "2e1a9ac88e38e7d92d893007bc25d0a05c365861", features = ["std"] } -ironrdp-rdcleanpath = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "2e1a9ac88e38e7d92d893007bc25d0a05c365861" } +ironrdp-pdu = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["std"] } +ironrdp-core = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["std"] } +ironrdp-rdcleanpath = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf" } ceviche = "0.6.1" picky-krb = "0.9" network-scanner = { version = "0.0.0", path = "../crates/network-scanner" } diff --git a/devolutions-gateway/src/api/fwd.rs b/devolutions-gateway/src/api/fwd.rs index ce4f3300c..39bc1fda2 100644 --- a/devolutions-gateway/src/api/fwd.rs +++ b/devolutions-gateway/src/api/fwd.rs @@ -263,8 +263,7 @@ async fn fwd_http( mut request: axum::http::Request, ) -> Result { use axum::extract::FromRequestParts as _; // from_request_parts - use axum::http::header; - use axum::http::Response; + use axum::http::{header, Response}; use core::str::FromStr; use http_body_util::BodyExt as _; // into_data_stream use std::sync::LazyLock; diff --git a/devolutions-gateway/src/rdp_pcb.rs b/devolutions-gateway/src/rdp_pcb.rs index 370ce619d..da0627128 100644 --- a/devolutions-gateway/src/rdp_pcb.rs +++ b/devolutions-gateway/src/rdp_pcb.rs @@ -51,11 +51,11 @@ pub fn extract_association_claims( } fn decode_pcb(buf: &[u8]) -> Result, io::Error> { - let mut cursor = ironrdp_pdu::cursor::ReadCursor::new(buf); + let mut cursor = ironrdp_core::ReadCursor::new(buf); - match ironrdp_pdu::decode_cursor::(&mut cursor) { + match ironrdp_core::decode_cursor::(&mut cursor) { Ok(pcb) => { - let pdu_size = ironrdp_pdu::size(&pcb); + let pdu_size = ironrdp_core::size(&pcb); let read_len = cursor.pos(); // NOTE: sanity check (reporting the wrong number will corrupt the communication) @@ -68,7 +68,7 @@ fn decode_pcb(buf: &[u8]) -> Result, io::Erro Ok(Some((pcb, read_len))) } - Err(e) if matches!(e.kind, ironrdp_pdu::PduErrorKind::NotEnoughBytes { .. }) => Ok(None), + Err(e) if matches!(e.kind, ironrdp_core::DecodeErrorKind::NotEnoughBytes { .. }) => Ok(None), Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), } } diff --git a/devolutions-session/Cargo.toml b/devolutions-session/Cargo.toml index 6109d5252..c05b07623 100644 --- a/devolutions-session/Cargo.toml +++ b/devolutions-session/Cargo.toml @@ -8,31 +8,53 @@ description = "Session host application for Devolutions Agent" build = "build.rs" publish = false +[lints] +workspace = true + +[features] +default = ["dvc"] +dvc = [ + "dep:async-trait", + "dep:windows", + "dep:now-proto-pdu", + "dep:ironrdp", + "dep:tempfile", + "dep:thiserror", + "dep:win-api-wrappers", +] + [dependencies] anyhow = "1.0" camino = { version = "1.1", features = ["serde1"] } cfg-if = "1" ctrlc = "3.4" devolutions-log = { path = "../crates/devolutions-log" } +devolutions-gateway-task = { path = "../crates/devolutions-gateway-task" } parking_lot = "0.12" serde = "1" serde_json = "1" tap = "1.0" tracing = "0.1" +futures = "0.3" +tokio = { version = "1", features = ["macros", "sync"] } -[lints] -workspace = true +async-trait = { version = "0.1", optional = true } +now-proto-pdu = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", optional = true} +ironrdp = { version = "0.5", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["core", "pdu"], optional = true } +tempfile = { version = "3", optional = true } +thiserror = { version = "1", optional = true } +win-api-wrappers = { path = "../crates/win-api-wrappers", optional = true } [target.'cfg(windows)'.build-dependencies] embed-resource = "2.4" [target.'cfg(windows)'.dependencies.windows] version = "0.58" +optional = true features = [ "Win32_Foundation", - "Win32_Storage_FileSystem", - "Win32_System_RemoteDesktop", - "Win32_System_IO", - "Win32_System_Threading", - "Win32_Security", + "Win32_System_Shutdown", + "Win32_UI_WindowsAndMessaging", + "Win32_UI_Shell", + "Win32_System_Console", ] \ No newline at end of file diff --git a/devolutions-session/src/config.rs b/devolutions-session/src/config.rs index 3e9623139..78c620d54 100644 --- a/devolutions-session/src/config.rs +++ b/devolutions-session/src/config.rs @@ -95,7 +95,7 @@ fn save_config(conf: &dto::ConfFile) -> anyhow::Result<()> { } fn get_conf_file_path() -> Utf8PathBuf { - get_data_dir().join("agent.json") + get_data_dir().join("session.json") } fn normalize_data_path(path: &Utf8Path, data_dir: &Utf8Path) -> Utf8PathBuf { diff --git a/devolutions-session/src/dvc.rs b/devolutions-session/src/dvc.rs deleted file mode 100644 index 09e050aee..000000000 --- a/devolutions-session/src/dvc.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! WIP: This file is copied from MSRDPEX project - -use windows::core::PCSTR; -use windows::Win32::Foundation::{ - DuplicateHandle, GetLastError, DUPLICATE_SAME_ACCESS, ERROR_IO_PENDING, HANDLE, WIN32_ERROR, -}; -use windows::Win32::Storage::FileSystem::{ReadFile, WriteFile}; -use windows::Win32::System::RemoteDesktop::{ - WTSFreeMemory, WTSVirtualChannelClose, WTSVirtualChannelOpenEx, WTSVirtualChannelQuery, WTSVirtualFileHandle, - CHANNEL_FLAG_LAST, CHANNEL_PDU_HEADER, WTS_CHANNEL_OPTION_DYNAMIC, WTS_CURRENT_SESSION, WTS_VIRTUAL_CLASS, -}; -use windows::Win32::System::Threading::{CreateEventW, GetCurrentProcess, WaitForSingleObject, INFINITE}; -use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; - -const CHANNEL_PDU_LENGTH: usize = 1024; - -pub fn loop_dvc() { - info!("Starting DVC loop"); - - match open_virtual_channel("DvcSample") { - Ok(h_file) => { - info!("Virtual channel opened"); - - if let Err(error) = handle_virtual_channel(h_file) { - error!(%error, "DVC handling falied"); - } - } - Err(error) => { - error!(%error, "Failed to open virtual channel"); - // NOTE: Not exiting the program here, as it is not the main functionality - } - } - - info!("DVC loop finished"); -} - -#[allow(clippy::multiple_unsafe_ops_per_block)] -#[allow(clippy::undocumented_unsafe_blocks)] -fn open_virtual_channel(channel_name: &str) -> windows::core::Result { - unsafe { - let channel_name_wide = PCSTR::from_raw(channel_name.as_ptr()); - let h_wts_handle = WTSVirtualChannelOpenEx(WTS_CURRENT_SESSION, channel_name_wide, WTS_CHANNEL_OPTION_DYNAMIC) - .map_err(|e| std::io::Error::from_raw_os_error(e.code().0))?; - - let mut vc_file_handle_ptr: *mut HANDLE = std::ptr::null_mut(); - let mut len: u32 = 0; - let wts_virtual_class: WTS_VIRTUAL_CLASS = WTSVirtualFileHandle; - WTSVirtualChannelQuery( - h_wts_handle, - wts_virtual_class, - &mut vc_file_handle_ptr as *mut _ as *mut _, - &mut len, - ) - .map_err(|e| std::io::Error::from_raw_os_error(e.code().0))?; - - let mut new_handle: HANDLE = HANDLE::default(); - let _duplicate_result = DuplicateHandle( - GetCurrentProcess(), - *vc_file_handle_ptr, - GetCurrentProcess(), - &mut new_handle, - 0, - false, - DUPLICATE_SAME_ACCESS, - ); - - WTSFreeMemory(vc_file_handle_ptr as *mut core::ffi::c_void); - let _ = WTSVirtualChannelClose(h_wts_handle); - - Ok(new_handle) - } -} - -#[allow(clippy::multiple_unsafe_ops_per_block)] -#[allow(clippy::undocumented_unsafe_blocks)] -fn write_virtual_channel_message(h_file: HANDLE, cb_size: u32, buffer: *const u8) -> windows::core::Result<()> { - unsafe { - let buffer_slice = std::slice::from_raw_parts(buffer, cb_size as usize); - let mut dw_written: u32 = 0; - WriteFile(h_file, Some(buffer_slice), Some(&mut dw_written), None) - } -} - -#[allow(clippy::cast_possible_wrap)] -#[allow(clippy::cast_ptr_alignment)] -#[allow(clippy::ptr_offset_with_cast)] -#[allow(clippy::cast_possible_truncation)] -#[allow(clippy::multiple_unsafe_ops_per_block)] -#[allow(clippy::undocumented_unsafe_blocks)] -fn handle_virtual_channel(h_file: HANDLE) -> windows::core::Result<()> { - unsafe { - let mut read_buffer = [0u8; CHANNEL_PDU_LENGTH]; - let mut overlapped = OVERLAPPED::default(); - let mut dw_read: u32 = 0; - - let cmd = "whoami\0"; - let cb_size = cmd.len() as u32; - write_virtual_channel_message(h_file, cb_size, cmd.as_ptr())?; - - let h_event = CreateEventW(None, false, false, None)?; - overlapped.hEvent = h_event; - - loop { - // Notice the wrapping of parameters in Some() - let result = ReadFile( - h_file, - Some(&mut read_buffer), - Some(&mut dw_read), - Some(&mut overlapped), - ); - - if let Err(e) = result { - if GetLastError() == WIN32_ERROR(ERROR_IO_PENDING.0) { - let _dw_status = WaitForSingleObject(h_event, INFINITE); - if GetOverlappedResult(h_file, &overlapped, &mut dw_read, false).is_err() { - return Err(windows::core::Error::from_win32()); - } - } else { - return Err(e); - } - } - - info!("read {} bytes", dw_read); - - let packet_size = dw_read as usize - std::mem::size_of::(); - let p_data = read_buffer - .as_ptr() - .offset(std::mem::size_of::() as isize); - - info!( - ">> {}", - std::str::from_utf8(std::slice::from_raw_parts(p_data, packet_size)).unwrap_or("Invalid UTF-8") - ); - - if dw_read == 0 - || ((*(p_data.offset(-(std::mem::size_of::() as isize)) - as *const CHANNEL_PDU_HEADER)) - .flags - & CHANNEL_FLAG_LAST) - != 0 - { - break; - } - } - - Ok(()) - } -} diff --git a/devolutions-session/src/dvc/channel.rs b/devolutions-session/src/dvc/channel.rs new file mode 100644 index 000000000..5dc8936e8 --- /dev/null +++ b/devolutions-session/src/dvc/channel.rs @@ -0,0 +1,74 @@ +use std::fmt::Debug; + +use tokio::sync::mpsc::{self, Receiver, Sender}; +use win_api_wrappers::event::Event; +use windows::Win32::Foundation::HANDLE; + +const IO_CHANNEL_SIZE: usize = 100; + +/// Mpsc channel with WinAPI event signaling. +#[derive(Debug, Clone)] +pub struct WinapiSignaledSender { + tx: Sender, + event: Event, +} + +impl WinapiSignaledSender { + pub async fn send(&self, message: T) -> anyhow::Result<()> { + self.tx.send(message).await?; + + // DVC IO loop is controlled by WinAPI events signaling, therefore we need to fire event to + // notify DVC IO loop about new incoming message. + + self.event.set()?; + + Ok(()) + } + + pub fn blocking_send(&self, message: T) -> anyhow::Result<()> { + self.tx.blocking_send(message)?; + + self.event.set()?; + + Ok(()) + } +} + +pub struct WinapiSignaledReceiver { + rx: Receiver, + event: Event, +} + +impl WinapiSignaledReceiver { + pub fn try_recv(&mut self) -> anyhow::Result { + let value = self.rx.try_recv()?; + Ok(value) + } + + pub fn raw_event(&self) -> HANDLE { + self.event.raw() + } +} + +/// Creates WinAPI signaled mpsc channel. +pub fn winapi_signaled_mpsc_channel() -> anyhow::Result<(WinapiSignaledSender, WinapiSignaledReceiver)> { + // Create WinAPI event. + + let event = Event::new_unnamed()?; + + let (tx, rx) = mpsc::channel(IO_CHANNEL_SIZE); + + Ok(( + WinapiSignaledSender { + tx, + event: event.clone(), + }, + WinapiSignaledReceiver { rx, event }, + )) +} + +pub fn bounded_mpsc_channel() -> anyhow::Result<(Sender, Receiver)> { + let (tx, rx) = mpsc::channel(IO_CHANNEL_SIZE); + + Ok((tx, rx)) +} diff --git a/devolutions-session/src/dvc/fs.rs b/devolutions-session/src/dvc/fs.rs new file mode 100644 index 000000000..fafd86bcd --- /dev/null +++ b/devolutions-session/src/dvc/fs.rs @@ -0,0 +1,50 @@ +use std::path::PathBuf; + +use windows::core::PCWSTR; +use windows::Win32::Storage::FileSystem::{MoveFileExW, MOVEFILE_DELAY_UNTIL_REBOOT}; + +use win_api_wrappers::utils::WideString; + +/// Guard for created temporary file. Associated file is deleted on drop. +pub struct TmpFileGuard(PathBuf); + +impl TmpFileGuard { + pub fn new(extension: &str) -> anyhow::Result { + let (_file, path) = tempfile::Builder::new() + .prefix("devolutions-") + .suffix(&format!(".{}", extension)) + .tempfile()? + .keep()?; + + Ok(Self(path)) + } + + pub fn write_content(&self, content: &str) -> anyhow::Result<()> { + std::fs::write(&self.0, content)?; + + let wide_name = WideString::from(self.0.as_os_str()); + + // Remove file on reboot. + // SAFETY: File path is valid, therefore it is safe to call. + unsafe { MoveFileExW(wide_name.as_pcwstr(), PCWSTR::null(), MOVEFILE_DELAY_UNTIL_REBOOT) }?; + + Ok(()) + } + + pub fn path(&self) -> &PathBuf { + &self.0 + } + + pub fn path_string(&self) -> String { + format!("{}", self.0.display()) + } +} + +impl Drop for TmpFileGuard { + fn drop(&mut self) { + if let Err(error) = std::fs::remove_file(&self.0) { + let path = format!("{}", self.0.display()); + error!(%error, path, "Failed to remove temporary file"); + } + } +} diff --git a/devolutions-session/src/dvc/io.rs b/devolutions-session/src/dvc/io.rs new file mode 100644 index 000000000..b46d3d474 --- /dev/null +++ b/devolutions-session/src/dvc/io.rs @@ -0,0 +1,178 @@ +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::Sender; +use windows::Win32::Foundation::{GetLastError, ERROR_IO_PENDING, WAIT_EVENT, WAIT_OBJECT_0}; +use windows::Win32::Storage::FileSystem::{ReadFile, WriteFile}; +use windows::Win32::System::RemoteDesktop::{CHANNEL_CHUNK_LENGTH, CHANNEL_PDU_HEADER}; +use windows::Win32::System::Threading::{WaitForMultipleObjects, INFINITE}; +use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; + +use now_proto_pdu::NowMessage; +use win_api_wrappers::event::Event; +use win_api_wrappers::utils::Pipe; +use win_api_wrappers::wts::WTSVirtualChannel; + +use crate::dvc::channel::WinapiSignaledReceiver; +use crate::dvc::now_message_dissector::NowMessageDissector; + +const DVC_CHANNEL_NAME: &str = "Devolutions::Now::Agent"; + +/// Run main DVC IO loop for `Devolutions::Now::Agent` channel. +pub fn run_dvc_io( + mut write_rx: WinapiSignaledReceiver, + read_tx: Sender, + stop_event: Event, +) -> Result<(), anyhow::Error> { + trace!("Opening DVC channel"); + let wts = WTSVirtualChannel::open_dvc(DVC_CHANNEL_NAME)?; + + trace!("Querying DVC channel"); + let channel_file = wts.query_file_handle()?; + + trace!("DVC channel opened"); + + let mut pdu_chunk_buffer = [0u8; CHANNEL_CHUNK_LENGTH as usize]; + let mut overlapped = OVERLAPPED::default(); + let mut bytes_read: u32 = 0; + + let mut message_dissector = NowMessageDissector::default(); + + let read_event = Event::new_unnamed()?; + overlapped.hEvent = read_event.raw(); + + info!("DVC IO thread is running"); + + // Prepare async read operation. + // SAFETY: Both `channel_file` and event passed to `overlapped` are valid during this call, + // therefore it is safe to call. + let read_result: Result<(), windows::core::Error> = + unsafe { ReadFile(*channel_file, Some(&mut pdu_chunk_buffer), None, Some(&mut overlapped)) }; + + ensure_overlapped_io_result(read_result)?; + + loop { + let events = [read_event.raw(), write_rx.raw_event(), stop_event.raw()]; + + const WAIT_OBJECT_READ_DVC: WAIT_EVENT = WAIT_OBJECT_0; + const WAIT_OBJECT_WRITE_DVC: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1); + const WAIT_OBJECT_STOP: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 2); + + // SAFETY: No preconditions. + let wait_status = unsafe { WaitForMultipleObjects(&events, false, INFINITE) }; + + match wait_status { + // Read event is signaled (incoming data from DVC channel). + WAIT_OBJECT_READ_DVC => { + trace!("DVC channel read event is signaled"); + + // SAFETY: No preconditions. + unsafe { GetOverlappedResult(*channel_file, &overlapped, &mut bytes_read, false) }?; + + if bytes_read + < u32::try_from(size_of::()) + .expect("BUG CHANNEL_PDU_HEADER size always fits into u32") + { + // Channel is closed abruptly; abort loop. + return Ok(()); + } + + let chunk_data_size = usize::try_from(bytes_read) + .expect( + "BUG: Read size can't be breater than CHANNEL_CHUNK_LENGTH, therefore it should fit into usize", + ) + .checked_sub(size_of::()) + .expect("BUG: Read size is less than header size; Correctness of this should be ensured by the OS"); + + const HEADER_SIZE: usize = size_of::(); + + let messages = message_dissector + .dissect(&pdu_chunk_buffer[HEADER_SIZE..HEADER_SIZE + chunk_data_size]) + .expect("BUG: Failed to dissect messages"); + + // Send all messages over the channel. + for message in messages { + debug!(?message, "DVC message received"); + // We do non-blocking send to avoid blocking the IO thread. Processing + // task is expected to be fast enough to keep up with the incoming messages. + match read_tx.try_send(message) { + Ok(_) => { + trace!("Received DVC message is sent to the processing channel"); + } + Err(TrySendError::Full(_)) => { + trace!("DVC message is dropped due to busy channel"); + } + Err(e) => { + trace!("DVC message is dropped due to closed channel"); + return Err(e.into()); + } + } + } + + // Prepare async read file operation one more time. + // SAFETY: No preconditions. + let result = + unsafe { ReadFile(*channel_file, Some(&mut pdu_chunk_buffer), None, Some(&mut overlapped)) }; + + ensure_overlapped_io_result(result)?; + } + // Write event is signaled (outgoing data to DVC channel). + WAIT_OBJECT_WRITE_DVC => { + trace!("DVC channel write event is signaled"); + + let message_to_write = write_rx.try_recv()?; + let message_bytes = ironrdp::core::encode_vec(&message_to_write)?; + + let mut dw_written: u32 = 0; + + // SAFETY: No preconditions. + unsafe { WriteFile(*channel_file, Some(&message_bytes), Some(&mut dw_written), None)? } + } + WAIT_OBJECT_STOP => { + info!("DVC IO thread is stopped"); + // Stop event is signaled; abort loop. + return Ok(()); + } + _ => { + // Spurious wakeup or wait failure + } + }; + } +} + +pub fn ensure_overlapped_io_result(result: windows::core::Result<()>) -> Result<(), windows::core::Error> { + if let Err(error) = result { + // SAFETY: GetLastError is alwayŃ– safe to call + if unsafe { GetLastError() } != ERROR_IO_PENDING { + return Err(error); + } + } + + Ok(()) +} + +pub struct IoRedirectionPipes { + pub stdout_read_pipe: Pipe, + pub stdout_write_pipe: Pipe, + + pub stderr_read_pipe: Pipe, + pub stderr_write_pipe: Pipe, + + pub stdin_read_pipe: Pipe, + pub stdin_write_pipe: Pipe, +} + +impl IoRedirectionPipes { + pub fn new() -> anyhow::Result { + let (stdout_read_pipe, stdout_write_pipe) = Pipe::new_async_stdout_redirection_pipe()?; + let (stderr_read_pipe, stderr_write_pipe) = Pipe::new_async_stdout_redirection_pipe()?; + let (stdin_read_pipe, stdin_write_pipe) = Pipe::new_sync_stdin_redirection_pipe()?; + + Ok(IoRedirectionPipes { + stdout_read_pipe, + stdout_write_pipe, + stderr_read_pipe, + stderr_write_pipe, + stdin_read_pipe, + stdin_write_pipe, + }) + } +} diff --git a/devolutions-session/src/dvc/mod.rs b/devolutions-session/src/dvc/mod.rs new file mode 100644 index 000000000..29122d4cb --- /dev/null +++ b/devolutions-session/src/dvc/mod.rs @@ -0,0 +1,9 @@ +//! This module contains the DVC server implementation. + +pub mod channel; +pub mod fs; +pub mod io; +pub mod now_message_dissector; +pub mod process; +pub mod status; +pub mod task; diff --git a/devolutions-session/src/dvc/now_message_dissector.rs b/devolutions-session/src/dvc/now_message_dissector.rs new file mode 100644 index 000000000..22b15de65 --- /dev/null +++ b/devolutions-session/src/dvc/now_message_dissector.rs @@ -0,0 +1,85 @@ +use ironrdp::core::{Decode, ReadCursor}; +use now_proto_pdu::{NowHeader, NowMessage}; + +enum NowFrameState { + Header, + Body(NowHeader), +} + +/// Reconstructs Now messages from a stream of bytes. +pub struct NowMessageDissector { + pdu_body_buffer: Vec, + read_state: NowFrameState, +} + +impl Default for NowMessageDissector { + fn default() -> Self { + Self { + pdu_body_buffer: Vec::new(), + read_state: NowFrameState::Header, + } + } +} + +impl NowMessageDissector { + pub fn dissect(&mut self, data_chunk: &[u8]) -> Result, anyhow::Error> { + let mut messages = Vec::new(); + + self.pdu_body_buffer.extend_from_slice(data_chunk); + + 'dissect_messages: loop { + match &self.read_state { + NowFrameState::Header if self.pdu_body_buffer.len() < NowHeader::FIXED_PART_SIZE => { + // More data needed to read header + break 'dissect_messages; + } + NowFrameState::Header => { + // We have enough data in the buffer to read the header + let mut data_chunk = ReadCursor::new(&self.pdu_body_buffer); + let header = NowHeader::decode(&mut data_chunk).expect("Failed to read message header"); + + let is_empty_chunk = header.size == 0; + self.read_state = NowFrameState::Body(header); + + if data_chunk.remaining().is_empty() && !is_empty_chunk { + // Need more data to read the body + break 'dissect_messages; + } + + continue 'dissect_messages; + } + NowFrameState::Body(header) => { + let expected_message_size = header.size as usize + NowHeader::FIXED_PART_SIZE; + + if self.pdu_body_buffer.len() < expected_message_size { + // More data needed to read the body + break 'dissect_messages; + } + + // We have enough data in the buffer to read the body + let data_remaining = { + let mut cursor = ReadCursor::new(&self.pdu_body_buffer); + let message = NowMessage::decode(&mut cursor).expect("Failed to read message"); + messages.push(message); + + !cursor.remaining().is_empty() + }; + + // Remove the processed message from the buffer. + // We need to continue process the chunk in case there are more messages in it. + self.pdu_body_buffer.drain(0..expected_message_size); + + self.read_state = NowFrameState::Header; + + // No remaining data left + if !data_remaining { + // No more data to process + break 'dissect_messages; + } + } + } + } + + Ok(messages) + } +} diff --git a/devolutions-session/src/dvc/process.rs b/devolutions-session/src/dvc/process.rs new file mode 100644 index 000000000..111288d61 --- /dev/null +++ b/devolutions-session/src/dvc/process.rs @@ -0,0 +1,713 @@ +use tokio::sync::mpsc::Sender; +use windows::core::PCWSTR; +use windows::Win32::Foundation::{ + CloseHandle, GetLastError, BOOL, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, HANDLE, HWND, LPARAM, WAIT_EVENT, + WAIT_OBJECT_0, WPARAM, +}; +use windows::Win32::Storage::FileSystem::{ReadFile, WriteFile}; +use windows::Win32::System::Console::{ + AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT, +}; +use windows::Win32::System::Threading::{ + CreateProcessW, GetExitCodeProcess, GetProcessHandleFromHwnd, TerminateProcess, WaitForMultipleObjects, INFINITE, + PROCESS_INFORMATION, STARTF_USESTDHANDLES, STARTUPINFOW, +}; +use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; +use windows::Win32::UI::WindowsAndMessaging::{EnumWindows, PostMessageW, WM_CLOSE}; + +use now_proto_pdu::{ + NowExecCancelRspMsg, NowExecDataFlags, NowExecDataMsg, NowExecResultMsg, NowMessage, NowSeverity, NowStatus, + NowStatusCode, NowVarBuf, +}; +use win_api_wrappers::event::Event; +use win_api_wrappers::handle::Handle; +use win_api_wrappers::process::Process; +use win_api_wrappers::security::acl::{RawSecurityAttributes, SecurityAttributes}; +use win_api_wrappers::utils::{Pipe, WideString}; + +use crate::dvc::channel::{winapi_signaled_mpsc_channel, WinapiSignaledReceiver, WinapiSignaledSender}; +use crate::dvc::fs::TmpFileGuard; +use crate::dvc::io::{ensure_overlapped_io_result, IoRedirectionPipes}; +use crate::dvc::status::{ExecAgentError, ExecResultKind}; + +#[derive(Debug, thiserror::Error)] +pub enum ExecError { + #[error("Execution was aborted by user")] + Aborted, + #[error(transparent)] + Windows(#[from] windows::core::Error), + #[error("Execution failed with agent error: {}", .0.0)] + Agent(ExecAgentError), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for ExecError { + fn from(error: ExecAgentError) -> Self { + ExecError::Agent(error) + } +} + +#[derive(Debug)] +pub enum ProcessIoInputEvent { + AbortExecution(NowStatus), + CancelExecution, + DataIn { + data: Vec, + /// If last is set, then stdin pipe should be closed. Any consecutive DataIn messages + /// will be ignored. + last: bool, + }, + TerminateIo, +} + +/// Message, sent from Process IO thread to task to finalize process execution. +#[derive(Debug)] +pub enum ProcessIoNotification { + Terminated { session_id: u32 }, +} + +pub struct WinApiProcessCtx { + session_id: u32, + + dvc_tx: WinapiSignaledSender, + input_event_rx: WinapiSignaledReceiver, + + stdout_read_pipe: Option, + stderr_read_pipe: Option, + stdin_write_pipe: Option, + + // NOTE: Order of fields is important, as process_handle must be dropped last in automatically + // generated destructor, after all pipes were closed. + process: Process, +} + +impl WinApiProcessCtx { + // Returns process exit code. + pub fn start_io_loop(&mut self) -> Result { + let session_id = self.session_id; + + info!(session_id, "Process IO loop has started"); + + // Events for overlapped IO + let stdout_read_event = Event::new_unnamed()?; + let stderr_read_event = Event::new_unnamed()?; + + let wait_events = vec![ + stdout_read_event.raw(), + stderr_read_event.raw(), + self.input_event_rx.raw_event(), + self.process.handle.raw(), + ]; + + const WAIT_OBJECT_STDOUT_READ: WAIT_EVENT = WAIT_OBJECT_0; + const WAIT_OBJECT_STDERR_READ: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1); + const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 2); + const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 3); + + // Initiate first overlapped read round + + let mut stdout_buffer = vec![0u8; 4 * 1024]; + let mut stderr_buffer = vec![0u8; 4 * 1024]; + + let mut stdout_first_chunk = true; + let mut stderr_first_chunk = true; + + let mut overlapped_stdout = OVERLAPPED { + hEvent: stdout_read_event.raw(), + ..Default::default() + }; + + let mut overlapped_stderr = OVERLAPPED { + hEvent: stderr_read_event.raw(), + ..Default::default() + }; + + // SAFETY: pipe is valid to read from, as long as it is not closed. + let stdout_read_result = unsafe { + ReadFile( + self.stdout_read_pipe + .as_ref() + .map(|pipe| pipe.handle.raw()) + .expect("BUG: stdout pipe is not initialized"), + Some(&mut stdout_buffer[..]), + None, + Some(&mut overlapped_stdout as *mut _), + ) + }; + + ensure_overlapped_io_result(stdout_read_result)?; + + // SAFETY: pipe is valid to read from, as long as it is not closed. + let stderr_read_result = unsafe { + ReadFile( + self.stderr_read_pipe + .as_ref() + .map(|pipe| pipe.handle.raw()) + .expect("BUG: stderr pipe is not initialized"), + Some(&mut stderr_buffer[..]), + None, + Some(&mut overlapped_stderr as *mut _), + ) + }; + + ensure_overlapped_io_result(stderr_read_result)?; + + info!(session_id, "Process IO is ready for async loop execution"); + loop { + // SAFETY: No preconditions. + let signaled_event = unsafe { WaitForMultipleObjects(&wait_events, false, INFINITE) }; + + match signaled_event { + WAIT_OBJECT_PROCESS_EXIT => { + info!(session_id, "Process has signaled exit"); + + // Restore Ctrl+C handler for current process, in case it was disabled by + // CancelExecution event. + + // SAFETY: No preconditions. + unsafe { SetConsoleCtrlHandler(None, false)? }; + + let mut code: u32 = 0; + // SAFETY: process_handle is valid and `code` is a valid stack memory, therefore + // it is safe to call GetExitCodeProcess. + unsafe { + GetExitCodeProcess(self.process.handle.raw(), &mut code as *mut _)?; + } + + return Ok(u16::try_from(code).unwrap_or(0xFFFF)); + } + WAIT_OBJECT_INPUT_MESSAGE => { + let process_io_message = self.input_event_rx.try_recv()?; + + trace!(session_id, ?process_io_message, "Received process IO message"); + + match process_io_message { + ProcessIoInputEvent::AbortExecution(status) => { + info!(session_id, "Aborting process execution by user request"); + + // Terminate process with requested status. + // SAFETY: No preconditions. + unsafe { TerminateProcess(self.process.handle.raw(), status.code().0.into())? }; + + return Err(ExecError::Aborted); + } + ProcessIoInputEvent::CancelExecution => { + info!(session_id, "Cancelling process execution by user request"); + + let mut windows_enum_ctx = EnumWindowsContext { + expected_process: self.process.handle.raw(), + windows: Vec::new(), + }; + + // SAFETY: EnumWindows is safe to call with valid callback function + // and context. Lifetime of windows_enum_ctx is guaranteed to be valid + // until EnumWindows returns. + unsafe { + // Enumerate all windows associated with the process + EnumWindows( + Some(windows_enum_func), + LPARAM(&mut windows_enum_ctx as *mut EnumWindowsContext as isize), + ) + }?; + + // For GUI windows - send WM_CLOSE message + if !windows_enum_ctx.windows.is_empty() { + // Send cancel message to all windows + for hwnd in windows_enum_ctx.windows { + // SAFETY: No preconditions. + let _ = unsafe { PostMessageW(hwnd, WM_CLOSE, WPARAM(0), LPARAM(0)) }; + } + + // Acknowledge client that cancel request was sent. + self.dvc_tx.blocking_send( + NowExecCancelRspMsg::new( + session_id, + NowStatus::new(NowSeverity::Info, NowStatusCode::SUCCESS), + ) + .into(), + )?; + + // Wait for process to exit + continue; + } + + // For console applications - send CTRL+C + // SAFETY: No preconditions. + + let pid = self.process.pid(); + + // SAFETY: No preconditions. + if pid != 0 && unsafe { AttachConsole(pid) }.is_ok() { + // Disable Ctrl+C handler for current process. Will be re-enabled, + // when process exits (see WAIT_OBJECT_PROCESS_EXIT above). + // SAFETY: No preconditions. + unsafe { SetConsoleCtrlHandler(None, true)? }; + + // Send Ctrl+C to console application + // SAFETY: No preconditions. + unsafe { GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0)? }; + + // Detach from console + // SAFETY: No preconditions. + unsafe { FreeConsole()? }; + + // Acknowledge client that cancel request was sent successfully. + self.dvc_tx.blocking_send( + NowExecCancelRspMsg::new( + session_id, + NowStatus::new(NowSeverity::Info, NowStatusCode::SUCCESS) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0) + .expect("BUG: Status kind is out of range"), + ) + .into(), + )?; + + // wait for process to exit + continue; + } + + // Neither GUI nor console application, send cancel response with error + self.dvc_tx.blocking_send( + NowExecCancelRspMsg::new( + session_id, + NowStatus::new(NowSeverity::Error, NowStatusCode::FAILURE) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0) + .expect("BUG: Status kind is out of range"), + ) + .into(), + )?; + } + ProcessIoInputEvent::DataIn { data, last } => { + trace!(session_id, "Received data to write to stdin pipe"); + + let pipe_handle = match self.stdin_write_pipe.as_ref() { + Some(pipe) => pipe.handle.raw(), + None => { + // Ignore data, as stdin pipe was already closed. + continue; + } + }; + + let mut bytes_written: u32 = 0; + + // Send data to stdin pipe in a blocking maner. + // SAFETY: pipe is valid to write to, as long as it is not closed. + unsafe { WriteFile(pipe_handle, Some(&data), Some(&mut bytes_written as *mut _), None) }?; + + if last { + // Close stdin pipe + self.stdin_write_pipe = None; + } + } + ProcessIoInputEvent::TerminateIo => { + info!(session_id, "Terminating IO loop"); + + // Terminate IO loop + return Err(ExecError::Aborted); + } + } + } + WAIT_OBJECT_STDOUT_READ => { + trace!(session_id, "Received data from stdout pipe"); + + let pipe_handle = if let Some(pipe) = self.stdout_read_pipe.as_ref() { + pipe.handle.raw() + } else { + // Ignore data, as stdout pipe was already closed. + continue; + }; + + let flags = if stdout_first_chunk { + stdout_first_chunk = false; + NowExecDataFlags::FIRST | NowExecDataFlags::STDOUT + } else { + NowExecDataFlags::STDOUT + }; + + let mut bytes_read: u32 = 0; + + // SAFETY: Destination buffer is valid during the lifetime of this function, + // thus it is safe to read into it. (buffer was implicitly borrowed by ReadFile) + let overlapped_result = unsafe { + GetOverlappedResult( + pipe_handle, + &overlapped_stdout as *const _, + &mut bytes_read as *mut _, + false, + ) + }; + + if let Err(err) = overlapped_result { + // SAFETY: No preconditions. + match unsafe { GetLastError() } { + ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => { + // EOF on stdout pipe, close it and send EOF message to message_tx + self.stdout_read_pipe = None; + let exec_message = NowExecDataMsg::new( + flags | NowExecDataFlags::LAST, + session_id, + NowVarBuf::new(Vec::new()) + .expect("BUG: empty buffer should always fit into NowVarBuf"), + ); + + self.dvc_tx.blocking_send(exec_message.into())?; + } + _code => return Err(err.into()), + } + continue; + } + + let data_message = NowExecDataMsg::new( + flags, + session_id, + NowVarBuf::new(stdout_buffer[..bytes_read as usize].to_vec()) + .expect("BUG: read buffer should always fit into NowVarBuf"), + ); + + self.dvc_tx.blocking_send(data_message.into())?; + + // Schedule next overlapped read + // SAFETY: pipe is valid to read from, as long as it is not closed. + let stdout_read_result = unsafe { + ReadFile( + pipe_handle, + Some(&mut stdout_buffer[..]), + None, + Some(&mut overlapped_stdout as *mut _), + ) + }; + + ensure_overlapped_io_result(stdout_read_result)?; + } + WAIT_OBJECT_STDERR_READ => { + trace!(session_id, "Received data from stderr pipe"); + + let pipe_handle = if let Some(pipe) = self.stderr_read_pipe.as_ref() { + pipe.handle.raw() + } else { + // Ignore data, as stderr pipe was already closed. + continue; + }; + + let flags = if stderr_first_chunk { + stderr_first_chunk = false; + NowExecDataFlags::FIRST | NowExecDataFlags::STDERR + } else { + NowExecDataFlags::STDERR + }; + + let mut bytes_read: u32 = 0; + + // SAFETY: Destination buffer is valid during the lifetime of this function, + // thus it is safe to read into it. (buffer was implicitly borrowed by ReadFile) + let overlapped_result = unsafe { + GetOverlappedResult( + pipe_handle, + &overlapped_stderr as *const _, + &mut bytes_read as *mut _, + false, + ) + }; + + if let Err(err) = overlapped_result { + // SAFETY: No_preconditions. + match unsafe { GetLastError() } { + ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => { + // EOF on stderr pipe, close it and send EOF message to message_tx + self.stderr_read_pipe = None; + let exec_message = NowExecDataMsg::new( + flags | NowExecDataFlags::LAST, + session_id, + NowVarBuf::new(Vec::new()) + .expect("BUG: empty buffer should always fit into NowVarBuf"), + ); + + self.dvc_tx.blocking_send(exec_message.into())?; + } + _code => return Err(err.into()), + } + continue; + } + + let data_message = NowExecDataMsg::new( + flags, + session_id, + NowVarBuf::new(stderr_buffer.as_slice()) + .expect("BUG: read buffer should always fit into NowVarBuf"), + ); + + self.dvc_tx.blocking_send(data_message.into())?; + + // Schedule next overlapped read + // SAFETY: pipe is valid to read from, as long as it is not closed. + let stderr_read_result = unsafe { + ReadFile( + pipe_handle, + Some(&mut stderr_buffer[..]), + None, + Some(&mut overlapped_stderr as *mut _), + ) + }; + + ensure_overlapped_io_result(stderr_read_result)?; + } + _ => { + // Unexpected event, spurious wakeup? + continue; + } + } + } + } +} + +/// Builder for process execution session. +pub struct WinApiProcessBuilder { + executable: String, + command_line: String, + current_directory: String, + temp_files: Vec, +} + +impl WinApiProcessBuilder { + pub fn new(executable: &str) -> Self { + Self { + executable: executable.to_string(), + command_line: String::new(), + current_directory: String::new(), + temp_files: Vec::new(), + } + } + + #[must_use] + pub fn with_temp_file(mut self, temp_file: TmpFileGuard) -> Self { + self.temp_files.push(temp_file); + self + } + + #[must_use] + pub fn with_command_line(mut self, command_line: String) -> Self { + self.command_line = command_line; + self + } + + #[must_use] + pub fn with_current_directory(mut self, current_directory: String) -> Self { + self.current_directory = current_directory; + self + } + + /// Starts process execution and spawns IO thread to redirect stdio to/from dvc. + pub fn run( + self, + session_id: u32, + dvc_tx: WinapiSignaledSender, + io_notification_tx: Sender, + ) -> anyhow::Result { + let command_line = format!("\"{}\" {}", self.executable, self.command_line) + .trim_end() + .to_string(); + let current_directory = self.current_directory.clone(); + + info!( + session_id, + "Starting process: `{command_line}`; cwd:`{current_directory}`" + ); + + let mut command_line_wide = WideString::from(command_line); + let current_directory_wide = if current_directory.is_empty() { + WideString::default() + } else { + WideString::from(current_directory) + }; + + let mut process_information = PROCESS_INFORMATION::default(); + + let IoRedirectionPipes { + stdout_read_pipe, + stdout_write_pipe, + stderr_read_pipe, + stderr_write_pipe, + stdin_read_pipe, + stdin_write_pipe, + } = IoRedirectionPipes::new()?; + + let startup_info = STARTUPINFOW { + cb: u32::try_from(size_of::()).expect("BUG: STARTUPINFOW should always fit into u32"), + dwFlags: STARTF_USESTDHANDLES, + hStdError: stderr_write_pipe.handle.raw(), + hStdInput: stdin_read_pipe.handle.raw(), + hStdOutput: stdout_write_pipe.handle.raw(), + ..Default::default() + }; + + let security_attributes = RawSecurityAttributes::try_from(&SecurityAttributes { + security_descriptor: None, + inherit_handle: true, + })?; + + // SAFETY: All parameters constructed above are valid and safe to use. + unsafe { + CreateProcessW( + PCWSTR::null(), + command_line_wide.as_pwstr(), + Some(security_attributes.as_raw() as *const _), + None, + true, + Default::default(), + None, + current_directory_wide.as_pcwstr(), + &startup_info as *const _, + &mut process_information as *mut _, + )?; + } + + // SAFETY: No preconditions. + unsafe { + // We don't need the thread handle, so we close it + CloseHandle(process_information.hThread) + }?; + + // Handles were duplicated by CreateProcessW, so we can close them immediately. + // Explicitly drop handles just for clarity. + drop(stdout_write_pipe); + drop(stderr_write_pipe); + drop(stdin_read_pipe); + + let process_handle = Process::from( + // SAFETY: process_information is valid and contains valid process handle. + unsafe { Handle::new_owned(process_information.hProcess)? }, + ); + + // Create channel for `task` -> `Process IO thread` communication + let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?; + + let join_handle = std::thread::spawn(move || { + let mut process_ctx = WinApiProcessCtx { + session_id, + dvc_tx: dvc_tx.clone(), + input_event_rx, + stdout_read_pipe: Some(stdout_read_pipe), + stderr_read_pipe: Some(stderr_read_pipe), + stdin_write_pipe: Some(stdin_write_pipe), + process: process_handle, + }; + + let status = match process_ctx.start_io_loop() { + Ok(status) => { + info!(session_id, "Process execution completed with exit code {}", status); + NowStatus::new(NowSeverity::Info, NowStatusCode(status)) + .with_kind(ExecResultKind::EXITED.0) + .expect("BUG: Status kind is out of range") + } + Err(ExecError::Aborted) => { + info!(session_id, "Process execution aborted by user"); + + NowStatus::new(NowSeverity::Info, NowStatusCode::SUCCESS) + .with_kind(ExecResultKind::ABORTED.0) + .expect("BUG: Status kind is out of range") + } + Err(ExecError::Windows(error)) => { + error!(%error, session_id, "Process execution thread failed with WinAPI error"); + + let code = match u16::try_from(error.code().0) { + Ok(code) => NowStatusCode(code), + Err(_) => NowStatusCode::FAILURE, + }; + + NowStatus::new(NowSeverity::Error, code) + .with_kind(ExecResultKind::SESSION_ERROR_SYSETM.0) + .expect("BUG: Status kind is out of range") + } + Err(ExecError::Agent(error)) => { + error!(?error, session_id, "Process execution thread failed with agent error"); + + NowStatus::new(NowSeverity::Error, NowStatusCode(error.0)) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0) + .expect("BUG: Status kind is out of range") + } + Err(ExecError::Other(error)) => { + error!(%error, session_id, "Process execution thread failed with unknown error"); + + NowStatus::new(NowSeverity::Error, NowStatusCode(ExecAgentError::OTHER.0)) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0) + .expect("BUG: Status kind is out of range") + } + }; + + let message = NowExecResultMsg::new(session_id, status); + + if let Err(error) = dvc_tx.blocking_send(message.into()) { + error!(%error, session_id, "Failed to send process result message over channel"); + } + + if let Err(error) = io_notification_tx.blocking_send(ProcessIoNotification::Terminated { session_id }) { + error!(%error, session_id, "Failed to send termination message to task; This may cause resource leak!"); + } + }); + + Ok(WinApiProcess { + input_event_tx, + join_handle, + _temp_files: self.temp_files, + }) + } +} + +/// Represents spawned process with IO redirection. +pub struct WinApiProcess { + input_event_tx: WinapiSignaledSender, + join_handle: std::thread::JoinHandle<()>, + _temp_files: Vec, +} + +impl WinApiProcess { + pub async fn abort_execution(&self, status: NowStatus) -> anyhow::Result<()> { + self.input_event_tx + .send(ProcessIoInputEvent::AbortExecution(status)) + .await + } + + pub async fn cancel_execution(&self) -> anyhow::Result<()> { + self.input_event_tx.send(ProcessIoInputEvent::CancelExecution).await + } + + pub async fn send_stdin(&self, data: Vec, last: bool) -> anyhow::Result<()> { + self.input_event_tx + .send(ProcessIoInputEvent::DataIn { data, last }) + .await + } + + pub async fn shutdown(&self) -> anyhow::Result<()> { + self.input_event_tx.send(ProcessIoInputEvent::TerminateIo).await?; + Ok(()) + } + + pub fn is_session_terminated(&self) -> bool { + self.join_handle.is_finished() + } +} + +struct EnumWindowsContext { + expected_process: HANDLE, + windows: Vec, +} + +unsafe extern "system" fn windows_enum_func(hwnd: HWND, lparam: LPARAM) -> BOOL { + // SAFETY: lparam.0 should be set to valid EnumWindowsContext memory by caller. + let enum_ctx = unsafe { &mut *(lparam.0 as *mut EnumWindowsContext) }; + + // SAFETY: No preconditions. + let process = unsafe { GetProcessHandleFromHwnd(hwnd) }; + + if process.is_invalid() { + // Continue enumeration. + return true.into(); + } + + if process == enum_ctx.expected_process { + enum_ctx.windows.push(hwnd); + } + + true.into() +} diff --git a/devolutions-session/src/dvc/status.rs b/devolutions-session/src/dvc/status.rs new file mode 100644 index 000000000..e218c33e1 --- /dev/null +++ b/devolutions-session/src/dvc/status.rs @@ -0,0 +1,25 @@ +#[derive(Debug)] +pub struct ExecAgentError(pub u16); + +impl ExecAgentError { + pub const EXISTING_SESSION: Self = Self(0x0001); + pub const START_FAILED: Self = Self(0x0002); + pub const OTHER: Self = Self(0xFFFF); +} + +pub struct ExecResultKind(pub u8); + +impl ExecResultKind { + /// Application exited normally. `code` contains application exit code. + pub const EXITED: Self = Self(0x00); + + /// Session was closed because of system error. + /// `code` contains system error code (e.g. WinAPI error code). + pub const SESSION_ERROR_SYSETM: Self = Self(0x01); + + /// Session was closed because of agent-specific error. + pub const SESSION_ERROR_AGENT: Self = Self(0x02); + + /// Execution was aborted by user via `AbortExecution` message. + pub const ABORTED: Self = Self(0xFF); +} diff --git a/devolutions-session/src/dvc/task.rs b/devolutions-session/src/dvc/task.rs new file mode 100644 index 000000000..c7d382e23 --- /dev/null +++ b/devolutions-session/src/dvc/task.rs @@ -0,0 +1,563 @@ +use std::collections::HashMap; + +use anyhow::bail; +use async_trait::async_trait; +use tokio::select; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use windows::core::{HSTRING, PCWSTR}; +use windows::Win32::System::Shutdown::{ExitWindowsEx, LockWorkStation, EWX_LOGOFF, SHUTDOWN_REASON}; +use windows::Win32::UI::Shell::ShellExecuteW; +use windows::Win32::UI::WindowsAndMessaging::{MessageBoxW, MESSAGEBOX_STYLE, SW_RESTORE}; + +use devolutions_gateway_task::Task; +use now_proto_pdu::{ + NowExecBatchMsg, NowExecCapsetFlags, NowExecCapsetMsg, NowExecDataFlags, NowExecMessage, NowExecProcessMsg, + NowExecPwshMsg, NowExecResultMsg, NowExecRunMsg, NowExecWinPsFlags, NowExecWinPsMsg, NowMessage, NowMsgBoxResponse, + NowSessionMessage, NowSessionMsgBoxReqMsg, NowSessionMsgBoxRspMsg, NowSeverity, NowStatus, NowStatusCode, + NowSystemMessage, +}; +use win_api_wrappers::event::Event; +use win_api_wrappers::utils::WideString; + +use crate::dvc::channel::{bounded_mpsc_channel, winapi_signaled_mpsc_channel, WinapiSignaledSender}; +use crate::dvc::fs::TmpFileGuard; +use crate::dvc::io::run_dvc_io; +use crate::dvc::process::{ProcessIoNotification, WinApiProcess, WinApiProcessBuilder}; +use crate::dvc::status::{ExecAgentError, ExecResultKind}; + +#[derive(Default)] +pub struct DvcIoTask {} + +#[async_trait] +impl Task for DvcIoTask { + type Output = anyhow::Result<()>; + + const NAME: &'static str = "DVC processing"; + + async fn run(self, shutdown_signal: devolutions_gateway_task::ShutdownSignal) -> Self::Output { + let (write_tx, write_rx) = winapi_signaled_mpsc_channel()?; + let (read_tx, read_rx) = bounded_mpsc_channel()?; + + // WinAPI event to terminate DVC IO thread. + let io_thread_shutdown_event = Event::new_unnamed()?; + + let cloned_shutdown_event = io_thread_shutdown_event.clone(); + + // Spawning thread is relatively short operation, so it could be executed synchronously. + let io_thread = std::thread::spawn(move || { + let io_thread_result = run_dvc_io(write_rx, read_tx, cloned_shutdown_event); + + if let Err(error) = io_thread_result { + error!(%error, "DVC IO thread failed"); + } + }); + + // Join thread some time in future. + tokio::task::spawn_blocking(move || { + if io_thread.join().is_err() { + error!("DVC IO thread join failed"); + }; + }); + + info!("Processing DVC messages..."); + + let process_result = process_messages(read_rx, write_tx, shutdown_signal).await; + + // Send shutdown signal to IO thread to release WTS channel resources. + info!("Shutting down DVC IO thread"); + let _ = io_thread_shutdown_event.set(); + + process_result?; + + Ok(()) + } +} + +async fn process_messages( + mut read_rx: Receiver, + dvc_tx: WinapiSignaledSender, + mut shutdown_signal: devolutions_gateway_task::ShutdownSignal, +) -> anyhow::Result<()> { + let (io_notification_tx, mut task_rx) = mpsc::channel(100); + + let mut processor = MessageProcessor::new(dvc_tx, io_notification_tx); + + processor.send_initialization_sequence().await?; + + loop { + select! { + read_result = read_rx.recv() => { + match read_result { + Some(message) => { + match processor.process_message(message).await { + Ok(()) => {} + Err(error) => { + error!(%error, "Failed to process DVC message"); + return Err(error); + } + } + } + None => { + return Err(anyhow::anyhow!("Read channel has been closed")); + } + } + } + task_rx = task_rx.recv() => { + match task_rx { + Some(notification) => { + match notification { + ProcessIoNotification::Terminated { session_id } => { + info!(session_id, "Cleaning up session resources"); + processor.remove_session(session_id); + } + } + } + None => { + return Err(anyhow::anyhow!("Task channel has been closed")); + } + } + } + + _ = shutdown_signal.wait() => { + processor.shutdown_all_sessions().await; + return Ok(()); + } + } + } +} + +struct MessageProcessor { + dvc_tx: WinapiSignaledSender, + io_notification_tx: Sender, + downgraded_caps: NowExecCapsetFlags, + sessions: HashMap, +} + +impl MessageProcessor { + pub(crate) fn new( + dvc_tx: WinapiSignaledSender, + io_notification_tx: Sender, + ) -> Self { + Self { + dvc_tx, + io_notification_tx, + // Caps are empty until negotiated. + downgraded_caps: NowExecCapsetFlags::empty(), + sessions: HashMap::new(), + } + } + + pub(crate) async fn send_initialization_sequence(&self) -> anyhow::Result<()> { + // Caps supported by the server + let capabilities_pdu = NowMessage::from(NowExecCapsetMsg::new( + NowExecCapsetFlags::STYLE_RUN + | NowExecCapsetFlags::STYLE_PROCESS + | NowExecCapsetFlags::STYLE_CMD + | NowExecCapsetFlags::STYLE_PWSH + | NowExecCapsetFlags::STYLE_WINPS, + )); + + self.dvc_tx.send(capabilities_pdu).await?; + + Ok(()) + } + + async fn ensure_session_id_free(&self, session_id: u32) -> anyhow::Result<()> { + if self.sessions.contains_key(&session_id) { + self.dvc_tx + .send(NowMessage::from(NowExecResultMsg::new( + session_id, + NowStatus::new(NowSeverity::Fatal, NowStatusCode(ExecAgentError::EXISTING_SESSION.0)) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0) + .expect("BUG: Exec result kind is out of bounds"), + ))) + .await?; + + bail!("Session ID is already in use"); + } + + Ok(()) + } + + async fn handle_process_run_result( + &mut self, + session_id: u32, + run_process_result: anyhow::Result, + ) -> anyhow::Result<()> { + match run_process_result { + Ok(process) => { + info!(session_id, "Process started!"); + + self.sessions.insert(session_id, process); + } + Err(error) => { + error!(session_id, %error, "Failed to start process"); + + self.dvc_tx + .send(NowMessage::from(NowExecResultMsg::new( + session_id, + NowStatus::new(NowSeverity::Fatal, NowStatusCode(ExecAgentError::START_FAILED.0)) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0)?, + ))) + .await?; + } + } + + Ok(()) + } + + pub(crate) async fn process_message(&mut self, message: NowMessage) -> anyhow::Result<()> { + match message { + NowMessage::Exec(NowExecMessage::Capset(client_capset_message)) => { + // Execute downgrade caps sequence. + let server_flags = NowExecCapsetFlags::STYLE_RUN; + let downgraded_flags = server_flags & client_capset_message.flags(); + self.downgraded_caps = downgraded_flags; + + let downgraded_caps_pdu = NowMessage::from(NowExecCapsetMsg::new(downgraded_flags)); + + self.dvc_tx.send(downgraded_caps_pdu).await?; + } + NowMessage::Exec(NowExecMessage::Run(exec_msg)) => { + // Execute synchronously; ShellExecute will not block the calling thread, + // For "Run" we are only interested in fire-and-forget execution. + self.process_exec_run(exec_msg).await?; + } + NowMessage::Exec(NowExecMessage::Process(exec_msg)) => { + self.process_exec_process(exec_msg).await?; + } + NowMessage::Exec(NowExecMessage::Batch(batch_msg)) => { + self.process_exec_batch(batch_msg).await?; + } + NowMessage::Exec(NowExecMessage::WinPs(winps_msg)) => { + self.process_exec_winps(winps_msg).await?; + } + NowMessage::Exec(NowExecMessage::Pwsh(pwsh_msg)) => { + self.process_exec_pwsh(pwsh_msg).await?; + } + NowMessage::Exec(NowExecMessage::Abort(abort_msg)) => { + let session_id = abort_msg.session_id(); + + let process = match self.sessions.get_mut(&session_id) { + Some(process) => process, + None => { + warn!(session_id, "Session not found (abort)"); + return Ok(()); + } + }; + + process.abort_execution(abort_msg.status().clone()).await?; + + // We could drop session immediately after abort as client do not expect any further + // communication. + let _ = self.sessions.remove(&session_id); + } + NowMessage::Exec(NowExecMessage::CancelReq(cancel_msg)) => { + let session_id = cancel_msg.session_id(); + + let process = match self.sessions.get_mut(&session_id) { + Some(process) => process, + None => { + warn!(session_id, "Session not found (cancel)"); + return Ok(()); + } + }; + + process.cancel_execution().await?; + } + NowMessage::Exec(NowExecMessage::Data(data_msg)) => { + let session_id = data_msg.session_id(); + let flags = data_msg.flags(); + + if !flags.contains(NowExecDataFlags::STDIN) { + warn!(session_id, "Only STDIN data input is supported"); + return Ok(()); + } + + let process = match self.sessions.get_mut(&session_id) { + Some(process) => process, + None => { + warn!(session_id, "Session not found (data)"); + return Ok(()); + } + }; + + process + .send_stdin(data_msg.data().value().to_vec(), flags.contains(NowExecDataFlags::LAST)) + .await?; + } + NowMessage::Session(NowSessionMessage::MsgBoxReq(request)) => { + let tx = self.dvc_tx.clone(); + + // Spawn blocking task for message box to avoid blocking the IO loop. + let join_handle = tokio::task::spawn_blocking(move || process_msg_box_req(request, tx)); + drop(join_handle); + } + NowMessage::Session(NowSessionMessage::Logoff(_logoff_msg)) => { + // SAFETY: `ExitWindowsEx` is always safe to call. + if let Err(error) = unsafe { ExitWindowsEx(EWX_LOGOFF, SHUTDOWN_REASON(0)) } { + error!(%error, "Failed to logoff user session"); + } + } + NowMessage::Session(NowSessionMessage::Lock(_lock_msg)) => { + // SAFETY: `LockWorkStation` is always safe to call. + if let Err(error) = unsafe { LockWorkStation() } { + error!(%error, "Failed to lock workstation"); + } + } + NowMessage::System(NowSystemMessage::Shutdown(_shutdown_msg)) => { + // TODO: Adjust `NowSession` token privileges in NowAgent to make shutdown possible + // from this process. + } + _ => { + warn!("Unsupported message: {:?}", message); + } + } + + Ok(()) + } + + async fn process_exec_run(&self, params: NowExecRunMsg) -> anyhow::Result<()> { + let session_id = params.session_id(); + + // Empty null-terminated string. + let parameters = WideString::from(""); + let operation = WideString::from("open"); + let command = WideString::from(params.command().value()); + + info!(session_id, "Executing ShellExecuteW"); + + // SAFETY: All buffers are valid, therefore `ShellExecuteW` is safe to call. + let hinstance = unsafe { + ShellExecuteW( + None, + operation.as_pcwstr(), + command.as_pcwstr(), + parameters.as_pcwstr(), + None, + // Activate and show window + SW_RESTORE, + ) + }; + + if hinstance.0 as usize <= 32 { + error!("ShellExecuteW failed, error code: {}", hinstance.0 as usize); + + self.dvc_tx + .send(NowMessage::from(NowExecResultMsg::new( + session_id, + NowStatus::new(NowSeverity::Fatal, NowStatusCode(ExecAgentError::OTHER.0)) + .with_kind(ExecResultKind::SESSION_ERROR_AGENT.0) + .expect("BUG: Exec result kind is out of bounds"), + ))) + .await?; + } else { + self.dvc_tx + .send(NowMessage::from(NowExecResultMsg::new( + session_id, + NowStatus::new(NowSeverity::Info, NowStatusCode::SUCCESS) + .with_kind(ExecResultKind::EXITED.0) + .expect("BUG: Exec result kind is out of bounds"), + ))) + .await?; + } + + Ok(()) + } + + async fn process_exec_process(&mut self, exec_msg: NowExecProcessMsg) -> anyhow::Result<()> { + self.ensure_session_id_free(exec_msg.session_id()).await?; + + let run_process_result = WinApiProcessBuilder::new(exec_msg.filename().value()) + .with_command_line(exec_msg.parameters().clone().into()) + .with_current_directory(exec_msg.directory().clone().into()) + .run( + exec_msg.session_id(), + self.dvc_tx.clone(), + self.io_notification_tx.clone(), + ); + + self.handle_process_run_result(exec_msg.session_id(), run_process_result) + .await?; + + Ok(()) + } + + async fn process_exec_batch(&mut self, batch_msg: NowExecBatchMsg) -> anyhow::Result<()> { + self.ensure_session_id_free(batch_msg.session_id()).await?; + + let tmp_file = TmpFileGuard::new("bat")?; + tmp_file.write_content(batch_msg.command().value())?; + + let parameters = format!("/c \"{}\"", tmp_file.path_string()); + + let run_process_result = WinApiProcessBuilder::new("cmd.exe") + .with_temp_file(tmp_file) + .with_command_line(parameters) + .run( + batch_msg.session_id(), + self.dvc_tx.clone(), + self.io_notification_tx.clone(), + ); + + self.handle_process_run_result(batch_msg.session_id(), run_process_result) + .await?; + + Ok(()) + } + + async fn process_exec_winps(&mut self, winps_msg: NowExecWinPsMsg) -> anyhow::Result<()> { + self.ensure_session_id_free(winps_msg.session_id()).await?; + + let tmp_file = TmpFileGuard::new("ps1")?; + tmp_file.write_content(winps_msg.command().value())?; + + let mut params = Vec::new(); + + if let Some(execution_policy) = winps_msg.execution_policy() { + params.push("-ExecutionPolicy".to_string()); + params.push(execution_policy.value().to_string()); + } + + if let Some(configuration_name) = winps_msg.configuration_name() { + params.push("-ConfigurationName".to_string()); + params.push(configuration_name.value().to_string()); + } + + append_ps_flags(&mut params, winps_msg.flags()); + + params.push("-File".to_string()); + params.push(format!("\"{}\"", tmp_file.path_string())); + + let params_str = params.join(" "); + + let run_process_result = WinApiProcessBuilder::new("powershell.exe") + .with_temp_file(tmp_file) + .with_command_line(params_str) + .run( + winps_msg.session_id(), + self.dvc_tx.clone(), + self.io_notification_tx.clone(), + ); + + self.handle_process_run_result(winps_msg.session_id(), run_process_result) + .await?; + + Ok(()) + } + + async fn process_exec_pwsh(&mut self, pwsh_msg: NowExecPwshMsg) -> anyhow::Result<()> { + self.ensure_session_id_free(pwsh_msg.session_id()).await?; + + let tmp_file = TmpFileGuard::new("ps1")?; + tmp_file.write_content(pwsh_msg.command().value())?; + + let mut params = Vec::new(); + + if let Some(execution_policy) = pwsh_msg.execution_policy() { + params.push("-ExecutionPolicy".to_string()); + params.push(execution_policy.value().to_string()); + } + + if let Some(configuration_name) = pwsh_msg.configuration_name() { + params.push("-ConfigurationName".to_string()); + params.push(configuration_name.value().to_string()); + } + + append_ps_flags(&mut params, pwsh_msg.flags()); + + params.push("-File".to_string()); + params.push(format!("\"{}\"", tmp_file.path_string())); + + let params_str = params.join(" "); + + let run_process_result = WinApiProcessBuilder::new("pwsh.exe") + .with_temp_file(tmp_file) + .with_command_line(params_str) + .run( + pwsh_msg.session_id(), + self.dvc_tx.clone(), + self.io_notification_tx.clone(), + ); + + self.handle_process_run_result(pwsh_msg.session_id(), run_process_result) + .await?; + + Ok(()) + } + + pub(crate) fn remove_session(&mut self, session_id: u32) { + let _ = self.sessions.remove(&session_id); + } + + pub(crate) async fn shutdown_all_sessions(&mut self) { + for session in self.sessions.values() { + let _ = session.shutdown().await; + } + } +} + +fn append_ps_flags(args: &mut Vec, flags: NowExecWinPsFlags) { + if flags.contains(NowExecWinPsFlags::NO_LOGO) { + args.push("-NoLogo".to_string()); + } + + if flags.contains(NowExecWinPsFlags::NO_EXIT) { + args.push("-NoExit".to_string()); + } + + if flags.contains(NowExecWinPsFlags::STA) { + args.push("-Sta".to_string()); + } + + if flags.contains(NowExecWinPsFlags::MTA) { + args.push("-Mta".to_string()); + } + + if flags.contains(NowExecWinPsFlags::NO_PROFILE) { + args.push("-NoProfile".to_string()); + } + + if flags.contains(NowExecWinPsFlags::NON_INTERACTIVE) { + args.push("-NonInteractive".to_string()); + } +} + +async fn process_msg_box_req(request: NowSessionMsgBoxReqMsg, dvc_tx: WinapiSignaledSender) { + info!("Processing message box request `{}`", request.request_id()); + + let title = HSTRING::from( + request + .title() + .map(|varstr| varstr.value()) + .unwrap_or("Devolutions Agent"), + ); + + let text = HSTRING::from(request.message().value()); + + // TODO: Use undocumented `MessageBoxTimeout` instead + // or create custom window (?) + // SAFETY: `MessageBoxW` is always safe to call. + let result = unsafe { + MessageBoxW( + None, + PCWSTR(text.as_ptr()), + PCWSTR(title.as_ptr()), + MESSAGEBOX_STYLE(request.style().value()), + ) + }; + + #[allow(clippy::cast_sign_loss)] + let message_box_response = result.0 as u32; + + let send_result = dvc_tx + .send(NowMessage::from(NowSessionMsgBoxRspMsg::new( + request.request_id(), + NowMsgBoxResponse::new(message_box_response), + ))) + .await; + + if let Err(error) = send_result { + error!(%error, "Failed to send MessageBox response"); + } +} diff --git a/devolutions-session/src/lib.rs b/devolutions-session/src/lib.rs index 58c59021b..e90299422 100644 --- a/devolutions-session/src/lib.rs +++ b/devolutions-session/src/lib.rs @@ -1,14 +1,13 @@ #[macro_use] extern crate tracing; -#[cfg(windows)] -mod dvc; +use ::{ctrlc as _, devolutions_gateway_task as _, futures as _, tokio as _}; + +#[cfg(all(windows, feature = "dvc"))] +pub mod dvc; mod config; mod log; -pub use config::{get_data_dir, ConfHandle}; +pub use config::{get_data_dir, Conf, ConfHandle}; pub use log::init_log; - -#[cfg(windows)] -pub use dvc::loop_dvc; diff --git a/devolutions-session/src/main.rs b/devolutions-session/src/main.rs index e7ebcd5c8..168064ba7 100644 --- a/devolutions-session/src/main.rs +++ b/devolutions-session/src/main.rs @@ -2,17 +2,30 @@ // It has no effect on platforms other than Windows. #![cfg_attr(windows, windows_subsystem = "windows")] +use ::{ + camino as _, cfg_if as _, ctrlc as _, devolutions_log as _, futures as _, parking_lot as _, serde as _, + serde_json as _, tap as _, +}; + +#[cfg(all(windows, feature = "dvc"))] +use ::{ + async_trait as _, ironrdp as _, now_proto_pdu as _, tempfile as _, thiserror as _, win_api_wrappers as _, + windows as _, +}; + #[macro_use] extern crate tracing; +use devolutions_session::{get_data_dir, init_log, Conf, ConfHandle}; -use devolutions_session::{get_data_dir, init_log, ConfHandle}; - -#[cfg(windows)] -use devolutions_session::loop_dvc; +use devolutions_gateway_task::{ChildTask, ShutdownHandle, ShutdownSignal}; use anyhow::Context; -use std::sync::mpsc; +#[cfg(all(windows, feature = "dvc"))] +use devolutions_session::dvc::task::DvcIoTask; + +use tokio::runtime::Runtime; +use tokio::task::JoinHandle; fn main() -> anyhow::Result<()> { // Ensure per-user data dir exists. @@ -27,28 +40,102 @@ fn main() -> anyhow::Result<()> { info!("Starting Devolutions Session"); - // TMP: Copy-paste from MSRDPEX project for testing purposes. - #[cfg(windows)] - { - if conf.debug.enable_unstable { - loop_dvc(); - } else { - debug!("DVC loop is disabled"); - } - } - - let (shutdown_tx, shutdown_rx) = mpsc::channel(); + let (runtime, shutdown_handle, join_handle) = start(&conf)?; ctrlc::set_handler(move || { info!("Ctrl-C received, exiting"); - shutdown_tx.send(()).expect("BUG: Failed to send shutdown signal"); + + shutdown_handle.signal(); }) .expect("BUG: Failed to set Ctrl-C handler"); info!("Waiting for shutdown signal"); - shutdown_rx.recv().expect("BUG: Shutdown signal was lost"); + + runtime.block_on(join_handle)?; info!("Exiting Devolutions Session"); Ok(()) } + +pub fn start(config: &Conf) -> anyhow::Result<(Runtime, ShutdownHandle, JoinHandle<()>)> { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to create runtime"); + + // NOTE: `spawn_tasks` should be always called from within the tokio runtime. + let tasks = runtime.block_on(spawn_tasks(config))?; + + trace!("Devolutions Session tasks created"); + + let shutdown_handle = tasks.shutdown_handle; + + let mut join_all = futures::future::select_all(tasks.inner.into_iter().map(|child| Box::pin(child.join()))); + + let join_handle = runtime.spawn(async { + loop { + let (result, _, rest) = join_all.await; + + match result { + Ok(Ok(())) => trace!("A task terminated gracefully"), + Ok(Err(error)) => error!(error = format!("{error:#}"), "A task failed"), + Err(error) => error!(%error, "Something went very wrong with a task"), + } + + if rest.is_empty() { + break; + } else { + join_all = futures::future::select_all(rest); + } + } + }); + + Ok((runtime, shutdown_handle, join_handle)) +} + +#[cfg(all(windows, feature = "dvc"))] +async fn spawn_tasks(config: &Conf) -> anyhow::Result { + let mut tasks = Tasks::new(); + + if config.debug.enable_unstable { + tasks.register(DvcIoTask::default()); + } + + Ok(tasks) +} + +#[cfg(not(all(windows, feature = "dvc")))] +async fn spawn_tasks(_config: &Conf) -> anyhow::Result { + Ok(Tasks::new()) +} + +struct Tasks { + inner: Vec>>, + shutdown_handle: ShutdownHandle, + // NOTE: Currently unused on non-windows platforms; kept for future use. + #[allow(dead_code)] + shutdown_signal: ShutdownSignal, +} + +impl Tasks { + fn new() -> Self { + let (shutdown_handle, shutdown_signal) = ShutdownHandle::new(); + + Self { + inner: Vec::new(), + shutdown_handle, + shutdown_signal, + } + } + + // NOTE: Currently unused on non-windows platforms; kept for future use. + #[allow(dead_code)] + fn register(&mut self, task: T) + where + T: devolutions_gateway_task::Task> + 'static, + { + let child = devolutions_gateway_task::spawn_task(task, self.shutdown_signal.clone()); + self.inner.push(child); + } +} diff --git a/jetsocat/src/doctor.rs b/jetsocat/src/doctor.rs index 2940aeb97..19c7d1892 100644 --- a/jetsocat/src/doctor.rs +++ b/jetsocat/src/doctor.rs @@ -822,8 +822,7 @@ mod native_tls_checks { fn openssl_check_chain(ctx: &mut DiagnosticCtx, server_certificates: &[X509]) -> anyhow::Result<()> { use openssl::ssl::{SslConnector, SslMethod}; use openssl::stack::Stack; - use openssl::x509::X509StoreContext; - use openssl::x509::X509VerifyResult; + use openssl::x509::{X509StoreContext, X509VerifyResult}; output!(ctx.out, "-> Create SSL context")?; diff --git a/jetsocat/src/main.rs b/jetsocat/src/main.rs index eba768344..849721bbc 100644 --- a/jetsocat/src/main.rs +++ b/jetsocat/src/main.rs @@ -23,9 +23,10 @@ use {proptest as _, test_utils as _}; extern crate tracing; use anyhow::Context as _; +use jetsocat::listener::ListenerMode; use jetsocat::pipe::PipeMode; use jetsocat::proxy::{detect_proxy, ProxyConfig, ProxyType}; -use jetsocat::{listener::ListenerMode, DoctorOutputFormat}; +use jetsocat::DoctorOutputFormat; use jmux_proxy::JmuxConfig; use seahorse::{App, Command, Context, Flag, FlagType}; use std::env;