diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 51b3c77710410..94670a5f6665f 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -16,7 +16,7 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError}; +use aws_sdk_kinesis::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError}; use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput}; use aws_sdk_kinesis::primitives::DateTime; use aws_sdk_kinesis::types::ShardIteratorType; @@ -188,6 +188,16 @@ impl CommonSplitReader for KinesisSplitReader { self.new_shard_iter().await?; continue; } + Err(e) if e.code() == Some("InternalFailure") => { + tracing::warn!( + "stream {:?} shard {:?} met internal failure, retrying", + self.stream_name, + self.shard_id + ); + self.new_shard_iter().await?; + tokio::time::sleep(Duration::from_millis(200)).await; + continue; + } Err(e) => { let error_msg = format!( "Kinesis got a unhandled error: {:?}, stream {:?}, shard {:?}",