Skip to content

interceptor hooks not invoked on timeout #4318

@aajtodd

Description

@aajtodd

There are several interceptor hooks are always supposed to be invoked even when there is an error or timeout, e.g. read_after_attempt is documented as

/// **When:** This will **ALWAYS** be called once per attempt, as long as
/// `before_attempt` has been executed.

When timeout occurs the future is just dropped though and the hooks are never ran.

See

let maybe_timeout = async {
debug!("beginning attempt #{i}");
try_attempt(ctx, cfg, runtime_components, stop_point)
.instrument(debug_span!("try_attempt", "attempt" = i))
.await;
finally_attempt(ctx, cfg, runtime_components)
.instrument(debug_span!("finally_attempt", "attempt" = i))
.await;
Result::<_, SdkError<Error, HttpResponse>>::Ok(())
}
.maybe_timeout(attempt_timeout_config)
.await
.map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
// We continue when encountering a timeout error. The retry classifier will decide what to do with it.
continue_on_err!([ctx] => maybe_timeout);


Unit test:

diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs
index a0a2b4d17..f637b450e 100644
--- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs
+++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs
@@ -42,7 +42,7 @@ where
         .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
 }
 
-fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
+pub(crate) fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
 where
     LayerFn: FnOnce(&mut Layer),
 {
diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs
index f0beea90d..d4b0cd9df 100644
--- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs
+++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs
@@ -486,6 +486,7 @@ async fn finally_op(
 #[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
 mod tests {
     use crate::client::auth::no_auth::{NoAuthRuntimePlugin, NO_AUTH_SCHEME_ID};
+    use crate::client::defaults::{default_sleep_impl_plugin, default_time_source_plugin, layer};
     use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
     use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
     use crate::client::retries::strategy::NeverRetryStrategy;
@@ -517,7 +518,9 @@ mod tests {
     use aws_smithy_runtime_api::client::runtime_components::{
         RuntimeComponents, RuntimeComponentsBuilder,
     };
-    use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
+    use aws_smithy_runtime_api::client::runtime_plugin::{
+        RuntimePlugin, RuntimePlugins, StaticRuntimePlugin,
+    };
     use aws_smithy_runtime_api::client::ser_de::{
         SharedRequestSerializer, SharedResponseDeserializer,
     };
@@ -529,6 +532,7 @@ mod tests {
     use std::borrow::Cow;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::Arc;
+    use std::time::Duration;
     use tracing_test::traced_test;
 
     fn new_request_serializer() -> CannedRequestSerializer {
@@ -1366,4 +1370,107 @@ mod tests {
             .read_after_execution_called
             .load(Ordering::Relaxed));
     }
+
+    #[tokio::test]
+    async fn test_read_after_attempt_with_timeout() {
+        #[derive(Debug, Default)]
+        struct Inner {
+            read_after_attempt_called: AtomicBool,
+            read_before_transmit_called: AtomicBool,
+        }
+        #[derive(Clone, Debug, Default)]
+        struct TestInterceptor {
+            inner: Arc<Inner>,
+        }
+
+        impl Intercept for TestInterceptor {
+            fn name(&self) -> &'static str {
+                "TestInterceptor"
+            }
+
+            fn read_before_transmit(
+                &self,
+                context: &BeforeTransmitInterceptorContextRef<'_>,
+                runtime_components: &RuntimeComponents,
+                cfg: &mut ConfigBag,
+            ) -> Result<(), BoxError> {
+                self.inner
+                    .read_before_transmit_called
+                    .store(true, Ordering::Relaxed);
+                Ok(())
+            }
+
+            fn read_after_attempt(
+                &self,
+                context: &FinalizerInterceptorContextRef<'_>,
+                runtime_components: &RuntimeComponents,
+                cfg: &mut ConfigBag,
+            ) -> Result<(), BoxError> {
+                self.inner
+                    .read_after_attempt_called
+                    .store(true, Ordering::Relaxed);
+                Ok(())
+            }
+        }
+
+        #[derive(Debug)]
+        struct TestInterceptorRuntimePlugin {
+            builder: RuntimeComponentsBuilder,
+        }
+
+        impl RuntimePlugin for TestInterceptorRuntimePlugin {
+            fn runtime_components(
+                &self,
+                _: &RuntimeComponentsBuilder,
+            ) -> Cow<'_, RuntimeComponentsBuilder> {
+                Cow::Borrowed(&self.builder)
+            }
+        }
+
+        let interceptor = TestInterceptor::default();
+        let client = NeverClient::new();
+
+        let runtime_plugins = || {
+            let timeout_plugin =
+                StaticRuntimePlugin::new().with_config(layer("test_timeout_config", |layer| {
+                    let timeout_cfg = TimeoutConfig::builder()
+                        .operation_attempt_timeout(Duration::from_millis(200))
+                        .build();
+                    layer.store_put(timeout_cfg);
+                }));
+
+            RuntimePlugins::new()
+                .with_client_plugin(TestOperationRuntimePlugin::new())
+                .with_operation_plugin(default_sleep_impl_plugin().unwrap())
+                .with_operation_plugin(default_time_source_plugin().unwrap())
+                .with_operation_plugin(NoAuthRuntimePlugin::new())
+                .with_operation_plugin(timeout_plugin)
+                .with_operation_plugin(TestInterceptorRuntimePlugin {
+                    builder: RuntimeComponentsBuilder::new("test")
+                        .with_interceptor(SharedInterceptor::new(interceptor.clone()))
+                        .with_http_client(Some(client.clone())),
+                })
+        };
+
+        let _err = invoke_with_stop_point(
+            "test",
+            "test",
+            Input::doesnt_matter(),
+            &runtime_plugins(),
+            StopPoint::None,
+        )
+        .await
+        .expect_err("an error was returned");
+        assert_eq!(client.num_calls(), 1);
+
+        assert!(interceptor
+            .inner
+            .read_before_transmit_called
+            .load(Ordering::Relaxed));
+
+        assert!(interceptor
+            .inner
+            .read_after_attempt_called
+            .load(Ordering::Relaxed));
+    }
 }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions