|
| 1 | +use alloc::{collections::VecDeque, sync::Arc}; |
| 2 | +use hashbrown::HashMap; |
| 3 | + |
| 4 | +use alloy::primitives::B256; |
| 5 | +use async_trait::async_trait; |
| 6 | +use eyre::{eyre, Result}; |
| 7 | +use kona_derive::{ |
| 8 | + errors::BlobProviderError, |
| 9 | + online::{ |
| 10 | + OnlineBeaconClient, OnlineBlobProviderBuilder, OnlineBlobProviderWithFallback, |
| 11 | + SimpleSlotDerivation, |
| 12 | + }, |
| 13 | + traits::BlobProvider, |
| 14 | +}; |
| 15 | +use kona_primitives::{Blob, BlockInfo, IndexedBlobHash}; |
| 16 | +use parking_lot::Mutex; |
| 17 | +use reth::primitives::BlobTransactionSidecar; |
| 18 | +use tracing::warn; |
| 19 | +use url::Url; |
| 20 | + |
| 21 | +/// Layered [BlobProvider] for the Kona derivation pipeline. |
| 22 | +/// |
| 23 | +/// This provider wraps different blob sources in an ordered manner: |
| 24 | +/// - First, it attempts to fetch blobs from an in-memory store. |
| 25 | +/// - If the blobs are not found, it then attempts to fetch them from an online beacon client. |
| 26 | +/// - If the blobs are still not found, it tries to fetch them from a blob archiver (if set). |
| 27 | +/// - If all sources fail, the provider will return a [BlobProviderError]. |
| 28 | +#[derive(Debug, Clone)] |
| 29 | +pub struct LayeredBlobProvider { |
| 30 | + /// In-memory inner blob provider, used for locally caching blobs as |
| 31 | + /// they come during live sync (when following the chain tip). |
| 32 | + memory: Arc<Mutex<InnerBlobProvider>>, |
| 33 | + |
| 34 | + /// Fallback online blob provider. |
| 35 | + /// This is used primarily during sync when archived blobs |
| 36 | + /// aren't provided by reth since they'll be too old. |
| 37 | + /// |
| 38 | + /// The `WithFallback` setup allows to specify two different |
| 39 | + /// endpoints for a primary and a fallback blob provider. |
| 40 | + online: OnlineBlobProviderWithFallback< |
| 41 | + OnlineBeaconClient, |
| 42 | + OnlineBeaconClient, |
| 43 | + SimpleSlotDerivation, |
| 44 | + >, |
| 45 | +} |
| 46 | + |
| 47 | +/// A blob provider that hold blobs in memory. |
| 48 | +#[derive(Debug)] |
| 49 | +pub struct InnerBlobProvider { |
| 50 | + /// Maximum number of blobs to keep in memory. |
| 51 | + capacity: usize, |
| 52 | + /// Order of key insertion for oldest entry eviction. |
| 53 | + key_order: VecDeque<B256>, |
| 54 | + /// Maps block hashes to blob hashes to blob sidecars. |
| 55 | + blocks_to_blob_sidecars: HashMap<B256, Vec<BlobTransactionSidecar>>, |
| 56 | +} |
| 57 | + |
| 58 | +impl InnerBlobProvider { |
| 59 | + /// Creates a new [InnerBlobProvider]. |
| 60 | + pub fn with_capacity(cap: usize) -> Self { |
| 61 | + Self { |
| 62 | + capacity: cap, |
| 63 | + blocks_to_blob_sidecars: HashMap::with_capacity(cap), |
| 64 | + key_order: VecDeque::with_capacity(cap), |
| 65 | + } |
| 66 | + } |
| 67 | + |
| 68 | + /// Inserts multiple blob sidecars nto the provider. |
| 69 | + pub fn insert_blob_sidecars( |
| 70 | + &mut self, |
| 71 | + block_hash: B256, |
| 72 | + sidecars: Vec<BlobTransactionSidecar>, |
| 73 | + ) { |
| 74 | + if let Some(existing_blobs) = self.blocks_to_blob_sidecars.get_mut(&block_hash) { |
| 75 | + existing_blobs.extend(sidecars); |
| 76 | + } else { |
| 77 | + if self.blocks_to_blob_sidecars.len() >= self.capacity { |
| 78 | + if let Some(oldest) = self.key_order.pop_front() { |
| 79 | + self.blocks_to_blob_sidecars.remove(&oldest); |
| 80 | + } |
| 81 | + } |
| 82 | + self.blocks_to_blob_sidecars.insert(block_hash, sidecars); |
| 83 | + } |
| 84 | + } |
| 85 | +} |
| 86 | + |
| 87 | +impl LayeredBlobProvider { |
| 88 | + /// Creates a new [LayeredBlobProvider] with a local blob store, an online primary beacon |
| 89 | + /// client and an optional fallback blob archiver for fetching blobs. |
| 90 | + pub fn new(beacon_client_url: Url, blob_archiver_url: Option<Url>) -> Self { |
| 91 | + let memory = Arc::new(Mutex::new(InnerBlobProvider::with_capacity(512))); |
| 92 | + |
| 93 | + let online = OnlineBlobProviderBuilder::new() |
| 94 | + .with_primary(beacon_client_url.to_string()) |
| 95 | + .with_fallback(blob_archiver_url.map(|url| url.to_string())) |
| 96 | + .build(); |
| 97 | + |
| 98 | + Self { memory, online } |
| 99 | + } |
| 100 | + |
| 101 | + /// Inserts multiple blob sidecars into the in-memory provider. |
| 102 | + #[inline] |
| 103 | + pub fn insert_blob_sidecars( |
| 104 | + &mut self, |
| 105 | + block_hash: B256, |
| 106 | + sidecars: Vec<BlobTransactionSidecar>, |
| 107 | + ) { |
| 108 | + self.memory.lock().insert_blob_sidecars(block_hash, sidecars); |
| 109 | + } |
| 110 | + |
| 111 | + /// Attempts to fetch blobs using the in-memory blob store. |
| 112 | + #[inline] |
| 113 | + async fn memory_blob_load( |
| 114 | + &mut self, |
| 115 | + block_ref: &BlockInfo, |
| 116 | + blob_hashes: &[IndexedBlobHash], |
| 117 | + ) -> Result<Vec<Blob>> { |
| 118 | + let locked = self.memory.lock(); |
| 119 | + |
| 120 | + let sidecars_for_block = locked |
| 121 | + .blocks_to_blob_sidecars |
| 122 | + .get(&block_ref.hash) |
| 123 | + .ok_or(eyre!("No blob sidecars found for block ref: {:?}", block_ref))?; |
| 124 | + |
| 125 | + // for each sidecar, get the blob hashes and check if any of them are |
| 126 | + // part of the requested hashes. If they are, add the corresponding blobs |
| 127 | + // to the blobs vector. |
| 128 | + let mut blobs = Vec::with_capacity(blob_hashes.len()); |
| 129 | + let requested_hashes = blob_hashes.iter().map(|h| h.hash).collect::<Vec<_>>(); |
| 130 | + for sidecar in sidecars_for_block { |
| 131 | + for (hash, blob) in sidecar.versioned_hashes().zip(&sidecar.blobs) { |
| 132 | + if requested_hashes.contains(&hash) { |
| 133 | + blobs.push(*blob); |
| 134 | + } |
| 135 | + } |
| 136 | + } |
| 137 | + |
| 138 | + Ok(blobs) |
| 139 | + } |
| 140 | + |
| 141 | + /// Attempts to fetch blobs using the online blob provider. |
| 142 | + #[inline] |
| 143 | + async fn online_blob_load( |
| 144 | + &mut self, |
| 145 | + block_ref: &BlockInfo, |
| 146 | + blob_hashes: &[IndexedBlobHash], |
| 147 | + ) -> Result<Vec<Blob>, BlobProviderError> { |
| 148 | + self.online.get_blobs(block_ref, blob_hashes).await |
| 149 | + } |
| 150 | +} |
| 151 | + |
| 152 | +#[async_trait] |
| 153 | +impl BlobProvider for LayeredBlobProvider { |
| 154 | + /// Fetches blobs for a given block ref and the blob hashes. |
| 155 | + async fn get_blobs( |
| 156 | + &mut self, |
| 157 | + block_ref: &BlockInfo, |
| 158 | + blob_hashes: &[IndexedBlobHash], |
| 159 | + ) -> Result<Vec<Blob>, BlobProviderError> { |
| 160 | + if let Ok(b) = self.memory_blob_load(block_ref, blob_hashes).await { |
| 161 | + return Ok(b); |
| 162 | + } else { |
| 163 | + warn!("Blob provider falling back to online provider"); |
| 164 | + self.online_blob_load(block_ref, blob_hashes).await |
| 165 | + } |
| 166 | + } |
| 167 | +} |
0 commit comments