From 633515af723f1648bed020d233b6123c544beddd Mon Sep 17 00:00:00 2001 From: kwsc98 <958280102@qq.com> Date: Tue, 14 May 2024 08:58:46 +0800 Subject: [PATCH 1/4] 20240513 --- Cargo.toml | 3 +- dubbo-macro/Cargo.toml | 17 ++ dubbo-macro/LICENSE | 202 ++++++++++++++++++ dubbo-macro/src/lib.rs | 107 ++++++++++ dubbo-macro/src/server_macro.rs | 144 +++++++++++++ dubbo-macro/src/trait_macro.rs | 165 ++++++++++++++ dubbo/src/framework.rs | 17 +- dubbo/src/protocol/triple/mod.rs | 2 +- dubbo/src/triple/mod.rs | 1 + dubbo/src/triple/server/mod.rs | 1 + dubbo/src/triple/server/support.rs | 150 +++++++++++++ dubbo/src/triple/triple_wrapper.rs | 80 +++++++ .../echo/src/generated/grpc.examples.echo.rs | 127 +++++++---- examples/interface/Cargo.toml | 37 ++++ examples/interface/LICENSE | 202 ++++++++++++++++++ examples/interface/README.md | 22 ++ examples/interface/README_CN.md | 21 ++ examples/interface/application.yaml | 25 +++ examples/interface/src/client.rs | 42 ++++ examples/interface/src/lib.rs | 37 ++++ examples/interface/src/server.rs | 67 ++++++ 21 files changed, 1429 insertions(+), 40 deletions(-) create mode 100644 dubbo-macro/Cargo.toml create mode 100644 dubbo-macro/LICENSE create mode 100644 dubbo-macro/src/lib.rs create mode 100644 dubbo-macro/src/server_macro.rs create mode 100644 dubbo-macro/src/trait_macro.rs create mode 100644 dubbo/src/triple/server/support.rs create mode 100644 dubbo/src/triple/triple_wrapper.rs create mode 100644 examples/interface/Cargo.toml create mode 100644 examples/interface/LICENSE create mode 100644 examples/interface/README.md create mode 100644 examples/interface/README_CN.md create mode 100644 examples/interface/application.yaml create mode 100644 examples/interface/src/client.rs create mode 100644 examples/interface/src/lib.rs create mode 100644 examples/interface/src/server.rs diff --git a/Cargo.toml b/Cargo.toml index 02c1e2f6..2af43271 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ members = [ "dubbo", "examples/echo", "examples/greeter", - "dubbo-build", + "examples/interface", + "dubbo-build", "dubbo-macro", ] diff --git a/dubbo-macro/Cargo.toml b/dubbo-macro/Cargo.toml new file mode 100644 index 00000000..a3c29695 --- /dev/null +++ b/dubbo-macro/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "dubbo-macro" +version = "0.3.0" +edition = "2021" +license = "Apache-2.0" +description = "dubbo-macro" +documentation = "https://github.com/apache/dubbo-rust" +repository = "https://github.com/apache/dubbo-rust.git" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +quote = "1" +syn = { version = "2", features = ["full"] } \ No newline at end of file diff --git a/dubbo-macro/LICENSE b/dubbo-macro/LICENSE new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/dubbo-macro/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/dubbo-macro/src/lib.rs b/dubbo-macro/src/lib.rs new file mode 100644 index 00000000..6db902a9 --- /dev/null +++ b/dubbo-macro/src/lib.rs @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use proc_macro::TokenStream; +use quote::{ToTokens}; +use syn::parse::Parser; + +mod server_macro; +mod trait_macro; + + + +#[proc_macro_attribute] +pub fn dubbo_trait(attr: TokenStream, item: TokenStream) -> TokenStream { + let attr = DubboAttr::from_attr(attr); + match attr { + Ok(attr) => { trait_macro::dubbo_trait(attr, item) } + Err(err) => { err.into_compile_error().into() } + } +} + +#[proc_macro_attribute] +pub fn dubbo_server(attr: TokenStream, item: TokenStream) -> TokenStream { + let attr = DubboAttr::from_attr(attr); + match attr { + Ok(attr) => { server_macro::dubbo_server(attr, item) } + Err(err) => { err.into_compile_error().into() } + } +} + + +#[derive(Default)] +struct DubboAttr { + package: Option, + version: Option, +} + +impl DubboAttr { + fn from_attr(args: TokenStream) -> Result { + syn::punctuated::Punctuated::::parse_terminated + .parse2(args.into()) + .and_then(|args| Self::build_attr(args)) + } + + fn build_attr(args: syn::punctuated::Punctuated::) -> Result { + let mut package = None; + let mut version = None; + for arg in args { + match arg { + syn::Meta::NameValue(namevalue) => { + let ident = namevalue + .path + .get_ident() + .ok_or_else(|| { + syn::Error::new_spanned(&namevalue, "Must have specified ident") + })? + .to_string() + .to_lowercase(); + let lit = match &namevalue.value { + syn::Expr::Lit(syn::ExprLit { lit, .. }) => lit.to_token_stream().to_string(), + expr => expr.to_token_stream().to_string(), + } + .replace("\"", ""); + match ident.as_str() { + "package" => { + let _ = package.insert(lit); + } + "version" => { + let _ = version.insert(lit); + } + name => { + let msg = format!( + "Unknown attribute {} is specified; expected one of: {} ", + name, "'package','version'", + ); + return Err(syn::Error::new_spanned(namevalue, msg)); + } + } + } + other => { + return Err(syn::Error::new_spanned( + other, + "Unknown attribute inside the dubbo-macro", + )); + } + } + } + Ok(DubboAttr { + package, + version, + }) + } +} diff --git a/dubbo-macro/src/server_macro.rs b/dubbo-macro/src/server_macro.rs new file mode 100644 index 00000000..f07fda7d --- /dev/null +++ b/dubbo-macro/src/server_macro.rs @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +use crate::DubboAttr; +use proc_macro::TokenStream; +use quote::{quote, ToTokens}; +use syn::{parse_macro_input, FnArg, ItemImpl, ImplItem}; + +pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { + let version = match attr.version { + Some(version) => quote!(Some(&#version)), + None => quote!(None), + }; + let org_item = parse_macro_input!(item as ItemImpl); + let server_item = get_server_item(org_item.clone()); + let item = org_item.clone(); + let item_trait = &item.trait_.unwrap().1.segments[0].ident; + let item_self = item.self_ty; + let items_ident_fn = item.items.iter().fold(vec![], |mut vec, e| { + if let ImplItem::Fn(fn_item) = e { + vec.push(fn_item.sig.ident.clone()) + } + vec + }); + let items_fn = item.items.iter().fold(vec![], |mut vec, e| { + if let ImplItem::Fn(fn_item) = e { + let method = &fn_item.sig.ident; + let mut req_pat = vec![]; + let req = fn_item.sig.inputs.iter().fold(vec![], |mut vec, e| { + if let FnArg::Typed(input) = e { + let req = &input.pat; + let req_type = &input.ty; + let token = quote! { + let result : Result<#req_type,_> = serde_json::from_slice(param_req[idx].as_bytes()); + if let Err(err) = result { + param.res = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string())); + return param; + } + let #req : #req_type = result.unwrap(); + idx += 1; + }; + req_pat.push(req); + vec.push(token); + } + vec + }, + ); + vec.push(quote! { + if ¶m.method_name[..] == stringify!(#method) { + let param_req = ¶m.req; + let mut idx = 0; + #( + #req + )* + let res = self.#method( + #( + #req_pat, + )* + ).await; + param.res = match res { + Ok(res) => { + let res = serde_json::to_string(&res).unwrap(); + Ok(res) + }, + Err(info) => Err(info) + }; + return param; + } + } + ) + } + vec + }); + let service_unique = match &attr.package { + None => { quote!(stringify!(#item_trait)) } + Some(attr) => { quote!(#attr.to_owned() + "." + stringify!(#item_trait)) } + }; + let expanded = quote! { + #server_item + use dubbo::triple::server::support::RpcServer; + use dubbo::triple::server::support::RpcFuture; + use dubbo::triple::server::support::RpcMsg; + impl RpcServer for #item_self { + fn invoke (&self, param : RpcMsg) -> RpcFuture { + let mut rpc = self.clone(); + Box::pin(async move {rpc.prv_invoke(param).await}) + } + fn get_info(&self) -> (&str , &str , Option<&str> , Vec) { + let mut methods = vec![]; + #( + methods.push(stringify!(#items_ident_fn).to_string()); + )* + (#service_unique , #version ,methods) + } + } + + impl #item_self { + async fn prv_invoke (&self, mut param : RpcMsg) -> RpcMsg { + #(#items_fn)* + param.res = Err( + dubbo::status::Status::new(dubbo::status::Code::NotFound,format!("not find method by {}",param.method_name)) + ); + return param; + } + } + }; + expanded.into() +} + + +fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream { + let impl_item = item.impl_token; + let trait_ident = item.trait_.unwrap().1; + let ident = item.self_ty.to_token_stream(); + let fn_items = item.items.iter().fold(vec![], |mut vec, e| { + if let ImplItem::Fn(fn_item) = e { + vec.push(fn_item); + } + vec + }); + quote! { + #impl_item #trait_ident for #ident { + #( + #[allow(non_snake_case)] + #fn_items + )* + } + } +} \ No newline at end of file diff --git a/dubbo-macro/src/trait_macro.rs b/dubbo-macro/src/trait_macro.rs new file mode 100644 index 00000000..82f11176 --- /dev/null +++ b/dubbo-macro/src/trait_macro.rs @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::DubboAttr; +use proc_macro::TokenStream; +use quote::{quote, ToTokens}; +use syn::{parse_macro_input, FnArg, ItemTrait, ReturnType, TraitItem}; + +pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { + let version = match attr.version { + Some(version) => quote!(Some(&#version)), + None => quote!(None), + }; + let input = parse_macro_input!(item as ItemTrait); + let item_trait = get_item_trait(input.clone()); + let trait_ident = &input.ident; + let vis = &input.vis; + let items = &input.items; + let mut sig_item = vec![]; + for item in items { + if let TraitItem::Fn(item) = item { + sig_item.push(item.sig.clone()); + } + } + let mut fn_quote = vec![]; + for item in sig_item { + let asyncable = item.asyncness; + let ident = item.ident; + let inputs = item.inputs; + let req = inputs.iter().fold(vec![], |mut vec, e| { + if let FnArg::Typed(req) = e { + vec.push(req.pat.clone()); + } + vec + }); + let output = item.output; + let output_type = match &output { + ReturnType::Default => { + quote! {()} + } + ReturnType::Type(_, res_type) => res_type.to_token_stream(), + }; + let inputs = inputs.iter().fold(vec![], |mut vec, e| { + let mut token = e.to_token_stream(); + if vec.is_empty() { + if let FnArg::Receiver(_r) = e { + token = quote!(&mut self); + } + } + vec.push(token); + vec + }); + let package = stringify!(#trait_ident); + let service_unique = match &attr.package { + None => { quote!(#package) } + Some(attr) => { quote!(#attr.to_owned() + "." + #package) } + }; + fn_quote.push( + quote! { + #[allow(non_snake_case)] + pub #asyncable fn #ident (#(#inputs),*) -> Result<#output_type,dubbo::status::Status> { + let mut req_vec : Vec = vec![]; + #( + let mut req_poi_str = serde_json::to_string(&#req); + if let Err(err) = req_poi_str { + return Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string())); + } + req_vec.push(req_poi_str.unwrap()); + )* + let _version : Option<&str> = #version; + let request = Request::new(TripleRequestWrapper::new(req_vec)); + let service_unique = #service_unique; + let method_name = stringify!(#ident).to_string(); + let invocation = dubbo::invocation::RpcInvocation::default() + .with_service_unique_name(service_unique.to_owned()) + .with_method_name(method_name.clone()); + let path = "/".to_string() + service_unique + "/" + &method_name; + let path = http::uri::PathAndQuery::from_str( + &path, + ).unwrap(); + let res = self.inner.unary::(request, path, invocation).await; + match res { + Ok(res) => { + let response_wrapper = res.into_parts().1; + let res: #output_type = serde_json::from_slice(&response_wrapper.data).unwrap(); + Ok(res) + }, + Err(err) => Err(err) + } + } + } + ); + } + let rpc_client = syn::Ident::new(&format!("{}Client", trait_ident), trait_ident.span()); + let expanded = quote! { + use dubbo::triple::client::TripleClient; + use dubbo::triple::triple_wrapper::TripleRequestWrapper; + use dubbo::triple::triple_wrapper::TripleResponseWrapper; + use dubbo::triple::codec::prost::ProstCodec; + use dubbo::invocation::Request; + use dubbo::invocation::Response; + use dubbo::triple::client::builder::ClientBuilder; + use std::str::FromStr; + + #item_trait + + #vis struct #rpc_client { + inner: TripleClient + } + impl #rpc_client { + #( + #fn_quote + )* + pub fn new(builder: ClientBuilder) -> #rpc_client { + #rpc_client {inner: TripleClient::new(builder),} + } + } + }; + TokenStream::from(expanded) +} + + +fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream { + let trait_ident = &item.ident; + let item_fn = item.items.iter().fold(vec![], |mut vec, e| { + if let TraitItem::Fn(item_fn) = e { + let asyncable = &item_fn.sig.asyncness; + let ident = &item_fn.sig.ident; + let inputs = &item_fn.sig.inputs; + let output_type = match &item_fn.sig.output { + ReturnType::Default => { + quote! {()} + } + ReturnType::Type(_, res_type) => res_type.to_token_stream(), + }; + vec.push(quote!( + #asyncable fn #ident (#inputs) -> Result<#output_type,dubbo::status::Status>; + )); + } + vec + }); + quote! { + pub trait #trait_ident { + #( + #[allow(async_fn_in_trait)] + #[allow(non_snake_case)] + #item_fn + )* + } + } +} \ No newline at end of file diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 5cfc3119..6bb6e3b1 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -27,6 +27,7 @@ use crate::{ Url, }; use futures::{future, Future}; +use crate::triple::server::support::{RpcHttp2Server, RpcServer}; // Invoker是否可以基于hyper写一个通用的 @@ -60,6 +61,20 @@ impl Dubbo { self } + pub fn register_server(self, server: T) -> Self { + let info = server.get_info(); + let server_name = info.0.to_owned(); + let s: RpcHttp2Server = RpcHttp2Server::new(server); + crate::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert( + server_name, + crate::utils::boxed_clone::BoxCloneService::new(s), + ); + self + } + pub fn init(&mut self) -> Result<(), Box> { if self.config.is_none() { self.config = Some(get_global_config()) @@ -127,7 +142,7 @@ impl Dubbo { .with_registries(registry_extensions.clone()) .with_services(self.service_registry.clone()), ); - let mut async_vec: Vec + Send>>> = Vec::new(); + let mut async_vec: Vec + Send>>> = Vec::new(); for (name, items) in self.protocols.iter() { for url in items.iter() { info!("base: {:?}, service url: {:?}", name, url); diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs index e97b1505..9b2ae811 100644 --- a/dubbo/src/protocol/triple/mod.rs +++ b/dubbo/src/protocol/triple/mod.rs @@ -26,7 +26,7 @@ use std::{collections::HashMap, sync::RwLock}; use crate::{utils::boxed_clone::BoxCloneService, BoxBody}; pub type GrpcBoxCloneService = - BoxCloneService, http::Response, std::convert::Infallible>; +BoxCloneService, http::Response, std::convert::Infallible>; lazy_static! { pub static ref TRIPLE_SERVICES: RwLock> = diff --git a/dubbo/src/triple/mod.rs b/dubbo/src/triple/mod.rs index 799ac099..07aa906a 100644 --- a/dubbo/src/triple/mod.rs +++ b/dubbo/src/triple/mod.rs @@ -23,3 +23,4 @@ pub mod decode; pub mod encode; pub mod server; pub mod transport; +pub mod triple_wrapper; diff --git a/dubbo/src/triple/server/mod.rs b/dubbo/src/triple/server/mod.rs index b36f7693..abd48f91 100644 --- a/dubbo/src/triple/server/mod.rs +++ b/dubbo/src/triple/server/mod.rs @@ -18,5 +18,6 @@ pub mod builder; pub mod service; pub mod triple; +pub mod support; pub use triple::TripleServer; diff --git a/dubbo/src/triple/server/support.rs b/dubbo/src/triple/server/support.rs new file mode 100644 index 00000000..896338c1 --- /dev/null +++ b/dubbo/src/triple/server/support.rs @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use crate::{ + codegen::{Request, Response, UnarySvc}, + status::{Code, Status}, + triple::triple_wrapper::{TripleRequestWrapper, TripleResponseWrapper}, + BoxBody, BoxFuture, StdError, +}; +use http_body::Body; +use tower::Service; + +use super::TripleServer; + +pub type RpcFuture = std::pin::Pin + Send>>; + +pub struct RpcMsg { + pub version: Option, + pub class_name: String, + pub method_name: String, + pub req: Vec, + pub res: Result, +} + +impl RpcMsg { + pub fn new(path: String, version: Option) -> Self { + let attr: Vec<&str> = path.split("/").collect(); + RpcMsg { + version, + class_name: attr[1].to_string(), + method_name: attr[2].to_string(), + req: vec![], + res: Err(Status::new(Code::Ok, "success".to_string())), + } + } +} + +pub trait RpcServer: Send + Sync + 'static { + fn invoke(&self, msg: RpcMsg) -> RpcFuture; + fn get_info(&self) -> (&str, Option<&str>, Vec); +} + +struct _Inner(Arc); + +#[derive(Debug)] +pub struct RpcHttp2Server { + inner: _Inner, +} + +impl RpcHttp2Server { + pub fn new(inner: T) -> Self { + Self { + inner: _Inner(Arc::new(inner)), + } + } +} + +impl Service> for RpcHttp2Server +where + T: RpcServer + 'static, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, +{ + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let path = req.uri().path().to_string(); + let version = req + .headers() + .get("tri-service-version") + .map(|e| String::from_utf8_lossy(e.as_bytes()).to_string()); + let rpc_msg = RpcMsg::new(path, version); + let rpc_unary_server = RpcUnaryServer { + inner: self.inner.clone(), + msg: Some(rpc_msg), + }; + let mut server = TripleServer::new(); + let fut = async move { + let res = server.unary(rpc_unary_server, req).await; + Ok(res) + }; + Box::pin(fut) + } +} + +#[allow(non_camel_case_types)] +struct RpcUnaryServer { + inner: _Inner, + msg: Option, +} + +impl UnarySvc for RpcUnaryServer { + type Response = TripleResponseWrapper; + type Future = BoxFuture, crate::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { + let inner = self.inner.0.clone(); + let mut msg = self.msg.take().unwrap(); + msg.req = request.message.get_req(); + let fut = async move { + let res = inner.invoke(msg).await.res; + match res { + Ok(res) => Ok(Response::new(TripleResponseWrapper::new(res))), + Err(err) => Err(err), + } + }; + Box::pin(fut) + } +} + +impl Clone for RpcHttp2Server { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { inner } + } +} + +impl Clone for _Inner { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } +} diff --git a/dubbo/src/triple/triple_wrapper.rs b/dubbo/src/triple/triple_wrapper.rs new file mode 100644 index 00000000..2f06df69 --- /dev/null +++ b/dubbo/src/triple/triple_wrapper.rs @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use serde::{Deserialize, Serialize}; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message, Serialize, Deserialize)] +pub struct TripleRequestWrapper { + /// hessian4 + /// json + #[prost(string, tag = "1")] + pub serialize_type: ::prost::alloc::string::String, + #[prost(bytes = "vec", repeated, tag = "2")] + pub args: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(string, repeated, tag = "3")] + pub arg_types: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message, Serialize, Deserialize)] +pub struct TripleResponseWrapper { + #[prost(string, tag = "1")] + pub serialize_type: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub data: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub r#type: ::prost::alloc::string::String, +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message, Serialize, Deserialize)] +pub struct TripleExceptionWrapper { + #[prost(string, tag = "1")] + pub language: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub serialization: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub class_name: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "4")] + pub data: ::prost::alloc::vec::Vec, +} + +impl TripleRequestWrapper { + pub fn new(data: Vec) -> Self { + let mut trip = TripleRequestWrapper::default(); + trip.serialize_type = "fastjson".to_string(); + trip.args = data.iter().map(|e| e.as_bytes().to_vec()).collect(); + return trip; + } + pub fn get_req(self) -> Vec { + let mut res = vec![]; + for str in self.args { + res.push(String::from_utf8(str).unwrap()); + } + return res; + } +} + +impl TripleResponseWrapper { + pub fn new(data: String) -> TripleResponseWrapper { + let mut trip = TripleResponseWrapper::default(); + trip.serialize_type = "fastjson".to_string(); + trip.data = data.as_bytes().to_vec(); + return trip; + } +} diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index fc48dc5c..ee8cc1e6 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -43,7 +43,9 @@ pub mod echo_client { let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); self.inner.unary(request, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -100,7 +102,9 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream> + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -114,14 +118,19 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream> + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result, dubbo::status::Status>; + ) -> Result< + Response, + dubbo::status::Status, + >; } /// Echo is the echo service. #[derive(Debug)] @@ -151,7 +160,10 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -164,16 +176,24 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -184,20 +204,30 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc for ServerStreamingEchoServer { + impl ServerStreamingSvc + for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.server_streaming_echo(request).await }; + let fut = async move { + inner.server_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -210,21 +240,29 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc for ClientStreamingEchoServer { + impl ClientStreamingSvc + for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.client_streaming_echo(request).await }; + let fut = async move { + inner.client_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -237,39 +275,54 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc for BidirectionalStreamingEchoServer { + impl StreamingSvc + for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = - async move { inner.bidirectional_streaming_echo(request).await }; + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server - .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) .await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/examples/interface/Cargo.toml b/examples/interface/Cargo.toml new file mode 100644 index 00000000..2e47a4a1 --- /dev/null +++ b/examples/interface/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "example-interface" +version = "0.3.0" +edition = "2021" +license = "Apache-2.0" +description = "dubbo-rust-examples-interface" +repository = "https://github.com/apache/dubbo-rust.git" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[package.metadata.release] +release = false + +[[bin]] +name = "interface-server" +path = "src/server.rs" + +[[bin]] +name = "interface-client" +path = "src/client.rs" + +[dependencies] +http = "0.2" +http-body = "0.4.4" +futures-util = {version = "0.3", default-features = false} +tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] } +prost-derive = {version = "0.10", optional = true} +prost = "0.10.4" +async-trait = "0.1.56" +tokio-stream = "0.1" +dubbo-macro = { path = "../../dubbo-macro", version = "0.3.0" } +dubbo = { path = "../../dubbo"} +registry-zookeeper.workspace = true +registry-nacos.workspace = true +serde = { version = "1.0.196", features = ["derive"] } +serde_json = "1" +url = "2.5.0" diff --git a/examples/interface/LICENSE b/examples/interface/LICENSE new file mode 100644 index 00000000..75b52484 --- /dev/null +++ b/examples/interface/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/examples/interface/README.md b/examples/interface/README.md new file mode 100644 index 00000000..175c55fb --- /dev/null +++ b/examples/interface/README.md @@ -0,0 +1,22 @@ +# Apache Dubbo-rust example - interface + +## build and run + +```sh +$ cd github.com/apache/dubbo-rust/examples/interface/ +$ cargo build + +$ # run sever +$ ../../target/debug/interface-server + +$ # run client +$ ../../target/debug/interface-client + +# client stream +server response : Ok("Hello world1") +server response : Ok(ResDto { str: "Hello world2:world3 V2" }) + +# server stream +client request : "world1" +client request : ReqDto { str: "world2" } : ReqDto { str: "world3" } +``` diff --git a/examples/interface/README_CN.md b/examples/interface/README_CN.md new file mode 100644 index 00000000..c8e87ab9 --- /dev/null +++ b/examples/interface/README_CN.md @@ -0,0 +1,21 @@ +# Apache Dubbo-rust 示例 - interface + +## 构建并运行 + +```sh +$ cd github.com/apache/dubbo-rust/examples/interface/ +$ cargo build + +$ # run sever +$ ../../target/debug/interface-server + +$ # run client +$ ../../target/debug/interface-client + +# client stream +server response : Ok("Hello world1") +server response : Ok(ResDto { str: "Hello world2:world3 V2" }) + +# server stream +client request : "world1" +client request : ReqDto { str: "world2" } : ReqDto { str: "world3" } diff --git a/examples/interface/application.yaml b/examples/interface/application.yaml new file mode 100644 index 00000000..f6974286 --- /dev/null +++ b/examples/interface/application.yaml @@ -0,0 +1,25 @@ +logging: + level: debug +dubbo: + protocols: + triple: + ip: 0.0.0.0 + port: '8888' + name: tri + registries: + demoZK: + protocol: zookeeper + address: 0.0.0.0:2181 + provider: + services: + DemoServiceImpl: + version: 1.0.0 + group: test + protocol: triple + serialization : fastjson + interface: org.apache.dubbo.springboot.demo.DemoService + routers: + consumer: + - service: "org.apache.dubbo.springboot.demo.DemoService" + url: tri://127.0.0.1:20000 + protocol: triple \ No newline at end of file diff --git a/examples/interface/src/client.rs b/examples/interface/src/client.rs new file mode 100644 index 00000000..622beeff --- /dev/null +++ b/examples/interface/src/client.rs @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dubbo::codegen::{ClientBuilder}; +use dubbo::extension; +use example_interface::{DemoServiceClient, ReqDto}; +use registry_nacos::NacosRegistry; + +#[tokio::main] +async fn main() { + dubbo::logger::init(); + let _ = extension::EXTENSIONS.register::().await; + let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap()); + let mut client = DemoServiceClient::new(builder); + let res = client.sayHello("world1".to_string()).await; + println!("server response : {:?}", res); + let res = client + .sayHelloV2( + ReqDto { + str: "world2".to_string(), + }, + ReqDto { + str: "world3".to_string(), + }, + ) + .await; + println!("server response : {:?}", res); +} diff --git a/examples/interface/src/lib.rs b/examples/interface/src/lib.rs new file mode 100644 index 00000000..bf05fb07 --- /dev/null +++ b/examples/interface/src/lib.rs @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dubbo_macro::dubbo_trait; +use serde::{Deserialize, Serialize}; + + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct ReqDto { + pub str: String, +} + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct ResDto { + pub str: String, +} + +#[dubbo_trait(package = "org.apache.dubbo.springboot.demo")] +pub trait DemoService { + async fn sayHello(&self, name: String) -> String; + + async fn sayHelloV2(&self, name: ReqDto, name2: ReqDto) -> ResDto; +} diff --git a/examples/interface/src/server.rs b/examples/interface/src/server.rs new file mode 100644 index 00000000..ce5dd35c --- /dev/null +++ b/examples/interface/src/server.rs @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dubbo::{Dubbo, extension}; +use dubbo_macro::dubbo_server; +use example_interface::{DemoService, ReqDto, ResDto}; +use registry_zookeeper::ZookeeperRegistry; +use std::env; +use dubbo::config::RootConfig; +use dubbo::logger::tracing::span; +use registry_nacos::NacosRegistry; + +#[derive(Clone)] +struct DemoServiceImpl { + _db: String, +} + +#[dubbo_server(package = "org.apache.dubbo.springboot.demo")] +impl DemoService for DemoServiceImpl { + async fn sayHello(&self, req: String) -> Result { + println!("client request : {:?}", req); + return Ok("Hello ".to_owned() + &req); + } + async fn sayHelloV2(&self, req: ReqDto, req2: ReqDto) -> Result { + println!("client request : {:?} : {:?}", req, req2); + return Ok(ResDto { + str: "Hello ".to_owned() + &req.str + ":" + &req2.str + " V2", + }); + } +} + +#[tokio::main] +async fn main() { + dubbo::logger::init(); + let span = span!(Level::DEBUG, "greeter.server"); + let _enter = span.enter(); + register_server(GreeterServerImpl { + name: "greeter".to_string(), + }); + // let zkr: ZookeeperRegistry = ZookeeperRegistry::default(); + let r = RootConfig::new(); + let r = match r.load() { + Ok(config) => config, + Err(_err) => panic!("err: {:?}", _err), // response was droped + }; + + let _ = extension::EXTENSIONS.register::().await; + let mut f = Dubbo::new() + .with_config(r) + .add_registry("nacos://127.0.0.1:8848/"); + + f.start().await; +} From 8c86537945a7213375b3b5d9f9344f365c49f678 Mon Sep 17 00:00:00 2001 From: kwsc98 <958280102@qq.com> Date: Tue, 14 May 2024 17:12:04 +0800 Subject: [PATCH 2/4] 20240514 --- dubbo-macro/src/lib.rs | 28 ++-- dubbo-macro/src/server_macro.rs | 35 ++--- dubbo-macro/src/trait_macro.rs | 44 +++--- dubbo/src/config/service.rs | 7 + dubbo/src/framework.rs | 15 ++- dubbo/src/invocation.rs | 26 +++- dubbo/src/protocol/triple/mod.rs | 2 +- dubbo/src/status.rs | 44 +++--- dubbo/src/triple/client/triple.rs | 26 +++- dubbo/src/triple/server/mod.rs | 2 +- dubbo/src/triple/server/support.rs | 35 ++--- dubbo/src/triple/triple_wrapper.rs | 2 +- .../echo/src/generated/grpc.examples.echo.rs | 127 +++++------------- examples/interface/application.yaml | 7 +- examples/interface/src/client.rs | 3 +- examples/interface/src/lib.rs | 1 - examples/interface/src/server.rs | 34 ++--- registry/nacos/src/lib.rs | 3 +- 18 files changed, 215 insertions(+), 226 deletions(-) diff --git a/dubbo-macro/src/lib.rs b/dubbo-macro/src/lib.rs index 6db902a9..7656d610 100644 --- a/dubbo-macro/src/lib.rs +++ b/dubbo-macro/src/lib.rs @@ -16,20 +16,18 @@ */ use proc_macro::TokenStream; -use quote::{ToTokens}; +use quote::ToTokens; use syn::parse::Parser; mod server_macro; mod trait_macro; - - #[proc_macro_attribute] pub fn dubbo_trait(attr: TokenStream, item: TokenStream) -> TokenStream { let attr = DubboAttr::from_attr(attr); match attr { - Ok(attr) => { trait_macro::dubbo_trait(attr, item) } - Err(err) => { err.into_compile_error().into() } + Ok(attr) => trait_macro::dubbo_trait(attr, item), + Err(err) => err.into_compile_error().into(), } } @@ -37,12 +35,11 @@ pub fn dubbo_trait(attr: TokenStream, item: TokenStream) -> TokenStream { pub fn dubbo_server(attr: TokenStream, item: TokenStream) -> TokenStream { let attr = DubboAttr::from_attr(attr); match attr { - Ok(attr) => { server_macro::dubbo_server(attr, item) } - Err(err) => { err.into_compile_error().into() } + Ok(attr) => server_macro::dubbo_server(attr, item), + Err(err) => err.into_compile_error().into(), } } - #[derive(Default)] struct DubboAttr { package: Option, @@ -56,7 +53,9 @@ impl DubboAttr { .and_then(|args| Self::build_attr(args)) } - fn build_attr(args: syn::punctuated::Punctuated::) -> Result { + fn build_attr( + args: syn::punctuated::Punctuated, + ) -> Result { let mut package = None; let mut version = None; for arg in args { @@ -71,10 +70,12 @@ impl DubboAttr { .to_string() .to_lowercase(); let lit = match &namevalue.value { - syn::Expr::Lit(syn::ExprLit { lit, .. }) => lit.to_token_stream().to_string(), + syn::Expr::Lit(syn::ExprLit { lit, .. }) => { + lit.to_token_stream().to_string() + } expr => expr.to_token_stream().to_string(), } - .replace("\"", ""); + .replace("\"", ""); match ident.as_str() { "package" => { let _ = package.insert(lit); @@ -99,9 +100,6 @@ impl DubboAttr { } } } - Ok(DubboAttr { - package, - version, - }) + Ok(DubboAttr { package, version }) } } diff --git a/dubbo-macro/src/server_macro.rs b/dubbo-macro/src/server_macro.rs index f07fda7d..1aee33ab 100644 --- a/dubbo-macro/src/server_macro.rs +++ b/dubbo-macro/src/server_macro.rs @@ -15,11 +15,10 @@ * limitations under the License. */ - use crate::DubboAttr; use proc_macro::TokenStream; use quote::{quote, ToTokens}; -use syn::{parse_macro_input, FnArg, ItemImpl, ImplItem}; +use syn::{parse_macro_input, FnArg, ImplItem, ItemImpl}; pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { let version = match attr.version { @@ -48,7 +47,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { let token = quote! { let result : Result<#req_type,_> = serde_json::from_slice(param_req[idx].as_bytes()); if let Err(err) = result { - param.res = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string())); + param.result = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string())); return param; } let #req : #req_type = result.unwrap(); @@ -62,7 +61,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { ); vec.push(quote! { if ¶m.method_name[..] == stringify!(#method) { - let param_req = ¶m.req; + let param_req = ¶m.args; let mut idx = 0; #( #req @@ -72,7 +71,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { #req_pat, )* ).await; - param.res = match res { + param.result = match res { Ok(res) => { let res = serde_json::to_string(&res).unwrap(); Ok(res) @@ -87,20 +86,23 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { vec }); let service_unique = match &attr.package { - None => { quote!(stringify!(#item_trait)) } - Some(attr) => { quote!(#attr.to_owned() + "." + stringify!(#item_trait)) } + None => { + quote!(stringify!(#item_trait)) + } + Some(attr) => { + let service_unique = attr.to_owned() + "." + &item_trait.to_string(); + quote!(&#service_unique) + } }; let expanded = quote! { + #server_item - use dubbo::triple::server::support::RpcServer; - use dubbo::triple::server::support::RpcFuture; - use dubbo::triple::server::support::RpcMsg; - impl RpcServer for #item_self { - fn invoke (&self, param : RpcMsg) -> RpcFuture { + impl dubbo::triple::server::support::RpcServer for #item_self { + fn invoke (&self, param : dubbo::triple::server::support::RpcContext) -> dubbo::triple::server::support::RpcFuture { let mut rpc = self.clone(); Box::pin(async move {rpc.prv_invoke(param).await}) } - fn get_info(&self) -> (&str , &str , Option<&str> , Vec) { + fn get_info(&self) -> (&str, Option<&str>, Vec) { let mut methods = vec![]; #( methods.push(stringify!(#items_ident_fn).to_string()); @@ -110,9 +112,9 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { } impl #item_self { - async fn prv_invoke (&self, mut param : RpcMsg) -> RpcMsg { + async fn prv_invoke (&self, mut param : dubbo::triple::server::support::RpcContext) -> dubbo::triple::server::support::RpcContext { #(#items_fn)* - param.res = Err( + param.result = Err( dubbo::status::Status::new(dubbo::status::Code::NotFound,format!("not find method by {}",param.method_name)) ); return param; @@ -122,7 +124,6 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { expanded.into() } - fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream { let impl_item = item.impl_token; let trait_ident = item.trait_.unwrap().1; @@ -141,4 +142,4 @@ fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream { )* } } -} \ No newline at end of file +} diff --git a/dubbo-macro/src/trait_macro.rs b/dubbo-macro/src/trait_macro.rs index 82f11176..d4aa95e8 100644 --- a/dubbo-macro/src/trait_macro.rs +++ b/dubbo-macro/src/trait_macro.rs @@ -64,11 +64,12 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { vec.push(token); vec }); - let package = stringify!(#trait_ident); + let package = trait_ident.to_string(); let service_unique = match &attr.package { - None => { quote!(#package) } - Some(attr) => { quote!(#attr.to_owned() + "." + #package) } + None => package.to_owned(), + Some(attr) => attr.to_owned() + "." + &package, }; + let path = "/".to_string() + &service_unique + "/" + &ident.to_string(); fn_quote.push( quote! { #[allow(non_snake_case)] @@ -82,22 +83,26 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { req_vec.push(req_poi_str.unwrap()); )* let _version : Option<&str> = #version; - let request = Request::new(TripleRequestWrapper::new(req_vec)); + let request = dubbo::invocation::Request::new(dubbo::triple::triple_wrapper::TripleRequestWrapper::new(req_vec)); let service_unique = #service_unique; let method_name = stringify!(#ident).to_string(); let invocation = dubbo::invocation::RpcInvocation::default() .with_service_unique_name(service_unique.to_owned()) .with_method_name(method_name.clone()); - let path = "/".to_string() + service_unique + "/" + &method_name; - let path = http::uri::PathAndQuery::from_str( - &path, - ).unwrap(); - let res = self.inner.unary::(request, path, invocation).await; + let path = http::uri::PathAndQuery::from_static( + #path, + ); + let res = self.inner.unary::(request, path, invocation).await; match res { Ok(res) => { let response_wrapper = res.into_parts().1; - let res: #output_type = serde_json::from_slice(&response_wrapper.data).unwrap(); - Ok(res) + let data = &response_wrapper.data; + if data.starts_with(b"null") { + Err(dubbo::status::Status::new(dubbo::status::Code::DataLoss,"null".to_string())) + } else { + let res: #output_type = serde_json::from_slice(data).unwrap(); + Ok(res) + } }, Err(err) => Err(err) } @@ -107,33 +112,24 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { } let rpc_client = syn::Ident::new(&format!("{}Client", trait_ident), trait_ident.span()); let expanded = quote! { - use dubbo::triple::client::TripleClient; - use dubbo::triple::triple_wrapper::TripleRequestWrapper; - use dubbo::triple::triple_wrapper::TripleResponseWrapper; - use dubbo::triple::codec::prost::ProstCodec; - use dubbo::invocation::Request; - use dubbo::invocation::Response; - use dubbo::triple::client::builder::ClientBuilder; - use std::str::FromStr; #item_trait #vis struct #rpc_client { - inner: TripleClient + inner: dubbo::triple::client::TripleClient } impl #rpc_client { #( #fn_quote )* - pub fn new(builder: ClientBuilder) -> #rpc_client { - #rpc_client {inner: TripleClient::new(builder),} + pub fn new(builder: dubbo::triple::client::builder::ClientBuilder) -> #rpc_client { + #rpc_client {inner: dubbo::triple::client::TripleClient::new(builder),} } } }; TokenStream::from(expanded) } - fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream { let trait_ident = &item.ident; let item_fn = item.items.iter().fold(vec![], |mut vec, e| { @@ -162,4 +158,4 @@ fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream { )* } } -} \ No newline at end of file +} diff --git a/dubbo/src/config/service.rs b/dubbo/src/config/service.rs index 8a1f1910..ce639f46 100644 --- a/dubbo/src/config/service.rs +++ b/dubbo/src/config/service.rs @@ -23,6 +23,7 @@ pub struct ServiceConfig { pub group: String, pub protocol: String, pub interface: String, + pub serialization: Option, } impl ServiceConfig { @@ -41,4 +42,10 @@ impl ServiceConfig { pub fn protocol(self, protocol: String) -> Self { Self { protocol, ..self } } + pub fn serialization(self, serialization: Option) -> Self { + Self { + serialization, + ..self + } + } } diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 6bb6e3b1..5bafb629 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -24,10 +24,10 @@ use crate::{ logger::tracing::{debug, info}, protocol::{BoxExporter, Protocol}, registry::protocol::RegistryProtocol, + triple::server::support::{RpcHttp2Server, RpcServer}, Url, }; use futures::{future, Future}; -use crate::triple::server::support::{RpcHttp2Server, RpcServer}; // Invoker是否可以基于hyper写一个通用的 @@ -93,12 +93,17 @@ impl Dubbo { .protocols .get_protocol_or_default(service_config.protocol.as_str()); let interface_name = service_config.interface.clone(); - let protocol_url = format!( - "{}/{}?interface={}", + let mut protocol_url = format!( + "{}/{}?interface={}&category={}&protocol={}", protocol.to_url(), interface_name, - interface_name + interface_name, + "providers", + "tri" ); + if let Some(serialization) = &service_config.serialization { + protocol_url.push_str(&format!("&prefer.serialization={}", serialization)); + } info!("protocol_url: {:?}", protocol_url); protocol_url.parse().ok() } else { @@ -142,7 +147,7 @@ impl Dubbo { .with_registries(registry_extensions.clone()) .with_services(self.service_registry.clone()), ); - let mut async_vec: Vec + Send>>> = Vec::new(); + let mut async_vec: Vec + Send>>> = Vec::new(); for (name, items) in self.protocols.iter() { for url in items.iter() { info!("base: {:?}, service url: {:?}", name, url); diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index 7b251835..2d1c91db 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -18,6 +18,7 @@ use std::{collections::HashMap, fmt::Debug, str::FromStr}; use futures_core::Stream; +use http::StatusCode; pub struct Request { pub message: T, @@ -81,6 +82,7 @@ impl Request { pub struct Response { message: T, + status: StatusCode, metadata: Metadata, } @@ -88,30 +90,41 @@ impl Response { pub fn new(message: T) -> Response { Self { message, + status: Default::default(), metadata: Metadata::new(), } } pub fn from_parts(metadata: Metadata, message: T) -> Self { - Self { message, metadata } + Self { + message, + status: Default::default(), + metadata, + } } pub fn into_parts(self) -> (Metadata, T) { - (self.metadata, self.message) + let metadata = self + .metadata + .insert("http_status".to_owned(), self.status.as_str().to_owned()); + (metadata, self.message) } pub fn into_http(self) -> http::Response { let mut http_resp = http::Response::new(self.message); *http_resp.version_mut() = http::Version::HTTP_2; *http_resp.headers_mut() = self.metadata.into_headers(); + *http_resp.status_mut() = self.status; http_resp } pub fn from_http(resp: http::Response) -> Self { + let status = resp.status(); let (part, body) = resp.into_parts(); Response { message: body, + status, metadata: Metadata::from_headers(part.headers), } } @@ -123,6 +136,7 @@ impl Response { let u = f(self.message); Response { message: u, + status: self.status, metadata: self.metadata, } } @@ -206,6 +220,14 @@ impl Metadata { header } + + pub fn get(&self, key: &str) -> Option<&String> { + self.inner.get(key) + } + + pub fn get_http_status(&self) -> &str { + self.inner.get("http_status").map_or("200", |e| e) + } } pub trait Invocation { diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs index 9b2ae811..e97b1505 100644 --- a/dubbo/src/protocol/triple/mod.rs +++ b/dubbo/src/protocol/triple/mod.rs @@ -26,7 +26,7 @@ use std::{collections::HashMap, sync::RwLock}; use crate::{utils::boxed_clone::BoxCloneService, BoxBody}; pub type GrpcBoxCloneService = -BoxCloneService, http::Response, std::convert::Infallible>; + BoxCloneService, http::Response, std::convert::Infallible>; lazy_static! { pub static ref TRIPLE_SERVICES: RwLock> = diff --git a/dubbo/src/status.rs b/dubbo/src/status.rs index 7258b481..6ba1076a 100644 --- a/dubbo/src/status.rs +++ b/dubbo/src/status.rs @@ -225,32 +225,38 @@ impl std::fmt::Display for Code { } } -impl From for Code { - fn from(i: i32) -> Self { +impl From<&[u8]> for Code { + fn from(i: &[u8]) -> Self { match i { - 0 => Code::Ok, - 1 => Code::Cancelled, - 2 => Code::Unknown, - 3 => Code::InvalidArgument, - 4 => Code::DeadlineExceeded, - 5 => Code::NotFound, - 6 => Code::AlreadyExists, - 7 => Code::PermissionDenied, - 8 => Code::ResourceExhausted, - 9 => Code::FailedPrecondition, - 10 => Code::Aborted, - 11 => Code::OutOfRange, - 12 => Code::Unimplemented, - 13 => Code::Internal, - 14 => Code::Unavailable, - 15 => Code::DataLoss, - 16 => Code::Unauthenticated, + b"0" => Code::Ok, + b"1" => Code::Cancelled, + b"2" => Code::Unknown, + b"3" => Code::InvalidArgument, + b"4" => Code::DeadlineExceeded, + b"5" => Code::NotFound, + b"6" => Code::AlreadyExists, + b"7" => Code::PermissionDenied, + b"8" => Code::ResourceExhausted, + b"9" => Code::FailedPrecondition, + b"10" => Code::Aborted, + b"11" => Code::OutOfRange, + b"12" => Code::Unimplemented, + b"13" => Code::Internal, + b"14" => Code::Unavailable, + b"15" => Code::DataLoss, + b"16" => Code::Unauthenticated, _ => Code::Unknown, } } } +impl From for Code { + fn from(i: i32) -> Self { + Code::from(i.to_string().as_bytes()) + } +} + #[derive(Debug, Clone)] pub struct Status { // grpc-status diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 2948dec5..55e6d5ad 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -21,6 +21,7 @@ use http::HeaderValue; use prost::Message; use serde::{Deserialize, Serialize}; use tower_service::Service; +use tracing::error; use crate::codegen::{ProstCodec, RpcInvocation, SerdeCodec}; @@ -167,7 +168,6 @@ impl TripleClient { .header("path", path.to_string()) .body(body) .unwrap(); - for (k, v) in mt.into_headers().iter() { request.headers_mut().insert(k, v.to_owned()); } @@ -185,12 +185,24 @@ impl TripleClient { futures_util::pin_mut!(body); - let message = body.try_next().await?.ok_or_else(|| { - crate::status::Status::new( - crate::status::Code::Internal, - "Missing response message.".to_string(), - ) - })?; + let message = match body.try_next().await? { + Some(message) => message, + None => { + let http_status = parts.get_http_status(); + if http_status != "200" { + error!("http status : {}", http_status); + } + let code = parts + .get("grpc-status") + .map(|e| crate::status::Code::from(e.as_bytes())) + .map_or(crate::status::Code::Internal, |e| e); + let message = parts + .get("grpc-message") + .map(|e| e.to_string()) + .map_or(code.to_string(), |e| e); + return Err(crate::status::Status::new(code, message)); + } + }; if let Some(trailers) = body.trailer().await? { let mut h = parts.into_headers(); diff --git a/dubbo/src/triple/server/mod.rs b/dubbo/src/triple/server/mod.rs index abd48f91..a5cf0cf0 100644 --- a/dubbo/src/triple/server/mod.rs +++ b/dubbo/src/triple/server/mod.rs @@ -17,7 +17,7 @@ pub mod builder; pub mod service; -pub mod triple; pub mod support; +pub mod triple; pub use triple::TripleServer; diff --git a/dubbo/src/triple/server/support.rs b/dubbo/src/triple/server/support.rs index 896338c1..d368964b 100644 --- a/dubbo/src/triple/server/support.rs +++ b/dubbo/src/triple/server/support.rs @@ -33,29 +33,30 @@ use super::TripleServer; pub type RpcFuture = std::pin::Pin + Send>>; -pub struct RpcMsg { +#[derive(Debug)] +pub struct RpcContext { pub version: Option, pub class_name: String, pub method_name: String, - pub req: Vec, - pub res: Result, + pub args: Vec, + pub result: Result, } -impl RpcMsg { +impl RpcContext { pub fn new(path: String, version: Option) -> Self { let attr: Vec<&str> = path.split("/").collect(); - RpcMsg { + RpcContext { version, class_name: attr[1].to_string(), method_name: attr[2].to_string(), - req: vec![], - res: Err(Status::new(Code::Ok, "success".to_string())), + args: vec![], + result: Err(Status::new(Code::Ok, "success".to_string())), } } } pub trait RpcServer: Send + Sync + 'static { - fn invoke(&self, msg: RpcMsg) -> RpcFuture; + fn invoke(&self, msg: RpcContext) -> RpcFuture; fn get_info(&self) -> (&str, Option<&str>, Vec); } @@ -86,20 +87,20 @@ where fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call(&mut self, req: http::Request) -> Self::Future { - let path = req.uri().path().to_string(); - let version = req + fn call(&mut self, request: http::Request) -> Self::Future { + let path = request.uri().path().to_string(); + let version = request .headers() .get("tri-service-version") .map(|e| String::from_utf8_lossy(e.as_bytes()).to_string()); - let rpc_msg = RpcMsg::new(path, version); + let rpc_msg = RpcContext::new(path, version); let rpc_unary_server = RpcUnaryServer { inner: self.inner.clone(), msg: Some(rpc_msg), }; let mut server = TripleServer::new(); let fut = async move { - let res = server.unary(rpc_unary_server, req).await; + let res = server.unary(rpc_unary_server, request).await; Ok(res) }; Box::pin(fut) @@ -109,7 +110,7 @@ where #[allow(non_camel_case_types)] struct RpcUnaryServer { inner: _Inner, - msg: Option, + msg: Option, } impl UnarySvc for RpcUnaryServer { @@ -118,10 +119,10 @@ impl UnarySvc for RpcUnaryServer { fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let mut msg = self.msg.take().unwrap(); - msg.req = request.message.get_req(); + msg.args = request.message.get_args(); let fut = async move { - let res = inner.invoke(msg).await.res; - match res { + let result = inner.invoke(msg).await.result; + match result { Ok(res) => Ok(Response::new(TripleResponseWrapper::new(res))), Err(err) => Err(err), } diff --git a/dubbo/src/triple/triple_wrapper.rs b/dubbo/src/triple/triple_wrapper.rs index 2f06df69..ce1d7d7b 100644 --- a/dubbo/src/triple/triple_wrapper.rs +++ b/dubbo/src/triple/triple_wrapper.rs @@ -61,7 +61,7 @@ impl TripleRequestWrapper { trip.args = data.iter().map(|e| e.as_bytes().to_vec()).collect(); return trip; } - pub fn get_req(self) -> Vec { + pub fn get_args(self) -> Vec { let mut res = vec![]; for str in self.args { res.push(String::from_utf8(str).unwrap()); diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index ee8cc1e6..fc48dc5c 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -43,9 +43,7 @@ pub mod echo_client { let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static( - "/grpc.examples.echo.Echo/UnaryEcho", - ); + let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); self.inner.unary(request, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -102,9 +100,7 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type ServerStreamingEchoStream: futures_util::Stream> + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -118,19 +114,14 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type BidirectionalStreamingEchoStream: futures_util::Stream> + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result< - Response, - dubbo::status::Status, - >; + ) -> Result, dubbo::status::Status>; } /// Echo is the echo service. #[derive(Debug)] @@ -160,10 +151,7 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -176,24 +164,16 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -204,30 +184,20 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc - for ServerStreamingEchoServer { + impl ServerStreamingSvc for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = + BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.server_streaming_echo(request).await - }; + let fut = async move { inner.server_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -240,29 +210,21 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc - for ClientStreamingEchoServer { + impl ClientStreamingSvc for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.client_streaming_echo(request).await - }; + let fut = async move { inner.client_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -275,54 +237,39 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc - for BidirectionalStreamingEchoServer { + impl StreamingSvc for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = + BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.bidirectional_streaming_echo(request).await - }; + let fut = + async move { inner.bidirectional_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server - .bidi_streaming( - BidirectionalStreamingEchoServer { - inner, - }, - req, - ) + .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) .await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/examples/interface/application.yaml b/examples/interface/application.yaml index f6974286..6e819326 100644 --- a/examples/interface/application.yaml +++ b/examples/interface/application.yaml @@ -17,9 +17,4 @@ dubbo: group: test protocol: triple serialization : fastjson - interface: org.apache.dubbo.springboot.demo.DemoService - routers: - consumer: - - service: "org.apache.dubbo.springboot.demo.DemoService" - url: tri://127.0.0.1:20000 - protocol: triple \ No newline at end of file + interface: org.apache.dubbo.springboot.demo.DemoService \ No newline at end of file diff --git a/examples/interface/src/client.rs b/examples/interface/src/client.rs index 622beeff..8ec03eb2 100644 --- a/examples/interface/src/client.rs +++ b/examples/interface/src/client.rs @@ -15,8 +15,7 @@ * limitations under the License. */ -use dubbo::codegen::{ClientBuilder}; -use dubbo::extension; +use dubbo::{codegen::ClientBuilder, extension}; use example_interface::{DemoServiceClient, ReqDto}; use registry_nacos::NacosRegistry; diff --git a/examples/interface/src/lib.rs b/examples/interface/src/lib.rs index bf05fb07..0d4279f5 100644 --- a/examples/interface/src/lib.rs +++ b/examples/interface/src/lib.rs @@ -18,7 +18,6 @@ use dubbo_macro::dubbo_trait; use serde::{Deserialize, Serialize}; - #[derive(Serialize, Deserialize, Default, Debug)] pub struct ReqDto { pub str: String, diff --git a/examples/interface/src/server.rs b/examples/interface/src/server.rs index ce5dd35c..6b71fec4 100644 --- a/examples/interface/src/server.rs +++ b/examples/interface/src/server.rs @@ -15,13 +15,16 @@ * limitations under the License. */ -use dubbo::{Dubbo, extension}; +use std::env; + +use dubbo::{ + config::RootConfig, + extension, + logger::{tracing::span, Level}, + Dubbo, +}; use dubbo_macro::dubbo_server; use example_interface::{DemoService, ReqDto, ResDto}; -use registry_zookeeper::ZookeeperRegistry; -use std::env; -use dubbo::config::RootConfig; -use dubbo::logger::tracing::span; use registry_nacos::NacosRegistry; #[derive(Clone)] @@ -33,35 +36,34 @@ struct DemoServiceImpl { impl DemoService for DemoServiceImpl { async fn sayHello(&self, req: String) -> Result { println!("client request : {:?}", req); - return Ok("Hello ".to_owned() + &req); + Ok("Hello ".to_owned() + &req) } async fn sayHelloV2(&self, req: ReqDto, req2: ReqDto) -> Result { println!("client request : {:?} : {:?}", req, req2); - return Ok(ResDto { + Ok(ResDto { str: "Hello ".to_owned() + &req.str + ":" + &req2.str + " V2", - }); + }) } } #[tokio::main] async fn main() { dubbo::logger::init(); - let span = span!(Level::DEBUG, "greeter.server"); + let span = span!(Level::DEBUG, "interface.server"); + env::set_var("DUBBO_CONFIG_PATH", "examples/interface/application.yaml"); let _enter = span.enter(); - register_server(GreeterServerImpl { - name: "greeter".to_string(), - }); - // let zkr: ZookeeperRegistry = ZookeeperRegistry::default(); let r = RootConfig::new(); let r = match r.load() { Ok(config) => config, Err(_err) => panic!("err: {:?}", _err), // response was droped }; - let _ = extension::EXTENSIONS.register::().await; + let server = DemoServiceImpl { + _db: "i am db".to_owned(), + }; let mut f = Dubbo::new() .with_config(r) - .add_registry("nacos://127.0.0.1:8848/"); - + .add_registry("nacos://127.0.0.1:8848/") + .register_server(server); f.start().await; } diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs index f8ccc967..3a429de8 100644 --- a/registry/nacos/src/lib.rs +++ b/registry/nacos/src/lib.rs @@ -378,8 +378,7 @@ impl NacosServiceName { let group = url.query::().unwrap_or_default(); let group = group.value(); - let value = format!("{}:{}:{}:{}", category, interface, version, group); - + let value = format!("{}:{}:{}:{}", "providers", interface, version, group); Self { category, interface, From a3c62c4ebf1f5373e62268a2cbd0622020dda73b Mon Sep 17 00:00:00 2001 From: kwsc98 <958280102@qq.com> Date: Wed, 15 May 2024 16:41:27 +0800 Subject: [PATCH 3/4] update cfg target_os unix to linux --- dubbo/src/triple/transport/connector/mod.rs | 4 ++-- dubbo/src/triple/transport/listener/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dubbo/src/triple/transport/connector/mod.rs b/dubbo/src/triple/transport/connector/mod.rs index 690b781a..2bd8b9a5 100644 --- a/dubbo/src/triple/transport/connector/mod.rs +++ b/dubbo/src/triple/transport/connector/mod.rs @@ -17,7 +17,7 @@ pub mod http_connector; pub mod https_connector; -#[cfg(any(target_os = "macos", target_os = "unix"))] +#[cfg(any(target_os = "macos", target_os = "linux"))] pub mod unix_connector; use hyper::Uri; @@ -84,7 +84,7 @@ pub fn get_connector(connector: &str) -> BoxCloneService { let c = unix_connector::UnixConnector::new(); BoxCloneService::new(Connector::new(c)) diff --git a/dubbo/src/triple/transport/listener/mod.rs b/dubbo/src/triple/transport/listener/mod.rs index 0be7415f..237bb181 100644 --- a/dubbo/src/triple/transport/listener/mod.rs +++ b/dubbo/src/triple/transport/listener/mod.rs @@ -16,7 +16,7 @@ */ pub mod tcp_listener; -#[cfg(any(target_os = "macos", target_os = "unix"))] +#[cfg(any(target_os = "macos", target_os = "linux"))] pub mod unix_listener; use std::net::SocketAddr; @@ -65,7 +65,7 @@ impl Listener for WrappedListener { pub async fn get_listener(name: String, addr: SocketAddr) -> Result { match name.as_str() { "tcp" => Ok(TcpListener::bind(addr).await?.boxed()), - #[cfg(any(target_os = "macos", target_os = "unix"))] + #[cfg(any(target_os = "macos", target_os = "linux"))] "unix" => Ok(unix_listener::UnixListener::bind(addr).await?.boxed()), _ => { warn!("no support listener: {:?}", name); From 40901556a08d2a22fc7a2961f869a8d08c51356c Mon Sep 17 00:00:00 2001 From: kwsc98 <958280102@qq.com> Date: Thu, 4 Jul 2024 16:59:34 +0800 Subject: [PATCH 4/4] update extension api --- examples/interface/src/client.rs | 9 +++++++-- examples/interface/src/server.rs | 6 ++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/examples/interface/src/client.rs b/examples/interface/src/client.rs index 8ec03eb2..ab89252f 100644 --- a/examples/interface/src/client.rs +++ b/examples/interface/src/client.rs @@ -15,14 +15,19 @@ * limitations under the License. */ -use dubbo::{codegen::ClientBuilder, extension}; +use dubbo::{ + codegen::ClientBuilder, + extension::{self, registry_extension::RegistryExtension}, +}; use example_interface::{DemoServiceClient, ReqDto}; use registry_nacos::NacosRegistry; #[tokio::main] async fn main() { dubbo::logger::init(); - let _ = extension::EXTENSIONS.register::().await; + let _ = extension::EXTENSIONS + .register::>() + .await; let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap()); let mut client = DemoServiceClient::new(builder); let res = client.sayHello("world1".to_string()).await; diff --git a/examples/interface/src/server.rs b/examples/interface/src/server.rs index 6b71fec4..5bfdf1d2 100644 --- a/examples/interface/src/server.rs +++ b/examples/interface/src/server.rs @@ -19,7 +19,7 @@ use std::env; use dubbo::{ config::RootConfig, - extension, + extension::{self, registry_extension::RegistryExtension}, logger::{tracing::span, Level}, Dubbo, }; @@ -57,7 +57,9 @@ async fn main() { Ok(config) => config, Err(_err) => panic!("err: {:?}", _err), // response was droped }; - let _ = extension::EXTENSIONS.register::().await; + let _ = extension::EXTENSIONS + .register::>() + .await; let server = DemoServiceImpl { _db: "i am db".to_owned(), };