diff --git a/crates/derive/src/stages/batch/batch_stream.rs b/crates/derive/src/stages/batch/batch_stream.rs index abc561f08..0576ed9d5 100644 --- a/crates/derive/src/stages/batch/batch_stream.rs +++ b/crates/derive/src/stages/batch/batch_stream.rs @@ -167,7 +167,7 @@ where return Err(PipelineError::InvalidBatchValidity.crit()); } - return Err(PipelineError::Eof.temp()); + return Err(PipelineError::NotEnoughData.temp()); } BatchValidity::Undecided | BatchValidity::Future => { return Err(PipelineError::NotEnoughData.temp()) @@ -225,6 +225,7 @@ mod test { types::ResetSignal, }; use alloc::vec; + use alloy_eips::NumHash; use op_alloy_protocol::{SingleBatch, SpanBatchElement}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -382,4 +383,34 @@ mod test { assert_eq!(stream.span_buffer_size(), 0); assert!(stream.span.is_none()); } + + #[tokio::test] + async fn test_past_span_batch() { + let mock_batch = SpanBatch { + batches: vec![ + SpanBatchElement { epoch_num: 1, timestamp: 2, ..Default::default() }, + SpanBatchElement { epoch_num: 1, timestamp: 4, ..Default::default() }, + ], + ..Default::default() + }; + let mock_origins = [BlockInfo { number: 1, timestamp: 12, ..Default::default() }]; + let data = vec![Ok(Batch::Span(mock_batch))]; + + let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); + let prev = TestBatchStreamProvider::new(data); + let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); + + // The stage should be active. + assert!(stream.is_active().unwrap()); + + let parent = L2BlockInfo { + block_info: BlockInfo { number: 10, timestamp: 100, ..Default::default() }, + l1_origin: NumHash::default(), + seq_num: 0, + }; + + // `next_batch` should return an error if the span batch is in the past. + let err = stream.next_batch(parent, &mock_origins).await.unwrap_err(); + assert_eq!(err, PipelineError::NotEnoughData.temp()); + } }