Skip to content

Commit fe351f6

Browse files
ref(sentry-core): Refactor logs batching into generic Batcher (#1018)
Refactor the logs batcher into a generic `Batcher<T>`, so that we can reuse it for metrics and any future types. Split off from #997 — however, in this PR, I aimed to minimize the diff between the [`logs.rs`](https://github.com/getsentry/sentry-rust/blob/06002299b38759a106f1a89a9578ed8c215f313d/sentry-core/src/logs.rs) and [`batch.rs`](https://github.com/getsentry/sentry-rust/blob/53e52e4c4cf67f7ffc9fee0eab1247336524c0c6/sentry-core/src/batcher.rs) files, so that `git` would recognize the change as a rename, not a file deletion and new file. Closes #1007 Closes [RUST-158](https://linear.app/getsentry/issue/RUST-158/refactor-logs-batching-into-a-reusable-generic-batchert) Co-authored-by: Joris Bayer <[email protected]>
1 parent 0600229 commit fe351f6

File tree

3 files changed

+62
-36
lines changed

3 files changed

+62
-36
lines changed
Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Batching for Sentry [structured logs](https://docs.sentry.io/product/explore/logs/).
1+
//! Generic batching for Sentry envelope items.
22
33
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
44
use std::thread::JoinHandle;
@@ -9,37 +9,61 @@ use crate::protocol::EnvelopeItem;
99
use crate::Envelope;
1010
use sentry_types::protocol::v7::Log;
1111

12-
// Flush when there's 100 logs in the buffer
13-
const MAX_LOG_ITEMS: usize = 100;
12+
// Flush when there's 100 items in the buffer
13+
const MAX_ITEMS: usize = 100;
1414
// Or when 5 seconds have passed from the last flush
1515
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);
1616

17-
#[derive(Debug, Default)]
18-
struct LogQueue {
19-
logs: Vec<Log>,
17+
#[derive(Debug)]
18+
struct BatchQueue<T> {
19+
items: Vec<T>,
2020
}
2121

22-
/// Accumulates logs in the queue and submits them through the transport when one of the flushing
22+
pub(crate) trait IntoBatchEnvelopeItem: Sized {
23+
fn into_envelope_item(items: Vec<Self>) -> EnvelopeItem;
24+
}
25+
26+
impl<T> IntoBatchEnvelopeItem for T
27+
where
28+
Vec<T>: Into<EnvelopeItem>,
29+
{
30+
fn into_envelope_item(items: Vec<Self>) -> EnvelopeItem {
31+
items.into()
32+
}
33+
}
34+
35+
pub(crate) trait Batch: IntoBatchEnvelopeItem {
36+
const TYPE_NAME: &str;
37+
}
38+
39+
impl Batch for Log {
40+
const TYPE_NAME: &str = "logs";
41+
}
42+
43+
/// Accumulates items in the queue and submits them through the transport when one of the flushing
2344
/// conditions is met.
24-
pub(crate) struct LogsBatcher {
45+
pub(crate) struct Batcher<T: Batch> {
2546
transport: TransportArc,
26-
queue: Arc<Mutex<LogQueue>>,
47+
queue: Arc<Mutex<BatchQueue<T>>>,
2748
shutdown: Arc<(Mutex<bool>, Condvar)>,
2849
worker: Option<JoinHandle<()>>,
2950
}
3051

31-
impl LogsBatcher {
32-
/// Creates a new LogsBatcher that will submit envelopes to the given `transport`.
52+
impl<T> Batcher<T>
53+
where
54+
T: Batch + Send + 'static,
55+
{
56+
/// Creates a new Batcher that will submit envelopes to the given `transport`.
3357
pub(crate) fn new(transport: TransportArc) -> Self {
34-
let queue = Arc::new(Mutex::new(Default::default()));
58+
let queue = Arc::new(Mutex::new(BatchQueue { items: Vec::new() }));
3559
#[allow(clippy::mutex_atomic)]
3660
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
3761

3862
let worker_transport = transport.clone();
3963
let worker_queue = queue.clone();
4064
let worker_shutdown = shutdown.clone();
4165
let worker = std::thread::Builder::new()
42-
.name("sentry-logs-batcher".into())
66+
.name(format!("sentry-{}-batcher", T::TYPE_NAME))
4367
.spawn(move || {
4468
let (lock, cvar) = worker_shutdown.as_ref();
4569
let mut shutdown = lock.lock().unwrap();
@@ -57,7 +81,7 @@ impl LogsBatcher {
5781
return;
5882
}
5983
if last_flush.elapsed() >= FLUSH_INTERVAL {
60-
LogsBatcher::flush_queue_internal(
84+
Batcher::flush_queue_internal(
6185
worker_queue.lock().unwrap(),
6286
&worker_transport,
6387
);
@@ -74,48 +98,50 @@ impl LogsBatcher {
7498
worker: Some(worker),
7599
}
76100
}
101+
}
77102

78-
/// Enqueues a log for delayed sending.
103+
impl<T: Batch> Batcher<T> {
104+
/// Enqueues an item for delayed sending.
79105
///
80-
/// This will automatically flush the queue if it reaches a size of `BATCH_SIZE`.
81-
pub(crate) fn enqueue(&self, log: Log) {
106+
/// This will automatically flush the queue if it reaches a size of `MAX_ITEMS`.
107+
pub(crate) fn enqueue(&self, item: T) {
82108
let mut queue = self.queue.lock().unwrap();
83-
queue.logs.push(log);
84-
if queue.logs.len() >= MAX_LOG_ITEMS {
85-
LogsBatcher::flush_queue_internal(queue, &self.transport);
109+
queue.items.push(item);
110+
if queue.items.len() >= MAX_ITEMS {
111+
Batcher::flush_queue_internal(queue, &self.transport);
86112
}
87113
}
88114

89115
/// Flushes the queue to the transport.
90116
pub(crate) fn flush(&self) {
91117
let queue = self.queue.lock().unwrap();
92-
LogsBatcher::flush_queue_internal(queue, &self.transport);
118+
Batcher::flush_queue_internal(queue, &self.transport);
93119
}
94120

95121
/// Flushes the queue to the transport.
96122
///
97123
/// This is a static method as it will be called from both the background
98124
/// thread and the main thread on drop.
99-
fn flush_queue_internal(mut queue_lock: MutexGuard<LogQueue>, transport: &TransportArc) {
100-
let logs = std::mem::take(&mut queue_lock.logs);
125+
fn flush_queue_internal(mut queue_lock: MutexGuard<BatchQueue<T>>, transport: &TransportArc) {
126+
let items = std::mem::take(&mut queue_lock.items);
101127
drop(queue_lock);
102128

103-
if logs.is_empty() {
129+
if items.is_empty() {
104130
return;
105131
}
106132

107-
sentry_debug!("[LogsBatcher] Flushing {} logs", logs.len());
133+
sentry_debug!("[Batcher({})] Flushing {} items", T::TYPE_NAME, items.len());
108134

109135
if let Some(ref transport) = *transport.read().unwrap() {
110136
let mut envelope = Envelope::new();
111-
let logs_item: EnvelopeItem = logs.into();
112-
envelope.add_item(logs_item);
137+
let envelope_item = T::into_envelope_item(items);
138+
envelope.add_item(envelope_item);
113139
transport.send_envelope(envelope);
114140
}
115141
}
116142
}
117143

118-
impl Drop for LogsBatcher {
144+
impl<T: Batch> Drop for Batcher<T> {
119145
fn drop(&mut self) {
120146
let (lock, cvar) = self.shutdown.as_ref();
121147
*lock.lock().unwrap() = true;
@@ -124,7 +150,7 @@ impl Drop for LogsBatcher {
124150
if let Some(worker) = self.worker.take() {
125151
worker.join().ok();
126152
}
127-
LogsBatcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
153+
Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
128154
}
129155
}
130156

sentry-core/src/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use crate::protocol::SessionUpdate;
1212
use rand::random;
1313
use sentry_types::random_uuid;
1414

15-
use crate::constants::SDK_INFO;
1615
#[cfg(feature = "logs")]
17-
use crate::logs::LogsBatcher;
16+
use crate::batcher::Batcher;
17+
use crate::constants::SDK_INFO;
1818
use crate::protocol::{ClientSdkInfo, Event};
1919
#[cfg(feature = "release-health")]
2020
use crate::session::SessionFlusher;
@@ -58,7 +58,7 @@ pub struct Client {
5858
#[cfg(feature = "release-health")]
5959
session_flusher: RwLock<Option<SessionFlusher>>,
6060
#[cfg(feature = "logs")]
61-
logs_batcher: RwLock<Option<LogsBatcher>>,
61+
logs_batcher: RwLock<Option<Batcher<Log>>>,
6262
#[cfg(feature = "logs")]
6363
default_log_attributes: Option<BTreeMap<String, LogAttribute>>,
6464
integrations: Vec<(TypeId, Arc<dyn Integration>)>,
@@ -86,7 +86,7 @@ impl Clone for Client {
8686

8787
#[cfg(feature = "logs")]
8888
let logs_batcher = RwLock::new(if self.options.enable_logs {
89-
Some(LogsBatcher::new(transport.clone()))
89+
Some(Batcher::new(transport.clone()))
9090
} else {
9191
None
9292
});
@@ -171,7 +171,7 @@ impl Client {
171171

172172
#[cfg(feature = "logs")]
173173
let logs_batcher = RwLock::new(if options.enable_logs {
174-
Some(LogsBatcher::new(transport.clone()))
174+
Some(Batcher::new(transport.clone()))
175175
} else {
176176
None
177177
});

sentry-core/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,12 @@ pub use crate::transport::{Transport, TransportFactory};
136136
mod logger; // structured logging macros exported with `#[macro_export]`
137137

138138
// client feature
139+
#[cfg(all(feature = "client", feature = "logs"))]
140+
mod batcher;
139141
#[cfg(feature = "client")]
140142
mod client;
141143
#[cfg(feature = "client")]
142144
mod hub_impl;
143-
#[cfg(all(feature = "client", feature = "logs"))]
144-
mod logs;
145145
#[cfg(feature = "client")]
146146
mod session;
147147

0 commit comments

Comments
 (0)