Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions google-cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ tonic-build = "0.3.0"

[features]
default = []
full = ["pubsub", "datastore", "vision", "storage"]
full-derive = ["pubsub", "datastore-derive", "vision", "storage"]
full = ["pubsub", "datastore", "vision", "storage", "tasks"]
full-derive = ["pubsub", "datastore-derive", "vision", "storage", "tasks"]
pubsub = []
datastore = []
tasks = []
datastore-derive = ["datastore", "google-cloud-derive"]
vision = []
storage = ["reqwest"]
Expand Down
4 changes: 4 additions & 0 deletions google-cloud/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
["protos/google/cloud/vision/v1/image_annotator.proto"],
"src/vision/api",
),
(
["protos/google/cloud/tasks/v2beta3/cloudtasks.proto"],
"src/tasks/api",
),
];

for (proto_files, out_dir) in protos.iter() {
Expand Down
3 changes: 3 additions & 0 deletions google-cloud/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub mod pubsub;
/// Cloud Storage bindings.
#[cfg(feature = "storage")]
pub mod storage;
/// Cloud Tasks bindings.
#[cfg(feature = "tasks")]
pub mod tasks;
/// Cloud Vision bindings.
#[cfg(feature = "vision")]
pub mod vision;
Expand Down
617 changes: 617 additions & 0 deletions google-cloud/src/tasks/api/google.api.rs

Large diffs are not rendered by default.

1,677 changes: 1,677 additions & 0 deletions google-cloud/src/tasks/api/google.cloud.tasks.v2beta3.rs

Large diffs are not rendered by default.

408 changes: 408 additions & 0 deletions google-cloud/src/tasks/api/google.iam.v1.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions google-cloud/src/tasks/api/google.protobuf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

28 changes: 28 additions & 0 deletions google-cloud/src/tasks/api/google.r#type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/// Represents an expression text. Example:
///
/// title: "User account presence"
/// description: "Determines whether the request has a user account"
/// expression: "size(request.user) > 0"
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Expr {
/// Textual representation of an expression in
/// Common Expression Language syntax.
///
/// The application context of the containing message determines which
/// well-known feature set of CEL is supported.
#[prost(string, tag = "1")]
pub expression: std::string::String,
/// An optional title for the expression, i.e. a short string describing
/// its purpose. This can be used e.g. in UIs which allow to enter the
/// expression.
#[prost(string, tag = "2")]
pub title: std::string::String,
/// An optional description of the expression. This is a longer text which
/// describes the expression, e.g. when hovered over it in a UI.
#[prost(string, tag = "3")]
pub description: std::string::String,
/// An optional string indicating the location of the expression for error
/// reporting, e.g. a file name and a position in the file.
#[prost(string, tag = "4")]
pub location: std::string::String,
}
70 changes: 70 additions & 0 deletions google-cloud/src/tasks/api/google.rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/// The `Status` type defines a logical error model that is suitable for
/// different programming environments, including REST APIs and RPC APIs. It is
/// used by [gRPC](https://github.com/grpc). The error model is designed to be:
///
/// - Simple to use and understand for most users
/// - Flexible enough to meet unexpected needs
///
/// # Overview
///
/// The `Status` message contains three pieces of data: error code, error
/// message, and error details. The error code should be an enum value of
/// [google.rpc.Code][google.rpc.Code], but it may accept additional error codes
/// if needed. The error message should be a developer-facing English message
/// that helps developers *understand* and *resolve* the error. If a localized
/// user-facing error message is needed, put the localized message in the error
/// details or localize it in the client. The optional error details may contain
/// arbitrary information about the error. There is a predefined set of error
/// detail types in the package `google.rpc` that can be used for common error
/// conditions.
///
/// # Language mapping
///
/// The `Status` message is the logical representation of the error model, but it
/// is not necessarily the actual wire format. When the `Status` message is
/// exposed in different client libraries and different wire protocols, it can be
/// mapped differently. For example, it will likely be mapped to some exceptions
/// in Java, but more likely mapped to some error codes in C.
///
/// # Other uses
///
/// The error model and the `Status` message can be used in a variety of
/// environments, either with or without APIs, to provide a
/// consistent developer experience across different environments.
///
/// Example uses of this error model include:
///
/// - Partial errors. If a service needs to return partial errors to the client,
/// it may embed the `Status` in the normal response to indicate the partial
/// errors.
///
/// - Workflow errors. A typical workflow has multiple steps. Each step may
/// have a `Status` message for error reporting.
///
/// - Batch operations. If a client uses batch request and batch response, the
/// `Status` message should be used directly inside batch response, one for
/// each error sub-response.
///
/// - Asynchronous operations. If an API call embeds asynchronous operation
/// results in its response, the status of those operations should be
/// represented directly using the `Status` message.
///
/// - Logging. If some API errors are stored in logs, the message `Status` could
/// be used directly after any stripping needed for security/privacy reasons.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Status {
/// The status code, which should be an enum value of
/// [google.rpc.Code][google.rpc.Code].
#[prost(int32, tag = "1")]
pub code: i32,
/// A developer-facing error message, which should be in English. Any
/// user-facing error message should be localized and sent in the
/// [google.rpc.Status.details][google.rpc.Status.details] field, or localized
/// by the client.
#[prost(string, tag = "2")]
pub message: std::string::String,
/// A list of messages that carry the error details. There is a common set of
/// message types for APIs to use.
#[prost(message, repeated, tag = "3")]
pub details: ::std::vec::Vec<::prost_types::Any>,
}
146 changes: 146 additions & 0 deletions google-cloud/src/tasks/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use std::env;
use std::fs::File;
use std::sync::Arc;

use tokio::sync::Mutex;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::{IntoRequest, Request};

use crate::authorize::{ApplicationCredentials, TokenManager, TLS_CERTS};
use crate::tasks::api;
use crate::tasks::api::cloud_tasks_client::CloudTasksClient;
use crate::tasks::{Error, Queue};

pub(crate) const ROUTING_METADATA_KEY: &str = "x-goog-request-params";

/// The Cloud Tasks client, tied to a specific project and location.
#[derive(Clone)]
pub struct Client {
pub(crate) project_name: String,
pub(crate) location_id: String,
pub(crate) service: CloudTasksClient<Channel>,
pub(crate) token_manager: Arc<Mutex<TokenManager>>,
}

impl Client {
pub(crate) const DOMAIN_NAME: &'static str = "cloudtasks.googleapis.com";
pub(crate) const ENDPOINT: &'static str = "https://cloudtasks.googleapis.com";
pub(crate) const SCOPES: [&'static str; 2] = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/cloud-tasks",
];

pub(crate) async fn construct_request<T: IntoRequest<T>>(
&mut self,
request: T,
) -> Result<Request<T>, Error> {
let mut request = request.into_request();
let token = self.token_manager.lock().await.token().await?;
let metadata = request.metadata_mut();
metadata.insert("authorization", token.parse().unwrap());
Ok(request)
}

/// Create a new client for the specified project.
///
/// Credentials are looked up in the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
pub async fn new(
project_name: impl Into<String>,
location_id: impl Into<String>,
) -> Result<Client, Error> {
let path = env::var("GOOGLE_APPLICATION_CREDENTIALS")?;
let file = File::open(path)?;
let creds = json::from_reader(file)?;

Client::from_credentials(project_name, location_id, creds).await
}

/// Create a new client for the specified project with custom credentials.
pub async fn from_credentials(
project_name: impl Into<String>,
location_id: impl Into<String>,
creds: ApplicationCredentials,
) -> Result<Client, Error> {
let tls_config = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(TLS_CERTS))
.domain_name(Client::DOMAIN_NAME);

let channel = Channel::from_static(Client::ENDPOINT)
.tls_config(tls_config)?
.connect()
.await?;

Ok(Client {
project_name: project_name.into(),
location_id: location_id.into(),
service: CloudTasksClient::new(channel),
token_manager: Arc::new(Mutex::new(TokenManager::new(
creds,
Client::SCOPES.as_ref(),
))),
})
}

/// List queues
/// `filter` argument allows returning only a subset of queues, sample filter: "state: PAUSED"
pub async fn queues(&mut self, filter: &str) -> Result<Vec<Queue>, Error> {
let mut queues = Vec::new();
let page_size = 25;
let mut page_token = String::default();

loop {
let parent = format!(
"projects/{0}/locations/{1}",
self.project_name.as_str(),
self.location_id.as_str()
);
let request = api::ListQueuesRequest {
parent: parent.clone(),
filter: filter.to_string(),
page_size,
page_token,
};
let mut request = self.construct_request(request).await?;
// Add routing metadata
request.metadata_mut().insert(
ROUTING_METADATA_KEY,
format!("parent={}", parent).parse().unwrap(),
);
let response = self.service.list_queues(request).await?;
let response = response.into_inner();
page_token = response.next_page_token;
queues.extend(
response
.queues
.into_iter()
.map(|queue| Queue::new(self.clone(), queue.name)),
);
if page_token.is_empty() {
break;
}
}

Ok(queues)
}

/// Get a queue by name.
pub async fn queue(&mut self, id: &str) -> Result<Queue, Error> {
let name = format!(
"projects/{0}/locations/{1}/queues/{2}",
self.project_name.as_str(),
self.location_id.as_str(),
id,
);
let request = api::GetQueueRequest { name: name.clone() };
let mut request = self.construct_request(request).await?;
// Add routing metadata
request.metadata_mut().insert(
ROUTING_METADATA_KEY,
format!("name={}", name).parse().unwrap(),
);
let response = self.service.get_queue(request).await?;
let queue = response.into_inner();

Ok(Queue::new(self.clone(), queue.name))
}
}
45 changes: 45 additions & 0 deletions google-cloud/src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
mod client;
mod queue;
mod task;
mod task_authorization;
mod task_request_types;
mod utils;
mod api {
pub mod google {
pub mod api {
include!("api/google.api.rs");
}
pub mod cloud {
pub mod tasks {
pub mod v2beta3 {
include!("api/google.cloud.tasks.v2beta3.rs");
}
}
}
pub mod iam {
pub mod v1 {
include!("api/google.iam.v1.rs");
}
}
pub mod protobuf {
include!("api/google.protobuf.rs");
}
pub mod r#type {
include!("api/google.r#type.rs");
}
pub mod rpc {
include!("api/google.rpc.rs");
}
}
pub use self::google::cloud::tasks::v2beta3::*;
}

pub use self::client::*;
pub use self::queue::*;
pub use self::task::*;
pub use self::task_authorization::*;
pub use self::task_request_types::*;
pub(crate) use self::utils::*;

/// The error type for the Tasks module.
pub type Error = crate::error::Error;
62 changes: 62 additions & 0 deletions google-cloud/src/tasks/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::tasks::{api, TaskConfig, View, ROUTING_METADATA_KEY};
use crate::tasks::{Client, Error, Task};

/// Represents a Queue
#[derive(Clone)]
pub struct Queue {
pub(crate) client: Client,
pub(crate) name: String,
}

impl Queue {
pub(crate) fn new(client: Client, name: impl Into<String>) -> Queue {
Queue {
client,
name: name.into(),
}
}

/// Returns the unique identifier within its project
pub fn id(&self) -> &str {
self.name.rsplit('/').next().unwrap()
}

/// Create a new task in this queue
/// Requires the following roles on service account:
/// - roles/cloudtasks.viewer
/// - roles/cloudtasks.enqueuer
pub async fn create_task(&mut self, config: TaskConfig) -> Result<Task, Error> {
let request = api::CreateTaskRequest {
parent: self.name.clone(),
task: Some(config.into()),
response_view: 0,
};
let mut request = self.client.construct_request(request).await?;
request.metadata_mut().insert(
ROUTING_METADATA_KEY,
format!("parent={}", self.name.clone()).parse().unwrap(),
);
let response = self.client.service.create_task(request).await?;
let task = response.into_inner();
Ok((self.client.clone(), task).into())
}

/// Get task from this queue by ID (name)
/// Only the `id` part of the task name should be supplied
pub async fn get_task(&mut self, task_id: &str, view: Option<View>) -> Result<Task, Error> {
let name = format!("{}/tasks/{}", self.name.clone(), task_id);
let view: api::task::View = view.unwrap_or_default().into();
let request = api::GetTaskRequest {
name: name.clone(),
response_view: view as i32,
};
let mut request = self.client.construct_request(request).await?;
request.metadata_mut().insert(
ROUTING_METADATA_KEY,
format!("name={}", name).parse().unwrap(),
);
let response = self.client.service.get_task(request).await?;
let task = response.into_inner();
Ok((self.client.clone(), task).into())
}
}
Loading