Skip to content

Commit 5574beb

Browse files
committed
perf: remove filter-specific measurement
1 parent 0b944af commit 5574beb

File tree

5 files changed

+54
-109
lines changed

5 files changed

+54
-109
lines changed

src/components/proxy/io_uring_shared.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ impl IoUringLoop {
455455
// Just double buffer the pending writes for simplicity
456456
let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity());
457457

458-
let processing_metrics = metrics::ProcessingMetrics::new();
458+
let mut processing_metrics = metrics::ProcessingMetrics::new();
459459

460460
// When sending packets, this is the direction used when updating metrics
461461
let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) {
@@ -567,7 +567,8 @@ impl IoUringLoop {
567567
}
568568
Token::Send { key } => {
569569
let packet = loop_ctx.pop_packet(key).finalize_send();
570-
let asn_info = packet.asn_info.as_ref().into();
570+
let ip_metrics_entry = packet.asn_info;
571+
let asn_info = ip_metrics_entry.as_ref().into();
571572

572573
if ret < 0 {
573574
let source =
@@ -576,16 +577,25 @@ impl IoUringLoop {
576577
metrics::packets_dropped_total(send_dir, &source, &asn_info)
577578
.inc();
578579
} else if ret as usize != packet.data.len() {
579-
metrics::packets_total(send_dir, &asn_info).inc();
580580
metrics::errors_total(
581581
send_dir,
582582
"sent bytes != packet length",
583583
&asn_info,
584584
)
585585
.inc();
586+
*processing_metrics
587+
.packets_total
588+
.entry((send_dir, ip_metrics_entry))
589+
.or_default() += 1;
586590
} else {
587-
metrics::packets_total(send_dir, &asn_info).inc();
588-
metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64);
591+
*processing_metrics
592+
.packets_total
593+
.entry((send_dir, ip_metrics_entry.clone()))
594+
.or_default() += 1;
595+
*processing_metrics
596+
.bytes_total
597+
.entry((send_dir, ip_metrics_entry))
598+
.or_default() += ret as usize;
589599
}
590600
}
591601
}

src/components/proxy/packet_router.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,21 +66,26 @@ impl DownstreamReceiveWorkerConfig {
6666
"received packet from downstream"
6767
);
6868

69-
let timer = processing_time.start_timer();
70-
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
71-
Ok(()) => {
72-
error_acc.maybe_send();
73-
}
74-
Err(error) => {
75-
let discriminant = error.discriminant();
76-
metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
77-
metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
78-
79-
error_acc.push_error(error);
80-
}
81-
}
69+
processing_time.observe_closure_duration(
70+
|| match Self::process_downstream_received_packet(
71+
packet,
72+
config,
73+
sessions,
74+
destinations,
75+
) {
76+
Ok(()) => {
77+
error_acc.maybe_send();
78+
}
79+
Err(error) => {
80+
let discriminant = error.discriminant();
81+
metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
82+
metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY)
83+
.inc();
8284

83-
timer.stop_and_record();
85+
error_acc.push_error(error);
86+
}
87+
},
88+
);
8489
}
8590

8691
/// Processes a packet by running it through the filter chain.

src/filters/chain.rs

Lines changed: 3 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
use prometheus::{exponential_buckets, Histogram};
18-
1917
use crate::{
2018
config::Filter as FilterConfig,
2119
filters::{prelude::*, FilterRegistry},
22-
metrics::{histogram_opts, CollectorExt},
2320
};
2421

25-
const FILTER_LABEL: &str = "filter";
26-
27-
/// Start the histogram bucket at an eighth of a millisecond, as we bucketed the full filter
28-
chain processing starting at a quarter of a millisecond, so we will want finer granularity
29-
/// here.
30-
const BUCKET_START: f64 = 0.000125;
31-
32-
const BUCKET_FACTOR: f64 = 2.5;
33-
34-
/// At an exponential factor of 2.5 (BUCKET_FACTOR), 11 iterations gets us to just over half a
35-
/// second. Any processing that occurs over half a second is far too long, so we end
36-
/// the bucketing there as we don't care about granularity past this value.
37-
const BUCKET_COUNT: usize = 11;
38-
3922
/// A chain of [`Filter`]s to be executed in order.
4023
///
4124
/// Executes each filter, passing the [`ReadContext`] and [`WriteContext`]
@@ -45,50 +28,11 @@ const BUCKET_COUNT: usize = 11;
4528
#[derive(Clone, Default)]
4629
pub struct FilterChain {
4730
filters: Vec<(String, FilterInstance)>,
48-
filter_read_duration_seconds: Vec<Histogram>,
49-
filter_write_duration_seconds: Vec<Histogram>,
5031
}
5132

5233
impl FilterChain {
5334
pub fn new(filters: Vec<(String, FilterInstance)>) -> Result<Self, CreationError> {
54-
let subsystem = "filter";
55-
56-
Ok(Self {
57-
filter_read_duration_seconds: filters
58-
.iter()
59-
.map(|(name, _)| {
60-
Histogram::with_opts(
61-
histogram_opts(
62-
"read_duration_seconds",
63-
subsystem,
64-
"Seconds taken to execute a given filter's `read`.",
65-
Some(
66-
exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT)
67-
.unwrap(),
68-
),
69-
)
70-
.const_label(FILTER_LABEL, name),
71-
)
72-
.and_then(|histogram| histogram.register_if_not_exists())
73-
})
74-
.collect::<Result<_, prometheus::Error>>()?,
75-
filter_write_duration_seconds: filters
76-
.iter()
77-
.map(|(name, _)| {
78-
Histogram::with_opts(
79-
histogram_opts(
80-
"write_duration_seconds",
81-
subsystem,
82-
"Seconds taken to execute a given filter's `write`.",
83-
Some(exponential_buckets(0.000125, 2.5, 11).unwrap()),
84-
)
85-
.const_label(FILTER_LABEL, name),
86-
)
87-
.and_then(|histogram| histogram.register_if_not_exists())
88-
})
89-
.collect::<Result<_, prometheus::Error>>()?,
90-
filters,
91-
})
35+
Ok(Self { filters })
9236
}
9337

9438
#[inline]
@@ -274,15 +218,9 @@ impl schemars::JsonSchema for FilterChain {
274218

275219
impl Filter for FilterChain {
276220
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
277-
for ((id, instance), histogram) in self
278-
.filters
279-
.iter()
280-
.zip(self.filter_read_duration_seconds.iter())
281-
{
221+
for (id, instance) in self.filters.iter() {
282222
tracing::trace!(%id, "read filtering packet");
283-
let timer = histogram.start_timer();
284223
let result = instance.filter().read(ctx);
285-
timer.stop_and_record();
286224
match result {
287225
Ok(()) => tracing::trace!(%id, "read passing packet"),
288226
Err(error) => {
@@ -304,16 +242,9 @@ impl Filter for FilterChain {
304242
}
305243

306244
fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
307-
for ((id, instance), histogram) in self
308-
.filters
309-
.iter()
310-
.rev()
311-
.zip(self.filter_write_duration_seconds.iter().rev())
312-
{
245+
for (id, instance) in self.filters.iter().rev() {
313246
tracing::trace!(%id, "write filtering packet");
314-
let timer = histogram.start_timer();
315247
let result = instance.filter().write(ctx);
316-
timer.stop_and_record();
317248
match result {
318249
Ok(()) => tracing::trace!(%id, "write passing packet"),
319250
Err(error) => {

src/metrics.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
* limitations under the License.
1515
*/
1616

17+
use std::collections::HashMap;
18+
1719
use crate::net::maxmind_db::MetricsIpNetEntry;
1820
use once_cell::sync::Lazy;
1921
use prometheus::{
2022
core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter,
2123
IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
2224
};
2325

24-
pub use prometheus::Result;
25-
2626
/// "event" is used as a label for Metrics that can apply to both Filter
2727
/// `read` and `write` executions.
2828
pub const DIRECTION_LABEL: &str = "event";
@@ -58,7 +58,7 @@ pub(crate) const BUCKET_FACTOR: f64 = 2.0;
5858
/// care about granularity past 1 second.
5959
pub(crate) const BUCKET_COUNT: usize = 13;
6060

61-
#[derive(Clone, Copy, Debug)]
61+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
6262
pub enum Direction {
6363
Read,
6464
Write,
@@ -269,34 +269,33 @@ pub fn register<T: Collector + Sized + Clone + 'static>(collector: T) -> T {
269269
.unwrap()
270270
}
271271

272-
pub trait CollectorExt: Collector + Clone + Sized + 'static {
273-
/// Registers the current metric collector with the provided registry
274-
/// if not already registered.
275-
fn register_if_not_exists(self) -> Result<Self> {
276-
match registry().register(Box::from(self.clone())) {
277-
Ok(_) | Err(prometheus::Error::AlreadyReg) => Ok(self),
278-
Err(err) => Err(err),
279-
}
280-
}
281-
}
282-
283-
impl<C: Collector + Clone + 'static> CollectorExt for C {}
284-
285272
/// A local instance of all of the metrics related to packet processing.
286273
pub struct ProcessingMetrics {
287274
pub read_processing_time: LocalHistogram,
275+
pub packets_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize>,
276+
pub bytes_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize>,
288277
}
289278

290279
impl ProcessingMetrics {
291280
pub fn new() -> Self {
292281
Self {
293282
read_processing_time: processing_time(READ).local(),
283+
packets_total: <_>::default(),
284+
bytes_total: <_>::default(),
294285
}
295286
}
296287

297288
#[inline]
298-
pub fn flush(&self) {
289+
pub fn flush(&mut self) {
299290
self.read_processing_time.flush();
291+
292+
for ((send_dir, asn_info), amount) in self.packets_total.drain() {
293+
packets_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _);
294+
}
295+
296+
for ((send_dir, asn_info), amount) in self.bytes_total.drain() {
297+
bytes_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _);
298+
}
300299
}
301300
}
302301

src/net/maxmind_db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ pub struct IpNetEntry {
171171
pub prefix: String,
172172
}
173173

174-
#[derive(Clone)]
174+
#[derive(Clone, PartialEq, Eq, Hash)]
175175
pub struct MetricsIpNetEntry {
176176
pub prefix: String,
177177
pub id: u64,

0 commit comments

Comments
 (0)