diff --git a/.github/workflows/auto-update-dependencies.yml b/.github/workflows/auto-update-dependencies.yml index 81918471b..727131fae 100644 --- a/.github/workflows/auto-update-dependencies.yml +++ b/.github/workflows/auto-update-dependencies.yml @@ -20,7 +20,7 @@ jobs: - name: Install Linux requirements run: | - apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev" + apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler" echo "Run apt update and apt install the following dependencies: $apt_dependencies" sudo apt update sudo apt install -y $apt_dependencies diff --git a/.github/workflows/check-rust.yml b/.github/workflows/check-rust.yml index 344c1af7b..486b156b0 100644 --- a/.github/workflows/check-rust.yml +++ b/.github/workflows/check-rust.yml @@ -37,7 +37,7 @@ jobs: components: clippy, rustfmt - name: Install Linux requirements run: | - apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev" + apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler" echo "Run apt update and apt install the following dependencies: $apt_dependencies" sudo apt update sudo apt install -y $apt_dependencies diff --git a/.github/workflows/run-tarpaulin.yml b/.github/workflows/run-tarpaulin.yml index 0ff6f70de..9d6fdfcc4 100644 --- a/.github/workflows/run-tarpaulin.yml +++ b/.github/workflows/run-tarpaulin.yml @@ -41,13 +41,11 @@ jobs: - name: Start tarpaulin instance run: docker start $(cat container_id.txt) - name: Install linux requirement in tarpaulin instance - run: docker exec $(cat container_id.txt) sh -c "echo Run apt update and apt install the following dependencies - git curl libssl-dev pkg-config libudev-dev libv4l-dev ; apt update ; apt install -y git curl libssl-dev pkg-config libudev-dev libv4l-dev" + run: docker exec $(cat container_id.txt) sh -c "echo Run apt update and apt install the following dependencies - git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler ; apt update ; apt install -y git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler" - name: Install desired rust version run: docker exec $(cat container_id.txt) sh -c "rustup install $CARGO_VERSION" - name: Tell cargo to use desired rust version run: docker exec $(cat container_id.txt) sh -c "rustup override set $CARGO_VERSION" - - name: Install rust requirements in tarpaulin instance - run: docker exec $(cat container_id.txt) sh -c "rustup component add rustfmt" - name: Run tarpaulin run: docker exec $(cat container_id.txt) sh -c "RUST_LOG=trace cargo tarpaulin -v --all-features --out Xml" diff --git a/Cargo.lock b/Cargo.lock index b1db8ffdb..eebf80d6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,6 +703,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b70caf9f1b0c045f7da350636435b775a9733adf2df56e8aa2a29210fbc335d4" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding 2.3.0", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1211,9 +1256,9 @@ dependencies = [ [[package]] name = "fixedbitset" -version = "0.2.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" @@ -1485,6 +1530,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.2.6" @@ -1979,6 +2030,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "memchr" version = "2.5.0" @@ -2378,9 +2435,9 @@ dependencies = [ [[package]] name = "petgraph" -version = "0.5.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" +checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" dependencies = [ "fixedbitset", "indexmap", @@ -2475,6 +2532,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2 1.0.59", + "syn 1.0.109", +] + [[package]] name = "proc-macro2" version = "0.4.30" @@ -2543,9 +2610,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.8.0" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de5e2533f59d08fcf364fd374ebda0692a70bd6d7e66ef97f306f45c6c5d8020" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", "prost-derive", @@ -2553,27 +2620,31 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.8.0" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools", + "lazy_static", "log", "multimap", "petgraph", + "prettyplease", "prost", "prost-types", + "regex", + "syn 1.0.109", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.8.0" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "600d2f334aa05acb02a755e217ef1ab6dea4d51b58b7846588b747edec04efba" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", "itertools", @@ -2584,11 +2655,10 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.8.0" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "603bbd6394701d13f3f25aada59c7de9d35a6a5887cfc156181234a44002771b" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "bytes", "prost", ] @@ -2789,10 +2859,22 @@ dependencies = [ "base64 0.13.1", "log", "ring", - "sct", + "sct 0.6.1", "webpki", ] +[[package]] +name = "rustls" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct 0.7.0", +] + [[package]] name = "rustls-pemfile" version = "1.0.2" @@ -2802,6 +2884,22 @@ dependencies = [ "base64 0.21.2", ] +[[package]] +name = "rustls-webpki" +version = "0.100.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" + [[package]] name = "ryu" version = "1.0.13" @@ -2854,6 +2952,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "secrecy" version = "0.8.0" @@ -3081,6 +3189,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tempfile" version = "3.6.0" @@ -3237,13 +3351,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.22.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" dependencies = [ - "rustls", + "rustls 0.21.0", "tokio", - "webpki", ] [[package]] @@ -3300,13 +3413,14 @@ dependencies = [ [[package]] name = "tonic" -version = "0.5.2" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796c5e1cd49905e65dd8e700d4cb1dffcbfdb4fc9d017de08c1a537afd83627c" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "base64 0.13.1", + "axum", + "base64 0.21.2", "bytes", "futures-core", "futures-util", @@ -3318,24 +3432,23 @@ dependencies = [ "percent-encoding 2.3.0", "pin-project", "prost", - "prost-derive", + "rustls-pemfile", "tokio", "tokio-rustls", "tokio-stream", - "tokio-util 0.6.10", "tower", "tower-layer", "tower-service", "tracing", - "tracing-futures", ] [[package]] name = "tonic-build" -version = "0.5.2" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12b52d07035516c2b74337d2ac7746075e7dcae7643816c1b12c5ff8a7484c08" +checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" dependencies = [ + "prettyplease", "proc-macro2 1.0.59", "prost-build", "quote 1.0.28", @@ -3427,16 +3540,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "treediff" version = "4.0.2" @@ -3785,7 +3888,7 @@ dependencies = [ "kube-runtime", "openapi", "openssl", - "rustls", + "rustls 0.19.1", "serde", "serde_json", ] @@ -3957,7 +4060,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c0b0a4701f203ebaecce4971a6bb8575aa07b617bdc39ddfc6ffeff3a38530d" dependencies = [ - "heck", + "heck 0.3.3", "log", "proc-macro2 1.0.59", "quote 1.0.28", diff --git a/Cross.toml b/Cross.toml index 8adc2fa45..565804a3b 100644 --- a/Cross.toml +++ b/Cross.toml @@ -6,10 +6,10 @@ passthrough = [ ] [target.x86_64-unknown-linux-gnu] -image = "ghcr.io/project-akri/akri/rust-crossbuild:x86_64-unknown-linux-gnu-0.1.16-0.0.7" +image = "ghcr.io/project-akri/akri/rust-crossbuild:x86_64-unknown-linux-gnu-0.1.16-0.0.8" [target.armv7-unknown-linux-gnueabihf] -image = "ghcr.io/project-akri/akri/rust-crossbuild:armv7-unknown-linux-gnueabihf-0.1.16-0.0.7" +image = "ghcr.io/project-akri/akri/rust-crossbuild:armv7-unknown-linux-gnueabihf-0.1.16-0.0.8" [target.aarch64-unknown-linux-gnu] -image = "ghcr.io/project-akri/akri/rust-crossbuild:aarch64-unknown-linux-gnu-0.1.16-0.0.7" \ No newline at end of file +image = "ghcr.io/project-akri/akri/rust-crossbuild:aarch64-unknown-linux-gnu-0.1.16-0.0.8" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index fe949d776..cd36f2243 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -32,8 +32,8 @@ lazy_static = "1.4" log = "0.4" mockall_double = "0.2.0" prometheus = { version = "0.12.0", features = ["process"] } -prost = "0.8.0" -prost-types = "0.8.0" +prost = "0.11" +prost-types = "0.11" rand = "0.8.2" serde = "1.0.104" serde_json = "1.0.45" @@ -41,13 +41,13 @@ serde_yaml = "0.8.11" serde_derive = "1.0.104" tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.5.2" +tonic = "0.9.2" tower = "0.4.8" url = "2.1.0" uuid = { version = "0.8.1", features = ["v4"] } [build-dependencies] -tonic-build = "0.5.2" +tonic-build = "0.9.2" [dev-dependencies] # for testing using a simple discovery handler diff --git a/agent/src/util/v1beta1.rs b/agent/src/util/v1beta1.rs index 6cea16015..7354b098c 100644 --- a/agent/src/util/v1beta1.rs +++ b/agent/src/util/v1beta1.rs @@ -1,9 +1,11 @@ +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DevicePluginOptions { /// Indicates if PreStartContainer call is required before each container start #[prost(bool, tag = "1")] pub pre_start_required: bool, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterRequest { /// Version of the API the Device Plugin was built against @@ -20,11 +22,13 @@ pub struct RegisterRequest { #[prost(message, optional, tag = "4")] pub options: ::core::option::Option, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Empty {} /// ListAndWatch returns a stream of List of Devices /// Whenever a Device state change or a Device disapears, ListAndWatch /// returns the new list +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListAndWatchResponse { #[prost(message, repeated, tag = "1")] @@ -32,9 +36,10 @@ pub struct ListAndWatchResponse { } /// E.g: /// struct Device { -/// ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e", -/// State: "Healthy", -///} +/// ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e", +/// State: "Healthy", +/// } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Device { /// A unique ID assigned by the device plugin used @@ -49,26 +54,30 @@ pub struct Device { /// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase. /// - PreStartContainer allows kubelet to pass reinitialized devices to containers. /// - PreStartContainer allows Device Plugin to run device specific operations on -/// the Devices requested +/// the Devices requested +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreStartContainerRequest { #[prost(string, repeated, tag = "1")] pub devices_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } /// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreStartContainerResponse {} /// - Allocate is expected to be called during pod creation since allocation -/// failures for any container would result in pod startup failure. +/// failures for any container would result in pod startup failure. /// - Allocate allows kubelet to exposes additional artifacts in a pod's -/// environment as directed by the plugin. +/// environment as directed by the plugin. /// - Allocate allows Device Plugin to run device specific operations on -/// the Devices requested +/// the Devices requested +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AllocateRequest { #[prost(message, repeated, tag = "1")] pub container_requests: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ContainerAllocateRequest { #[prost(string, repeated, tag = "1")] @@ -82,11 +91,13 @@ pub struct ContainerAllocateRequest { /// Allocation on dev1 succeeds but allocation on dev2 fails. /// The Device plugin should send a ListAndWatch update and fail the /// Allocation request +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AllocateResponse { #[prost(message, repeated, tag = "1")] pub container_responses: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ContainerAllocateResponse { /// List of environment variable to be set in the container to access one of more devices. @@ -106,6 +117,7 @@ pub struct ContainerAllocateResponse { } /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Mount { /// Path of the mount within the container. @@ -119,6 +131,7 @@ pub struct Mount { pub read_only: bool, } /// DeviceSpec specifies a host device to mount into a container. +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeviceSpec { /// Path of the device within the container. @@ -134,25 +147,26 @@ pub struct DeviceSpec { #[prost(string, tag = "3")] pub permissions: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; - #[doc = " Registration is the service advertised by the Kubelet"] - #[doc = " Only when Kubelet answers with a success code to a Register Request"] - #[doc = " may Device Plugins start their service"] - #[doc = " Registration may fail when device plugin version is not supported by"] - #[doc = " Kubelet or the registered resourceName is already taken by another"] - #[doc = " active device plugin. Device plugin is expected to terminate upon registration failure"] + /// Registration is the service advertised by the Kubelet + /// Only when Kubelet answers with a success code to a Register Request + /// may Device Plugins start their service + /// Registration may fail when device plugin version is not supported by + /// Kubelet or the registered resourceName is already taken by another + /// active device plugin. Device plugin is expected to terminate upon registration failure #[derive(Debug, Clone)] pub struct RegistrationClient { inner: tonic::client::Grpc, } impl RegistrationClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -162,20 +176,25 @@ pub mod registration_client { impl RegistrationClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> RegistrationClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -187,23 +206,41 @@ pub mod registration_client { { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn register( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -212,24 +249,28 @@ pub mod registration_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.Registration/Register"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.Registration", "Register")); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod device_plugin_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; - #[doc = " DevicePlugin is the service advertised by Device Plugins"] + /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug, Clone)] pub struct DevicePluginClient { inner: tonic::client::Grpc, } impl DevicePluginClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -239,20 +280,25 @@ pub mod device_plugin_client { impl DevicePluginClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> DevicePluginClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -264,25 +310,44 @@ pub mod device_plugin_client { { DevicePluginClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } - #[doc = " GetDevicePluginOptions returns options to be communicated with Device"] - #[doc = " Manager"] + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// GetDevicePluginOptions returns options to be communicated with Device + /// Manager pub async fn get_device_plugin_options( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> + { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -293,15 +358,20 @@ pub mod device_plugin_client { let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetDevicePluginOptions", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "v1beta1.DevicePlugin", + "GetDevicePluginOptions", + )); + self.inner.unary(req, path, codec).await } - #[doc = " ListAndWatch returns a stream of List of Devices"] - #[doc = " Whenever a Device state change or a Device disapears, ListAndWatch"] - #[doc = " returns the new list"] + /// ListAndWatch returns a stream of List of Devices + /// Whenever a Device state change or a Device disapears, ListAndWatch + /// returns the new list pub async fn list_and_watch( &mut self, request: impl tonic::IntoRequest, - ) -> Result< + ) -> std::result::Result< tonic::Response>, tonic::Status, > { @@ -313,17 +383,18 @@ pub mod device_plugin_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/ListAndWatch"); - self.inner - .server_streaming(request.into_request(), path, codec) - .await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.DevicePlugin", "ListAndWatch")); + self.inner.server_streaming(req, path, codec).await } - #[doc = " Allocate is called during container creation so that the Device"] - #[doc = " Plugin can run device specific operations and instruct Kubelet"] - #[doc = " of the steps to make the Device available in the container"] + /// Allocate is called during container creation so that the Device + /// Plugin can run device specific operations and instruct Kubelet + /// of the steps to make the Device available in the container pub async fn allocate( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -332,15 +403,19 @@ pub mod device_plugin_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/Allocate"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.DevicePlugin", "Allocate")); + self.inner.unary(req, path, codec).await } - #[doc = " PreStartContainer is called, if indicated by Device Plugin during registeration phase,"] - #[doc = " before each container start. Device plugin can run device specific operations"] - #[doc = " such as reseting the device before making devices available to the container"] + /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, + /// before each container start. Device plugin can run device specific operations + /// such as reseting the device before making devices available to the container pub async fn pre_start_container( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> + { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -350,43 +425,52 @@ pub mod device_plugin_client { let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/PreStartContainer"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.DevicePlugin", "PreStartContainer")); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod registration_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with RegistrationServer."] + /// Generated trait containing gRPC methods that should be implemented for use with RegistrationServer. #[async_trait] pub trait Registration: Send + Sync + 'static { async fn register( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } - #[doc = " Registration is the service advertised by the Kubelet"] - #[doc = " Only when Kubelet answers with a success code to a Register Request"] - #[doc = " may Device Plugins start their service"] - #[doc = " Registration may fail when device plugin version is not supported by"] - #[doc = " Kubelet or the registered resourceName is already taken by another"] - #[doc = " active device plugin. Device plugin is expected to terminate upon registration failure"] + /// Registration is the service advertised by the Kubelet + /// Only when Kubelet answers with a success code to a Register Request + /// may Device Plugins start their service + /// Registration may fail when device plugin version is not supported by + /// Kubelet or the registered resourceName is already taken by another + /// active device plugin. Device plugin is expected to terminate upon registration failure #[derive(Debug)] pub struct RegistrationServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl RegistrationServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -395,17 +479,48 @@ pub mod registration_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for RegistrationServer where T: Registration, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -421,22 +536,29 @@ pub mod registration_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).register(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = RegisterSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -460,12 +582,14 @@ pub mod registration_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -473,66 +597,72 @@ pub mod registration_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for RegistrationServer { + impl tonic::server::NamedService for RegistrationServer { const NAME: &'static str = "v1beta1.Registration"; } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod device_plugin_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with DevicePluginServer."] + /// Generated trait containing gRPC methods that should be implemented for use with DevicePluginServer. #[async_trait] pub trait DevicePlugin: Send + Sync + 'static { - #[doc = " GetDevicePluginOptions returns options to be communicated with Device"] - #[doc = " Manager"] + /// GetDevicePluginOptions returns options to be communicated with Device + /// Manager async fn get_device_plugin_options( &self, request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListAndWatch method."] - type ListAndWatchStream: futures_core::Stream> - + Send - + Sync + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ListAndWatch method. + type ListAndWatchStream: futures_core::Stream< + Item = std::result::Result, + > + Send + 'static; - #[doc = " ListAndWatch returns a stream of List of Devices"] - #[doc = " Whenever a Device state change or a Device disapears, ListAndWatch"] - #[doc = " returns the new list"] + /// ListAndWatch returns a stream of List of Devices + /// Whenever a Device state change or a Device disapears, ListAndWatch + /// returns the new list async fn list_and_watch( &self, request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = " Allocate is called during container creation so that the Device"] - #[doc = " Plugin can run device specific operations and instruct Kubelet"] - #[doc = " of the steps to make the Device available in the container"] + ) -> std::result::Result, tonic::Status>; + /// Allocate is called during container creation so that the Device + /// Plugin can run device specific operations and instruct Kubelet + /// of the steps to make the Device available in the container async fn allocate( &self, request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = " PreStartContainer is called, if indicated by Device Plugin during registeration phase,"] - #[doc = " before each container start. Device plugin can run device specific operations"] - #[doc = " such as reseting the device before making devices available to the container"] + ) -> std::result::Result, tonic::Status>; + /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, + /// before each container start. Device plugin can run device specific operations + /// such as reseting the device before making devices available to the container async fn pre_start_container( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } - #[doc = " DevicePlugin is the service advertised by Device Plugins"] + /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug)] pub struct DevicePluginServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl DevicePluginServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -541,17 +671,48 @@ pub mod device_plugin_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for DevicePluginServer where T: DevicePlugin, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -564,7 +725,7 @@ pub mod device_plugin_server { type Response = super::DevicePluginOptions; type Future = BoxFuture, tonic::Status>; fn call(&mut self, request: tonic::Request) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_device_plugin_options(request).await }; Box::pin(fut) @@ -572,15 +733,22 @@ pub mod device_plugin_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = GetDevicePluginOptionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -595,22 +763,29 @@ pub mod device_plugin_server { type Future = BoxFuture, tonic::Status>; fn call(&mut self, request: tonic::Request) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).list_and_watch(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = ListAndWatchSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -626,22 +801,29 @@ pub mod device_plugin_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).allocate(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = AllocateSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -660,22 +842,29 @@ pub mod device_plugin_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).pre_start_container(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = PreStartContainerSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -699,12 +888,14 @@ pub mod device_plugin_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -712,7 +903,7 @@ pub mod device_plugin_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for DevicePluginServer { + impl tonic::server::NamedService for DevicePluginServer { const NAME: &'static str = "v1beta1.DevicePlugin"; } } diff --git a/build/setup.sh b/build/setup.sh index 1e73d6406..5810663dc 100755 --- a/build/setup.sh +++ b/build/setup.sh @@ -5,7 +5,7 @@ set -ex echo "User: $(whoami)" -apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev" +apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler" echo "Install dependencies: $apt_dependencies" if [ -x "$(command -v sudo)" ]; then @@ -32,7 +32,4 @@ else echo "Found rustup" fi -echo "Install rustfmt" -rustup component add rustfmt - -exit 0 \ No newline at end of file +exit 0 diff --git a/discovery-handlers/debug-echo/Cargo.toml b/discovery-handlers/debug-echo/Cargo.toml index f3372a3f3..3bcaa4161 100644 --- a/discovery-handlers/debug-echo/Cargo.toml +++ b/discovery-handlers/debug-echo/Cargo.toml @@ -20,7 +20,7 @@ serde_yaml = "0.8.11" serde_derive = "1.0.104" tokio = { version = "1.0.1", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.9.2", features = ["tls"] } [dev-dependencies] akri-shared = { path = "../../shared" } diff --git a/discovery-handlers/onvif/Cargo.toml b/discovery-handlers/onvif/Cargo.toml index 5a8925882..4b2e698f6 100644 --- a/discovery-handlers/onvif/Cargo.toml +++ b/discovery-handlers/onvif/Cargo.toml @@ -25,7 +25,7 @@ sxd-document = "0.3.0" sxd-xpath = "0.4.0" tokio = { version = "1.0", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.9.2", features = ["tls"] } uuid = { version = "0.8.1", features = ["v4"] } yaserde = "0.7.1" yaserde_derive = "0.7.1" diff --git a/discovery-handlers/opcua/Cargo.toml b/discovery-handlers/opcua/Cargo.toml index f84451818..8b3c205c1 100644 --- a/discovery-handlers/opcua/Cargo.toml +++ b/discovery-handlers/opcua/Cargo.toml @@ -16,14 +16,14 @@ env_logger = "0.10.0" futures-util = "0.3" log = "0.4" opcua = { version = "0.11.0", features = ["client"] } -prost = "0.8.0" +prost = "0.11" serde = "1.0.104" serde_json = "1.0.45" serde_yaml = "0.8.11" serde_derive = "1.0.1" tokio = { version = "1.0.2", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.9.2", features = ["tls"] } url = "2.2.0" [dev-dependencies] diff --git a/discovery-handlers/udev/Cargo.toml b/discovery-handlers/udev/Cargo.toml index d8a051f4d..0f70e18d4 100644 --- a/discovery-handlers/udev/Cargo.toml +++ b/discovery-handlers/udev/Cargo.toml @@ -16,7 +16,7 @@ futures-util = "0.3" log = "0.4" pest = "2.0" pest_derive = "2.0" -prost = "0.8.0" +prost = "0.11" regex = "1" serde = "1.0.104" serde_json = "1.0.45" @@ -24,7 +24,7 @@ serde_yaml = "0.8.11" serde_derive = "1.0.104" tokio = { version = "1.0", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.9.2", features = ["tls"] } udev = "0.5" [dev-dependencies] diff --git a/discovery-utils/Cargo.toml b/discovery-utils/Cargo.toml index 7d4639afd..1283c97f7 100644 --- a/discovery-utils/Cargo.toml +++ b/discovery-utils/Cargo.toml @@ -14,14 +14,14 @@ anyhow = "1.0.38" async-trait = { version = "0.1.0", optional = true } futures = { version = "0.3.1", package = "futures" } log = "0.4" -prost = "0.8" +prost = "0.11" serde = "1.0" serde_derive = "1.0" serde_yaml = "0.8.11" tempfile = { version = "3.1.0", optional = true } tokio = { version = "1.0.1", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.9.2", features = ["tls"] } tower = "0.4.8" [features] @@ -32,4 +32,4 @@ async-trait = "0.1.0" tempfile = "3.1.0" [build-dependencies] -tonic-build = "0.5.2" +tonic-build = "0.9.2" diff --git a/discovery-utils/src/discovery/v0.rs b/discovery-utils/src/discovery/v0.rs index f21bb2105..005706464 100644 --- a/discovery-utils/src/discovery/v0.rs +++ b/discovery-utils/src/discovery/v0.rs @@ -1,3 +1,4 @@ +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterDiscoveryHandlerRequest { /// Name of the `DiscoveryHandler`. This name is specified in an @@ -26,9 +27,31 @@ pub mod register_discovery_handler_request { Uds = 0, Network = 1, } + impl EndpointType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + EndpointType::Uds => "UDS", + EndpointType::Network => "NETWORK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UDS" => Some(Self::Uds), + "NETWORK" => Some(Self::Network), + _ => None, + } + } + } } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Empty {} +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DiscoverRequest { /// String containing all the details (such as filtering options) @@ -36,12 +59,14 @@ pub struct DiscoverRequest { #[prost(string, tag = "1")] pub discovery_details: ::prost::alloc::string::String, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DiscoverResponse { /// List of discovered devices #[prost(message, repeated, tag = "1")] pub devices: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Device { /// Identifier for this device @@ -63,6 +88,7 @@ pub struct Device { /// From Device Plugin API /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Mount { /// Path of the mount within the container. @@ -77,6 +103,7 @@ pub struct Mount { } /// From Device Plugin API /// DeviceSpec specifies a host device to mount into a container. +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeviceSpec { /// Path of the device within the container. @@ -92,21 +119,22 @@ pub struct DeviceSpec { #[prost(string, tag = "3")] pub permissions: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; - #[doc = " Registration is the service advertised by the Akri Agent."] - #[doc = " Any `DiscoveryHandler` can register with the Akri Agent."] + /// Registration is the service advertised by the Akri Agent. + /// Any `DiscoveryHandler` can register with the Akri Agent. #[derive(Debug, Clone)] pub struct RegistrationClient { inner: tonic::client::Grpc, } impl RegistrationClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -116,20 +144,25 @@ pub mod registration_client { impl RegistrationClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> RegistrationClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -141,23 +174,41 @@ pub mod registration_client { { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn register_discovery_handler( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -167,23 +218,29 @@ pub mod registration_client { let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v0.Registration/RegisterDiscoveryHandler"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "v0.Registration", + "RegisterDiscoveryHandler", + )); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod discovery_handler_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; #[derive(Debug, Clone)] pub struct DiscoveryHandlerClient { inner: tonic::client::Grpc, } impl DiscoveryHandlerClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -193,20 +250,25 @@ pub mod discovery_handler_client { impl DiscoveryHandlerClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> DiscoveryHandlerClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -218,24 +280,44 @@ pub mod discovery_handler_client { { DiscoveryHandlerClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn discover( &mut self, request: impl tonic::IntoRequest, - ) -> Result>, tonic::Status> - { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -244,41 +326,48 @@ pub mod discovery_handler_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v0.DiscoveryHandler/Discover"); - self.inner - .server_streaming(request.into_request(), path, codec) - .await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v0.DiscoveryHandler", "Discover")); + self.inner.server_streaming(req, path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod registration_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with RegistrationServer."] + /// Generated trait containing gRPC methods that should be implemented for use with RegistrationServer. #[async_trait] pub trait Registration: Send + Sync + 'static { async fn register_discovery_handler( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } - #[doc = " Registration is the service advertised by the Akri Agent."] - #[doc = " Any `DiscoveryHandler` can register with the Akri Agent."] + /// Registration is the service advertised by the Akri Agent. + /// Any `DiscoveryHandler` can register with the Akri Agent. #[derive(Debug)] pub struct RegistrationServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl RegistrationServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -287,17 +376,48 @@ pub mod registration_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for RegistrationServer where T: Registration, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -316,7 +436,7 @@ pub mod registration_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).register_discovery_handler(request).await }; Box::pin(fut) @@ -324,15 +444,22 @@ pub mod registration_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = RegisterDiscoveryHandlerSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -356,12 +483,14 @@ pub mod registration_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -369,42 +498,47 @@ pub mod registration_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for RegistrationServer { + impl tonic::server::NamedService for RegistrationServer { const NAME: &'static str = "v0.Registration"; } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod discovery_handler_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with DiscoveryHandlerServer."] + /// Generated trait containing gRPC methods that should be implemented for use with DiscoveryHandlerServer. #[async_trait] pub trait DiscoveryHandler: Send + Sync + 'static { - #[doc = "Server streaming response type for the Discover method."] - type DiscoverStream: futures_core::Stream> + /// Server streaming response type for the Discover method. + type DiscoverStream: futures_core::Stream> + Send - + Sync + 'static; async fn discover( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct DiscoveryHandlerServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl DiscoveryHandlerServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -413,17 +547,48 @@ pub mod discovery_handler_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for DiscoveryHandlerServer where T: DiscoveryHandler, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -444,22 +609,29 @@ pub mod discovery_handler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).discover(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = DiscoverSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -483,12 +655,14 @@ pub mod discovery_handler_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -496,7 +670,7 @@ pub mod discovery_handler_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for DiscoveryHandlerServer { + impl tonic::server::NamedService for DiscoveryHandlerServer { const NAME: &'static str = "v0.DiscoveryHandler"; } } diff --git a/samples/brokers/udev-video-broker/Cargo.toml b/samples/brokers/udev-video-broker/Cargo.toml index 5e855de51..5e5db0076 100644 --- a/samples/brokers/udev-video-broker/Cargo.toml +++ b/samples/brokers/udev-video-broker/Cargo.toml @@ -13,11 +13,11 @@ env_logger = "0.10.0" lazy_static = "1.4" log = "0.4.3" prometheus = { version = "0.12.0", features = ["process"] } -prost = "0.8.0" +prost = "0.11" regex = "1" tokio = { version = "1.0.1", features = ["time", "fs", "macros", "signal"] } -tonic = "0.5.2" +tonic = "0.9.2" rscam = "0.5.5" [build-dependencies] -tonic-build = "0.5.2" +tonic-build = "0.9.2" diff --git a/samples/brokers/udev-video-broker/src/util/camera.rs b/samples/brokers/udev-video-broker/src/util/camera.rs index 58b176209..cd6cb692d 100644 --- a/samples/brokers/udev-video-broker/src/util/camera.rs +++ b/samples/brokers/udev-video-broker/src/util/camera.rs @@ -1,5 +1,7 @@ +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NotifyRequest {} +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NotifyResponse { #[prost(bytes = "vec", tag = "1")] @@ -7,19 +9,20 @@ pub struct NotifyResponse { #[prost(string, tag = "2")] pub camera: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod camera_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; #[derive(Debug, Clone)] pub struct CameraClient { inner: tonic::client::Grpc, } impl CameraClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -29,20 +32,25 @@ pub mod camera_client { impl CameraClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> CameraClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -54,23 +62,41 @@ pub mod camera_client { { CameraClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn get_frame( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -79,37 +105,46 @@ pub mod camera_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/camera.Camera/GetFrame"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("camera.Camera", "GetFrame")); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod camera_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with CameraServer."] + /// Generated trait containing gRPC methods that should be implemented for use with CameraServer. #[async_trait] pub trait Camera: Send + Sync + 'static { async fn get_frame( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct CameraServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl CameraServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -118,17 +153,48 @@ pub mod camera_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for CameraServer where T: Camera, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -144,22 +210,29 @@ pub mod camera_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_frame(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = GetFrameSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -183,12 +256,14 @@ pub mod camera_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -196,7 +271,7 @@ pub mod camera_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for CameraServer { + impl tonic::server::NamedService for CameraServer { const NAME: &'static str = "camera.Camera"; } } diff --git a/shared/Cargo.toml b/shared/Cargo.toml index b63817391..050a8216c 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -28,6 +28,6 @@ serde_derive = "1.0" serde_json = "1.0" serde_yaml = "0.8" tokio = { version = "1.0.1", features = ["full"] } -tonic = "0.5.2" +tonic = "0.9.2" tower = "0.4.8" warp = "0.3.4"