Skip to content

Commit

Permalink
feat(spooler): Add CachingEnvelopeStack (#4242)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Nov 13, 2024
1 parent 3678120 commit 4552934
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- Add additional fields to the `Event` `Getter`. ([#4238](https://github.com/getsentry/relay/pull/4238))
- Replace u64 with `OrganizationId` new-type struct for organization id. ([#4159](https://github.com/getsentry/relay/pull/4159))
- Add computed contexts for `os`, `browser` and `runtime`. ([#4239](https://github.com/getsentry/relay/pull/4239))
- Add `CachingEnvelopeStack` strategy to the buffer. ([#4242](https://github.com/getsentry/relay/pull/4242))

## 24.10.0

Expand Down
102 changes: 102 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/caching.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use chrono::{DateTime, Utc};

use super::EnvelopeStack;
use crate::envelope::Envelope;

/// An envelope stack implementation that caches one element in memory and delegates
/// to another envelope stack for additional storage.
#[derive(Debug)]
pub struct CachingEnvelopeStack<S> {
/// The underlying envelope stack
inner: S,
/// The cached envelope (if any)
cached: Option<Box<Envelope>>,
}

impl<S> CachingEnvelopeStack<S>
where
S: EnvelopeStack,
{
/// Creates a new [`CachingEnvelopeStack`] wrapping the provided envelope stack
pub fn new(inner: S) -> Self {
Self {
inner,
cached: None,
}
}
}

impl<S> EnvelopeStack for CachingEnvelopeStack<S>
where
S: EnvelopeStack,
{
type Error = S::Error;

async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
if let Some(cached) = self.cached.take() {
self.inner.push(cached).await?;
}
self.cached = Some(envelope);

Ok(())
}

async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
if let Some(ref envelope) = self.cached {
Ok(Some(envelope.received_at()))
} else {
self.inner.peek().await
}
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
if let Some(envelope) = self.cached.take() {
Ok(Some(envelope))
} else {
self.inner.pop().await
}
}

async fn flush(mut self) {
if let Some(envelope) = self.cached {
if self.inner.push(envelope).await.is_err() {
relay_log::error!(
"error while pushing the cached envelope in the inner stack during flushing",
);
}
}
self.inner.flush().await;
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::testutils::utils::mock_envelope;

#[tokio::test]
async fn test_caching_stack() {
let inner = MemoryEnvelopeStack::new();
let mut stack = CachingEnvelopeStack::new(inner);

// Create test envelopes with different timestamps
let envelope_1 = mock_envelope(Utc::now());
let envelope_2 = mock_envelope(Utc::now());

// Push 2 envelopes
stack.push(envelope_1).await.unwrap();
stack.push(envelope_2).await.unwrap();

// We pop the cached element.
assert!(stack.pop().await.unwrap().is_some());

// We peek the stack expecting it peeks the inner one.
assert!(stack.peek().await.unwrap().is_some());

// We pop the element and then check if the stack is empty.
assert!(stack.pop().await.unwrap().is_some());
assert!(stack.peek().await.unwrap().is_none());
assert!(stack.pop().await.unwrap().is_none());
}
}
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/envelope_stack/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl EnvelopeStack for MemoryEnvelopeStack {
}

async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
Ok(self.0.last().map(|e| e.meta().received_at()))
Ok(self.0.last().map(|e| e.received_at()))
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
Expand Down
3 changes: 2 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use chrono::{DateTime, Utc};

use crate::envelope::Envelope;

pub mod caching;
pub mod memory;
pub mod sqlite;

/// A stack-like data structure that holds [`Envelope`]s.
pub trait EnvelopeStack: Send + std::fmt::Debug {
/// The error type that is returned when an error is encountered during reading or writing the
/// [`EnvelopeStack`].
type Error: std::fmt::Debug;
type Error: std::fmt::Debug + std::error::Error;

/// Pushes an [`Envelope`] on top of the stack.
fn push(&mut self, envelope: Box<Envelope>) -> impl Future<Output = Result<(), Self::Error>>;
Expand Down
9 changes: 6 additions & 3 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::error::Error;
use relay_config::Config;

use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
Expand Down Expand Up @@ -38,7 +39,7 @@ impl SqliteStackProvider {
}

impl StackProvider for SqliteStackProvider {
type Stack = SqliteEnvelopeStack;
type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;

async fn initialize(&self) -> InitializationState {
match self.envelope_store.project_key_pairs().await {
Expand All @@ -58,7 +59,7 @@ impl StackProvider for SqliteStackProvider {
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack {
SqliteEnvelopeStack::new(
let inner = SqliteEnvelopeStack::new(
self.envelope_store.clone(),
self.batch_size_bytes,
project_key_pair.own_key,
Expand All @@ -69,7 +70,9 @@ impl StackProvider for SqliteStackProvider {
// it was empty, or we never had data on disk for that stack, so we assume by default
// that there is no need to check disk until some data is spooled.
Self::assume_data_on_disk(stack_creation_type),
)
);

CachingEnvelopeStack::new(inner)
}

fn has_store_capacity(&self) -> bool {
Expand Down

0 comments on commit 4552934

Please sign in to comment.