
Commit 14b5682

Decouple mem_limiter from client
Signed-off-by: Alessandro Passaro <[email protected]>
1 parent: aad91bc

10 files changed: +94 -82 lines changed


mountpoint-s3-fs/examples/prefetch_benchmark.rs

Lines changed: 34 additions & 32 deletions
```diff
@@ -124,15 +124,37 @@ fn parse_duration(arg: &str) -> Result<Duration, String> {
         .map_err(|e| format!("Invalid duration: {e}"))
 }
 
-fn create_memory_limiter(args: &CliArgs, client: &S3CrtClient) -> Arc<MemoryLimiter<S3CrtClient>> {
-    let max_memory_target = if let Some(target) = args.max_memory_target {
-        target * 1024 * 1024
-    } else {
-        // Default to 95% of total system memory
-        let sys = System::new_with_specifics(RefreshKind::everything());
-        (sys.total_memory() as f64 * 0.95) as u64
-    };
-    Arc::new(MemoryLimiter::new(client.clone(), max_memory_target))
+impl CliArgs {
+    fn memory_target(&self) -> u64 {
+        if let Some(target) = self.max_memory_target {
+            target * 1024 * 1024
+        } else {
+            // Default to 95% of total system memory
+            let sys = System::new_with_specifics(RefreshKind::everything());
+            (sys.total_memory() as f64 * 0.95) as u64
+        }
+    }
+
+    fn s3_client_config(&self) -> S3ClientConfig {
+        let initial_read_window_size = 1024 * 1024 + 128 * 1024;
+        let mut client_config = S3ClientConfig::new()
+            .read_backpressure(true)
+            .initial_read_window(initial_read_window_size)
+            .endpoint_config(EndpointConfig::new(self.region.as_str()));
+        if let Some(throughput_target_gbps) = self.maximum_throughput_gbps {
+            client_config = client_config.throughput_target_gbps(throughput_target_gbps as f64);
+        }
+        if let Some(limit_gib) = self.crt_memory_limit_gib {
+            client_config = client_config.memory_limit_in_bytes(limit_gib * 1024 * 1024 * 1024);
+        }
+        if let Some(part_size) = self.part_size {
+            client_config = client_config.part_size(part_size as usize);
+        }
+        if let Some(nics) = &self.bind {
+            client_config = client_config.network_interface_names(nics.to_vec());
+        }
+        client_config
+    }
 }
 
 fn main() -> anyhow::Result<()> {
@@ -142,8 +164,9 @@ fn main() -> anyhow::Result<()> {
     let args = CliArgs::parse();
 
     let bucket = args.bucket.as_str();
-    let client = make_s3_client_from_args(&args).context("failed to create S3 CRT client")?;
-    let mem_limiter = create_memory_limiter(&args, &client);
+    let client_config = args.s3_client_config();
+    let client = S3CrtClient::new(client_config).context("failed to create S3 CRT client")?;
+    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), args.memory_target()));
     let runtime = Runtime::new(client.event_loop_group());
 
     // Verify if all objects exist and collect metadata
@@ -258,24 +281,3 @@ async fn wait_for_download(
     }
     Ok(total_bytes_read)
 }
-
-fn make_s3_client_from_args(args: &CliArgs) -> anyhow::Result<S3CrtClient> {
-    let initial_read_window_size = 1024 * 1024 + 128 * 1024;
-    let mut client_config = S3ClientConfig::new()
-        .read_backpressure(true)
-        .initial_read_window(initial_read_window_size)
-        .endpoint_config(EndpointConfig::new(args.region.as_str()));
-    if let Some(throughput_target_gbps) = args.maximum_throughput_gbps {
-        client_config = client_config.throughput_target_gbps(throughput_target_gbps as f64);
-    }
-    if let Some(limit_gib) = args.crt_memory_limit_gib {
-        client_config = client_config.memory_limit_in_bytes(limit_gib * 1024 * 1024 * 1024);
-    }
-    if let Some(part_size) = args.part_size {
-        client_config = client_config.part_size(part_size as usize);
-    }
-    if let Some(nics) = &args.bind {
-        client_config = client_config.network_interface_names(nics.to_vec());
-    }
-    Ok(S3CrtClient::new(client_config)?)
-}
```
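Condensed from the two hunks above, the benchmark's setup path now reads roughly as follows (a sketch only: imports, CLI definitions, error handling, and the rest of `main` are elided):

```rust
// Sketch assembled from the diff above, not a complete program.
let args = CliArgs::parse();

// Client construction is driven entirely by the new CliArgs::s3_client_config() helper.
let client_config = args.s3_client_config();
let client = S3CrtClient::new(client_config).context("failed to create S3 CRT client")?;

// The memory limiter is created independently of how the client was configured:
// it only needs a clone of the client (for memory stats) and a byte target.
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), args.memory_target()));

let runtime = Runtime::new(client.event_loop_group());
```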

mountpoint-s3-fs/src/mem_limiter.rs

Lines changed: 23 additions & 9 deletions
```diff
@@ -51,8 +51,7 @@ impl BufferArea {
 /// mem reserved by the backpressure controller
 /// (on `BackpressureFeedbackEvent`)
 ///
-#[derive(Debug)]
-pub struct MemoryLimiter<Client: ObjectClient> {
+pub struct MemoryLimiter {
     mem_limit: u64,
     /// Reserved memory for allocations we are tracking, such as buffers we allocate for prefetching.
     /// The memory may not be used yet but has been reserved.
@@ -63,11 +62,24 @@ pub struct MemoryLimiter<Client: ObjectClient> {
     // prefetch takes control over the entire read path but we don't record or control
     // memory usage on the write path today, so we will rely on the client's stats
     // for "other buffers" and adjust the prefetcher read window accordingly.
-    client: Client,
+    get_client_mem: Box<dyn Fn() -> u64 + Send + Sync + 'static>,
+}
+
+impl std::fmt::Debug for MemoryLimiter {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MemoryLimiter")
+            .field("mem_limit", &self.mem_limit)
+            .field("mem_reserved", &self.mem_reserved)
+            .field("additional_mem_reserved", &self.additional_mem_reserved)
+            .finish_non_exhaustive()
+    }
 }
 
-impl<Client: ObjectClient> MemoryLimiter<Client> {
-    pub fn new(client: Client, mem_limit: u64) -> Self {
+impl MemoryLimiter {
+    pub fn new<Client>(client: Client, mem_limit: u64) -> Self
+    where
+        Client: ObjectClient + Send + Sync + 'static,
+    {
         let min_reserved = 128 * 1024 * 1024;
         let reserved_mem = (mem_limit / 8).max(min_reserved);
         let formatter = make_format(humansize::BINARY);
@@ -77,10 +89,14 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
             formatter(reserved_mem)
         );
         Self {
-            client,
             mem_limit,
             mem_reserved: AtomicU64::new(0),
             additional_mem_reserved: reserved_mem,
+            get_client_mem: Box::new(move || {
+                client
+                    .mem_usage_stats()
+                    .map_or(0, |stats| stats.primary_allocated.saturating_add(stats.secondary_used))
+            }),
         }
     }
 
@@ -147,8 +163,6 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
     // where memory is allocated exactly equal to the used memory. So total allocated memory for the CRT client would
     // be `primary_allocated` + `secondary_used`.
     fn client_mem_allocated(&self) -> u64 {
-        self.client
-            .mem_usage_stats()
-        .map_or(0, |stats| stats.primary_allocated.saturating_add(stats.secondary_used))
+        (self.get_client_mem)()
     }
 }
```
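The net effect is that `MemoryLimiter` is generic only at construction: the client is captured inside the boxed `get_client_mem` closure, so the concrete client type is erased from the struct and everything downstream can hold a plain `Arc<MemoryLimiter>`. A rough sketch of what this enables is below; the `PrefetchState` struct and the import paths are assumptions for illustration, not code from this commit:

```rust
use std::sync::Arc;

use mountpoint_s3_client::ObjectClient; // assumed import path
use mountpoint_s3_fs::mem_limiter::MemoryLimiter; // assumed import path

// Hypothetical holder: it no longer needs a `Client` type parameter
// just to carry the limiter around.
struct PrefetchState {
    mem_limiter: Arc<MemoryLimiter>, // was Arc<MemoryLimiter<Client>> before this commit
}

// Construction still accepts any ObjectClient; the type is erased immediately,
// since new() stores only a boxed closure over the client's memory stats.
fn build_state<Client>(client: Client, mem_limit: u64) -> PrefetchState
where
    Client: ObjectClient + Send + Sync + 'static,
{
    PrefetchState {
        mem_limiter: Arc::new(MemoryLimiter::new(client, mem_limit)),
    }
}
```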

mountpoint-s3-fs/src/prefetch/backpressure_controller.rs

Lines changed: 8 additions & 9 deletions
```diff
@@ -3,7 +3,6 @@ use std::sync::Arc;
 
 use async_channel::{Receiver, RecvError, Sender, unbounded};
 use humansize::make_format;
-use mountpoint_s3_client::ObjectClient;
 use tracing::trace;
 
 use crate::mem_limiter::{BufferArea, MemoryLimiter};
@@ -35,7 +34,7 @@ pub struct BackpressureConfig {
 /// It is used to send feedback ([Self::send_feedback]) to its corresponding [BackpressureLimiter],
 /// the counterpart which should be leveraged by the stream producer.
 #[derive(Debug)]
-pub struct BackpressureController<Client: ObjectClient> {
+pub struct BackpressureController {
     /// Sender for the [BackpressureLimiter] to receive size increments from the controller.
     read_window_updater: Sender<usize>,
     /// Amount by which the producer should be producing data ahead of [Self::next_read_offset].
@@ -58,7 +57,7 @@ pub struct BackpressureController<Client: ObjectClient> {
     /// Memory limiter is used to guide decisions on how much data to prefetch.
     ///
     /// For example, when memory is low we should scale down [Self::preferred_read_window_size].
-    mem_limiter: Arc<MemoryLimiter<Client>>,
+    mem_limiter: Arc<MemoryLimiter>,
 }
 
 /// The [BackpressureLimiter] is used on producer side of a stream, for example,
@@ -81,10 +80,10 @@ pub struct BackpressureLimiter {
 ///
 /// This pair allows a consumer to send feedback ([BackpressureFeedbackEvent]) when starved or bytes are consumed,
 /// informing a producer (a holder of the [BackpressureLimiter]) when it should provide data more aggressively.
-pub fn new_backpressure_controller<Client: ObjectClient>(
+pub fn new_backpressure_controller(
     config: BackpressureConfig,
-    mem_limiter: Arc<MemoryLimiter<Client>>,
-) -> (BackpressureController<Client>, BackpressureLimiter) {
+    mem_limiter: Arc<MemoryLimiter>,
+) -> (BackpressureController, BackpressureLimiter) {
     // Minimum window size multiplier as the scaling up and down won't work if the multiplier is 1.
     const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2;
     let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
@@ -114,7 +113,7 @@ pub fn new_backpressure_controller<Client: ObjectClient>(
     (controller, limiter)
 }
 
-impl<Client: ObjectClient> BackpressureController<Client> {
+impl BackpressureController {
     pub fn read_window_end_offset(&self) -> u64 {
         self.read_window_end_offset
     }
@@ -234,7 +233,7 @@ impl<Client: ObjectClient> BackpressureController<Client> {
     }
 }
 
-impl<Client: ObjectClient> Drop for BackpressureController<Client> {
+impl Drop for BackpressureController {
     fn drop(&mut self) {
         debug_assert!(
            self.next_read_offset <= self.request_end_offset,
@@ -435,7 +434,7 @@ mod tests {
 
     fn new_backpressure_controller_for_test(
         backpressure_config: BackpressureConfig,
-    ) -> (BackpressureController<MockClient>, BackpressureLimiter) {
+    ) -> (BackpressureController, BackpressureLimiter) {
         let client = MockClient::config()
             .bucket("test-bucket")
             .part_size(8 * 1024 * 1024)
```
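With the `Client` parameter gone, pairing a controller and limiter only needs the shared limiter handle; which client backs the limiter no longer shows up in the types at the call site. A minimal in-module sketch implied by the new signature (the `config` value is assumed to be built elsewhere; `wire_up` is hypothetical):

```rust
use std::sync::Arc;

use crate::mem_limiter::MemoryLimiter;

// Sketch: non-generic pairing after this commit. Before, the return type
// would have been (BackpressureController<Client>, BackpressureLimiter).
fn wire_up(config: BackpressureConfig, mem_limiter: Arc<MemoryLimiter>) {
    let (controller, limiter): (BackpressureController, BackpressureLimiter) =
        new_backpressure_controller(config, mem_limiter);
    // `controller` goes to the consumer side of the stream, `limiter` to the producer side.
    let _ = (controller, limiter);
}
```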

mountpoint-s3-fs/src/prefetch/builder.rs

Lines changed: 4 additions & 4 deletions
```diff
@@ -38,7 +38,7 @@ where
     pub fn build(
         self,
         runtime: Runtime,
-        mem_limiter: Arc<MemoryLimiter<Client>>,
+        mem_limiter: Arc<MemoryLimiter>,
         prefetcher_config: PrefetcherConfig,
     ) -> Prefetcher<Client> {
         self.inner.build(runtime, mem_limiter, prefetcher_config)
@@ -58,7 +58,7 @@ where
     fn build(
         self: Box<Self>,
         runtime: Runtime,
-        mem_limiter: Arc<MemoryLimiter<Client>>,
+        mem_limiter: Arc<MemoryLimiter>,
         prefetcher_config: PrefetcherConfig,
     ) -> Prefetcher<Client>;
 }
@@ -74,7 +74,7 @@ where
     fn build(
         self: Box<Self>,
         runtime: Runtime,
-        mem_limiter: Arc<MemoryLimiter<Client>>,
+        mem_limiter: Arc<MemoryLimiter>,
         prefetcher_config: PrefetcherConfig,
     ) -> Prefetcher<Client> {
         let part_stream = ClientPartStream::new(runtime, self.client, mem_limiter);
@@ -95,7 +95,7 @@ where
     fn build(
         self: Box<Self>,
         runtime: Runtime,
-        mem_limiter: Arc<MemoryLimiter<Client>>,
+        mem_limiter: Arc<MemoryLimiter>,
         prefetcher_config: PrefetcherConfig,
     ) -> Prefetcher<Client> {
         let part_stream = CachingPartStream::new(runtime, self.client, mem_limiter, self.cache);
```

mountpoint-s3-fs/src/prefetch/caching_stream.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -29,11 +29,11 @@ pub struct CachingPartStream<Cache, Client: ObjectClient + Clone + Send + Sync +
     cache: Arc<Cache>,
     runtime: Runtime,
     client: Client,
-    mem_limiter: Arc<MemoryLimiter<Client>>,
+    mem_limiter: Arc<MemoryLimiter>,
 }
 
 impl<Cache, Client: ObjectClient + Clone + Send + Sync + 'static> CachingPartStream<Cache, Client> {
-    pub fn new(runtime: Runtime, client: Client, mem_limiter: Arc<MemoryLimiter<Client>>, cache: Cache) -> Self {
+    pub fn new(runtime: Runtime, client: Client, mem_limiter: Arc<MemoryLimiter>, cache: Cache) -> Self {
         Self {
             cache: Arc::new(cache),
             runtime,
```

mountpoint-s3-fs/src/prefetch/part_queue.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -23,7 +23,7 @@ pub struct PartQueue<Client: ObjectClient> {
     failed: bool,
     /// The total number of bytes sent to the underlying queue of `self.receiver`
     bytes_received: Arc<AtomicUsize>,
-    mem_limiter: Arc<MemoryLimiter<Client>>,
+    mem_limiter: Arc<MemoryLimiter>,
 }
 
 /// Producer side of the queue of [Part]s.
@@ -36,7 +36,7 @@ pub struct PartQueueProducer<E: std::error::Error> {
 
 /// Creates an unbounded [PartQueue] and its related [PartQueueProducer].
 pub fn unbounded_part_queue<Client: ObjectClient>(
-    mem_limiter: Arc<MemoryLimiter<Client>>,
+    mem_limiter: Arc<MemoryLimiter>,
 ) -> (PartQueue<Client>, PartQueueProducer<Client::ClientError>) {
     let (sender, receiver) = unbounded();
     let bytes_counter = Arc::new(AtomicUsize::new(0));
```

mountpoint-s3-fs/src/prefetch/part_stream.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -195,11 +195,11 @@ impl<Client> Debug for PartStream<Client> {
 pub struct ClientPartStream<Client: ObjectClient + Clone + Send + Sync + 'static> {
     runtime: Runtime,
     client: Client,
-    mem_limiter: Arc<MemoryLimiter<Client>>,
+    mem_limiter: Arc<MemoryLimiter>,
 }
 
 impl<Client: ObjectClient + Clone + Send + Sync + 'static> ClientPartStream<Client> {
-    pub fn new(runtime: Runtime, client: Client, mem_limiter: Arc<MemoryLimiter<Client>>) -> Self {
+    pub fn new(runtime: Runtime, client: Client, mem_limiter: Arc<MemoryLimiter>) -> Self {
         Self {
             runtime,
             client,
```

mountpoint-s3-fs/src/prefetch/task.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -17,15 +17,15 @@ pub struct RequestTask<Client: ObjectClient> {
     remaining: usize,
     range: RequestRange,
     part_queue: PartQueue<Client>,
-    backpressure_controller: BackpressureController<Client>,
+    backpressure_controller: BackpressureController,
 }
 
 impl<Client: ObjectClient> RequestTask<Client> {
     pub fn from_handle(
         task_handle: RemoteHandle<()>,
         range: RequestRange,
         part_queue: PartQueue<Client>,
-        backpressure_controller: BackpressureController<Client>,
+        backpressure_controller: BackpressureController,
     ) -> Self {
         Self {
             _task_handle: task_handle,
```

mountpoint-s3-fs/src/upload.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -28,7 +28,7 @@ pub use incremental::AppendUploadRequest;
 pub struct Uploader<Client: ObjectClient> {
     client: Client,
     runtime: Runtime,
-    mem_limiter: Arc<MemoryLimiter<Client>>,
+    mem_limiter: Arc<MemoryLimiter>,
     storage_class: Option<String>,
     server_side_encryption: ServerSideEncryption,
     buffer_size: usize,
@@ -73,7 +73,7 @@ where
     pub fn new(
         client: Client,
         runtime: Runtime,
-        mem_limiter: Arc<MemoryLimiter<Client>>,
+        mem_limiter: Arc<MemoryLimiter>,
         storage_class: Option<String>,
         server_side_encryption: ServerSideEncryption,
         buffer_size: usize,
```
