From 4fc43ca47cc1bb8a5b44f02d12c77b939bc4e580 Mon Sep 17 00:00:00 2001 From: kazk Date: Tue, 18 Jan 2022 16:36:24 -0800 Subject: [PATCH] Use `await_condition` and reduce `unwrap` in examples Signed-off-by: kazk --- examples/pod_portforward.rs | 25 +++++---------------- examples/pod_portforward_hyper_http.rs | 30 ++++++-------------------- 2 files changed, 12 insertions(+), 43 deletions(-) diff --git a/examples/pod_portforward.rs b/examples/pod_portforward.rs index 40ad144c3..a9bd367ea 100644 --- a/examples/pod_portforward.rs +++ b/examples/pod_portforward.rs @@ -1,10 +1,9 @@ -#[macro_use] extern crate log; - -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use k8s_openapi::api::core::v1::Pod; use kube::{ - api::{Api, DeleteParams, ListParams, PostParams, WatchEvent}, + api::{Api, DeleteParams, PostParams}, + runtime::wait::{await_condition, conditions::is_pod_running}, Client, ResourceExt, }; @@ -34,22 +33,8 @@ async fn main() -> anyhow::Result<()> { pods.create(&PostParams::default(), &p).await?; // Wait until the pod is running, otherwise we get 500 error. - let lp = ListParams::default().fields("metadata.name=example").timeout(10); - let mut stream = pods.watch(&lp, "0").await?.boxed(); - while let Some(status) = stream.try_next().await? { - match status { - WatchEvent::Added(o) => { - info!("Added {}", o.name()); - } - WatchEvent::Modified(o) => { - let s = o.status.as_ref().expect("status exists on pod"); - if s.phase.clone().unwrap_or_default() == "Running" { - break; - } - } - _ => {} - } - } + let running = await_condition(pods.clone(), "example", is_pod_running()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; let mut pf = pods.portforward("example", &[80]).await?; let mut ports = pf.ports().unwrap(); diff --git a/examples/pod_portforward_hyper_http.rs b/examples/pod_portforward_hyper_http.rs index 82a979a23..8cb89dbe0 100644 --- a/examples/pod_portforward_hyper_http.rs +++ b/examples/pod_portforward_hyper_http.rs @@ -1,10 +1,8 @@ -#[macro_use] extern crate log; - -use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::Pod; use kube::{ - api::{Api, DeleteParams, ListParams, PostParams, WatchEvent}, + api::{Api, DeleteParams, PostParams}, + runtime::wait::{await_condition, conditions::is_pod_running}, Client, ResourceExt, }; @@ -34,22 +32,8 @@ async fn main() -> anyhow::Result<()> { pods.create(&PostParams::default(), &p).await?; // Wait until the pod is running, otherwise we get 500 error. - let lp = ListParams::default().fields("metadata.name=example").timeout(10); - let mut stream = pods.watch(&lp, "0").await?.boxed(); - while let Some(status) = stream.try_next().await? { - match status { - WatchEvent::Added(o) => { - info!("Added {}", o.name()); - } - WatchEvent::Modified(o) => { - let s = o.status.as_ref().expect("status exists on pod"); - if s.phase.clone().unwrap_or_default() == "Running" { - break; - } - } - _ => {} - } - } + let running = await_condition(pods.clone(), "example", is_pod_running()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; let mut pf = pods.portforward("example", &[80]).await?; let mut ports = pf.ports().unwrap(); @@ -72,11 +56,11 @@ async fn main() -> anyhow::Result<()> { .body(Body::from("")) .unwrap(); - let (parts, body) = sender.send_request(http_req).await.unwrap().into_parts(); + let (parts, body) = sender.send_request(http_req).await?.into_parts(); assert!(parts.status == 200); - let body_bytes = body::to_bytes(body).await.unwrap(); - let body_str = std::str::from_utf8(&body_bytes).unwrap(); + let body_bytes = body::to_bytes(body).await?; + let body_str = std::str::from_utf8(&body_bytes)?; assert!(body_str.contains("Welcome to nginx!")); // Delete it