Skip to content

Commit 99d080d

Browse files
committed
memory
1 parent df02b5d commit 99d080d

File tree

11 files changed

+245
-271
lines changed

11 files changed

+245
-271
lines changed

src/query/pipeline/transforms/src/processors/memory_settings.rs

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,10 @@ impl Debug for MemorySettings {
7575
}
7676

7777
pub struct MemorySettingsBuilder {
78-
enable_global_level_spill: bool,
7978
max_memory_usage: Option<usize>,
8079

8180
enable_group_spill: bool,
8281

83-
enable_query_level_spill: bool,
8482
max_query_memory_usage: Option<usize>,
8583
query_memory_tracking: Option<Arc<MemStat>>,
8684

@@ -89,7 +87,6 @@ pub struct MemorySettingsBuilder {
8987

9088
impl MemorySettingsBuilder {
9189
pub fn with_max_memory_usage(mut self, max: usize) -> Self {
92-
self.enable_global_level_spill = true;
9390
self.max_memory_usage = Some(max);
9491
self
9592
}
@@ -99,7 +96,6 @@ impl MemorySettingsBuilder {
9996
max: usize,
10097
tracking: Option<Arc<MemStat>>,
10198
) -> Self {
102-
self.enable_query_level_spill = true;
10399
self.max_query_memory_usage = Some(max);
104100
self.query_memory_tracking = tracking;
105101
self
@@ -118,12 +114,15 @@ impl MemorySettingsBuilder {
118114
pub fn build(self) -> MemorySettings {
119115
MemorySettings {
120116
enable_group_spill: self.enable_group_spill,
117+
118+
enable_global_level_spill: self.max_memory_usage.is_some(),
121119
max_memory_usage: self.max_memory_usage.unwrap_or(usize::MAX),
122-
enable_global_level_spill: self.enable_global_level_spill,
123120
global_memory_tracking: &GLOBAL_MEM_STAT,
124-
enable_query_level_spill: self.enable_query_level_spill,
121+
122+
enable_query_level_spill: self.max_query_memory_usage.is_some(),
125123
max_query_memory_usage: self.max_query_memory_usage.unwrap_or(usize::MAX),
126124
query_memory_tracking: self.query_memory_tracking,
125+
127126
spill_unit_size: self.spill_unit_size.unwrap_or(0),
128127
}
129128
}
@@ -132,12 +131,10 @@ impl MemorySettingsBuilder {
132131
impl MemorySettings {
133132
pub fn builder() -> MemorySettingsBuilder {
134133
MemorySettingsBuilder {
135-
enable_global_level_spill: false,
136134
max_memory_usage: None,
137135

138136
enable_group_spill: true,
139137

140-
enable_query_level_spill: false,
141138
max_query_memory_usage: None,
142139
query_memory_tracking: None,
143140

@@ -146,29 +143,25 @@ impl MemorySettings {
146143
}
147144

148145
pub fn check_spill(&self) -> bool {
149-
if self.enable_global_level_spill
150-
&& self.global_memory_tracking.get_memory_usage() >= self.max_memory_usage
146+
if let Some(remain) = self.check_global()
147+
&& remain <= 0
151148
{
152149
return true;
153150
}
154151

155-
if self.enable_group_spill
156-
&& let Some(workload_group) = ThreadTracker::workload_group()
152+
if let Some(remain) = self.check_workload_group()
153+
&& remain <= 0
157154
{
158-
let workload_group_memory_usage = workload_group.mem_stat.get_memory_usage();
159-
let max_memory_usage = workload_group.max_memory_usage.load(Ordering::Relaxed);
160-
161-
if max_memory_usage != 0 && workload_group_memory_usage >= max_memory_usage {
162-
return true;
163-
}
155+
return true;
164156
}
165157

166-
let Some(query_memory_tracking) = self.query_memory_tracking.as_ref() else {
167-
return false;
168-
};
169-
170-
self.enable_query_level_spill
171-
&& query_memory_tracking.get_memory_usage() >= self.max_query_memory_usage
158+
if let Some(remain) = self.check_query()
159+
&& remain <= 0
160+
{
161+
true
162+
} else {
163+
false
164+
}
172165
}
173166

174167
fn check_global(&self) -> Option<isize> {
@@ -207,8 +200,7 @@ impl MemorySettings {
207200
return None;
208201
}
209202

210-
let query_memory_tracking = self.query_memory_tracking.as_ref()?;
211-
let usage = query_memory_tracking.get_memory_usage();
203+
let usage = self.query_memory_tracking.as_ref()?.get_memory_usage();
212204

213205
Some(if usage >= self.max_query_memory_usage {
214206
-((usage - self.max_query_memory_usage) as isize)

src/query/pipeline/transforms/src/processors/traits/spill.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use databend_common_exception::Result;
1616
use databend_common_expression::DataBlock;
1717
use databend_storages_common_cache::TempPath;
1818

19+
use crate::MemorySettings;
20+
1921
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
2022
pub enum Location {
2123
Remote(String),
@@ -32,6 +34,14 @@ impl Location {
3234
}
3335
}
3436

37+
#[async_trait::async_trait]
38+
pub trait SortSpiller: Clone + Send + Sync + 'static {
39+
async fn spill(&self, data_block: DataBlock) -> Result<Location>;
40+
async fn restore(&self, location: &Location) -> Result<DataBlock>;
41+
fn remove_local_file(&self, local: &TempPath);
42+
fn memory_settings(&self) -> &MemorySettings;
43+
}
44+
3545
#[async_trait::async_trait]
3646
pub trait DataBlockSpill: Clone + Send + Sync + 'static {
3747
async fn spill(&self, data_block: DataBlock) -> Result<Location> {

src/query/pipeline/transforms/src/processors/transforms/sorts/sort_collect.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,17 @@ use super::TransformSortMergeLimit;
3535
use super::core::RowConverter;
3636
use super::core::Rows;
3737
use super::core::algorithm::SortAlgorithm;
38-
use crate::MemorySettings;
39-
use crate::traits::DataBlockSpill;
38+
use crate::traits::SortSpiller;
4039

4140
#[allow(clippy::large_enum_variant)]
42-
enum Inner<A: SortAlgorithm, S: DataBlockSpill> {
41+
enum Inner<A: SortAlgorithm, S: SortSpiller> {
4342
Collect(Vec<DataBlock>),
4443
Limit(TransformSortMergeLimit<A::Rows>),
4544
Spill(Vec<DataBlock>, SortSpill<A, S>),
4645
None,
4746
}
4847

49-
pub struct TransformSortCollect<A: SortAlgorithm, C, S: DataBlockSpill> {
48+
pub struct TransformSortCollect<A: SortAlgorithm, C, S: SortSpiller> {
5049
name: &'static str,
5150
input: Arc<InputPort>,
5251
output: Arc<OutputPort>,
@@ -68,14 +67,13 @@ pub struct TransformSortCollect<A: SortAlgorithm, C, S: DataBlockSpill> {
6867
aborting: AtomicBool,
6968

7069
enable_restore_prefetch: bool,
71-
memory_settings: MemorySettings,
7270
}
7371

7472
impl<A, C, S> TransformSortCollect<A, C, S>
7573
where
7674
A: SortAlgorithm,
7775
C: RowConverter<A::Rows>,
78-
S: DataBlockSpill,
76+
S: SortSpiller,
7977
{
8078
pub fn new(
8179
input: Arc<InputPort>,
@@ -87,7 +85,6 @@ where
8785
sort_limit: bool,
8886
order_col_generated: bool,
8987
enable_restore_prefetch: bool,
90-
memory_settings: MemorySettings,
9188
) -> Result<Self> {
9289
let row_converter = C::create(&sort_desc, base.schema.clone())?;
9390
let (name, inner) = match base.limit {
@@ -108,7 +105,6 @@ where
108105
base,
109106
inner,
110107
aborting: AtomicBool::new(false),
111-
memory_settings,
112108
max_block_size,
113109
default_num_merge,
114110
enable_restore_prefetch,
@@ -184,7 +180,7 @@ where
184180
SortSpillParams::determine(
185181
bytes,
186182
rows,
187-
ByteSize(self.memory_settings.spill_unit_size as _),
183+
ByteSize(self.base.spiller.memory_settings().spill_unit_size as _),
188184
self.max_block_size,
189185
self.enable_restore_prefetch,
190186
)
@@ -217,22 +213,23 @@ where
217213
}
218214

219215
fn check_spill(&self) -> bool {
216+
let memory_settings = self.base.spiller.memory_settings();
220217
match &self.inner {
221218
Inner::Limit(limit_sort) => {
222-
self.memory_settings.check_spill()
219+
memory_settings.check_spill()
223220
&& limit_sort.num_bytes()
224-
>= ByteSize(self.memory_settings.spill_unit_size as _) * 2_u64
221+
>= ByteSize(memory_settings.spill_unit_size as _) * 2_u64
225222
}
226223
Inner::Collect(input_data) => {
227-
self.memory_settings.check_spill()
224+
memory_settings.check_spill()
228225
&& input_data.iter().map(|b| b.memory_size()).sum::<usize>()
229-
>= self.memory_settings.spill_unit_size * 2
226+
>= memory_settings.spill_unit_size * 2
230227
}
231228
Inner::Spill(input_data, sort_spill) => {
232-
let rows = input_data.in_memory_rows();
233-
let params = sort_spill.params();
234-
self.memory_settings.check_spill() && rows >= params.batch_rows * 2
235-
|| input_data.in_memory_rows() >= params.max_rows()
229+
input_data.in_memory_rows() >= sort_spill.params().batch_rows * 2 && {
230+
let remain = memory_settings.check_spill_remain().unwrap();
231+
remain < memory_settings.spill_unit_size as isize * 2
232+
}
236233
}
237234
_ => unreachable!(),
238235
}
@@ -257,7 +254,7 @@ where
257254
A: SortAlgorithm + 'static,
258255
A::Rows: 'static,
259256
C: RowConverter<A::Rows> + Send + 'static,
260-
S: DataBlockSpill,
257+
S: SortSpiller,
261258
{
262259
fn name(&self) -> String {
263260
self.name.to_string()

src/query/pipeline/transforms/src/processors/transforms/sorts/sort_local_merge.rs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ use super::core::RowConverter;
4141
use super::core::Rows;
4242
use super::core::algorithm::SortAlgorithm;
4343
use super::create_memory_merger;
44-
use crate::MemorySettings;
45-
use crate::traits::DataBlockSpill;
44+
use crate::traits::SortSpiller;
4645

4746
#[derive(Debug)]
4847
enum State {
@@ -55,14 +54,14 @@ enum State {
5554
}
5655

5756
#[allow(clippy::large_enum_variant)]
58-
enum Inner<A: SortAlgorithm, S: DataBlockSpill> {
57+
enum Inner<A: SortAlgorithm, S: SortSpiller> {
5958
Collect(Vec<DataBlock>),
6059
Limit(TransformSortMergeLimit<A::Rows>),
6160
Memory(MemoryMerger<A>),
6261
Spill(Vec<DataBlock>, SortSpill<A, S>),
6362
}
6463

65-
pub struct TransformSort<A: SortAlgorithm, C, S: DataBlockSpill> {
64+
pub struct TransformSort<A: SortAlgorithm, C, S: SortSpiller> {
6665
name: &'static str,
6766
input: Arc<InputPort>,
6867
output: Arc<OutputPort>,
@@ -87,14 +86,13 @@ pub struct TransformSort<A: SortAlgorithm, C, S: DataBlockSpill> {
8786

8887
max_block_size: usize,
8988
enable_restore_prefetch: bool,
90-
memory_settings: MemorySettings,
9189
}
9290

9391
impl<A, C, S> TransformSort<A, C, S>
9492
where
9593
A: SortAlgorithm,
9694
C: RowConverter<A::Rows>,
97-
S: DataBlockSpill,
95+
S: SortSpiller,
9896
{
9997
#[allow(clippy::too_many_arguments)]
10098
pub fn new(
@@ -108,7 +106,6 @@ where
108106
output_order_col: bool,
109107
order_col_generated: bool,
110108
enable_restore_prefetch: bool,
111-
memory_settings: MemorySettings,
112109
) -> Result<Self> {
113110
assert!(max_block_size > 0);
114111
let sort_row_offset = schema.fields().len() - 1;
@@ -142,7 +139,6 @@ where
142139
max_block_size,
143140
aborting: AtomicBool::new(false),
144141
enable_restore_prefetch,
145-
memory_settings,
146142
})
147143
}
148144

@@ -198,7 +194,7 @@ where
198194
SortSpillParams::determine(
199195
bytes,
200196
rows,
201-
ByteSize(self.memory_settings.spill_unit_size as _),
197+
ByteSize(self.base.spiller.memory_settings().spill_unit_size as _),
202198
self.max_block_size,
203199
self.enable_restore_prefetch,
204200
)
@@ -276,32 +272,25 @@ where
276272
self.output.push_data(Ok(block));
277273
}
278274

279-
fn input_rows(&self) -> (usize, usize) {
280-
match &self.inner {
281-
Inner::Collect(input_data) | Inner::Spill(input_data, _) => {
282-
(input_data.len(), input_data.in_memory_rows())
283-
}
284-
_ => (0, 0),
285-
}
286-
}
287-
288275
fn check_spill(&self) -> bool {
276+
let memory_settings = self.base.spiller.memory_settings();
289277
match &self.inner {
290278
Inner::Limit(limit_sort) => {
291-
self.memory_settings.check_spill()
279+
memory_settings.check_spill()
292280
&& limit_sort.num_bytes()
293-
>= ByteSize(self.memory_settings.spill_unit_size as _) * 2_u64
281+
>= ByteSize(memory_settings.spill_unit_size as _) * 2_u64
294282
}
295283
Inner::Collect(input_data) => {
296-
self.memory_settings.check_spill()
284+
memory_settings.check_spill()
297285
&& input_data.iter().map(|b| b.memory_size()).sum::<usize>()
298-
>= self.memory_settings.spill_unit_size * 2
286+
>= memory_settings.spill_unit_size * 2
299287
}
300288
Inner::Spill(input_data, sort_spill) => {
301-
let rows = input_data.in_memory_rows();
302289
let params = sort_spill.params();
303-
self.memory_settings.check_spill() && rows >= params.batch_rows * 2
304-
|| input_data.in_memory_rows() >= params.max_rows()
290+
input_data.in_memory_rows() >= params.batch_rows * 2 && {
291+
let remain = memory_settings.check_spill_remain().unwrap();
292+
remain < memory_settings.spill_unit_size as isize * 2
293+
}
305294
}
306295
_ => unreachable!(),
307296
}
@@ -314,7 +303,7 @@ where
314303
A: SortAlgorithm + 'static,
315304
A::Rows: 'static,
316305
C: RowConverter<A::Rows> + Send + 'static,
317-
S: DataBlockSpill,
306+
S: SortSpiller,
318307
{
319308
fn name(&self) -> String {
320309
self.name.to_string()
@@ -428,12 +417,12 @@ where
428417
let finished = self.input.is_finished();
429418
self.trans_to_spill()?;
430419

431-
let (incoming_block, incoming) = self.input_rows();
432420
let Inner::Spill(input_data, spill_sort) = &mut self.inner else {
433421
unreachable!()
434422
};
435-
423+
let incoming = input_data.in_memory_rows();
436424
if incoming > 0 {
425+
let incoming_block = input_data.len();
437426
let total_rows = spill_sort.collect_total_rows();
438427
log::debug!(incoming_block, incoming_rows = incoming, total_rows, finished; "sort_input_data");
439428
spill_sort
@@ -450,8 +439,9 @@ where
450439
unreachable!()
451440
};
452441
assert!(input_data.is_empty());
453-
let OutputData { block, finish, .. } =
454-
spill_sort.on_restore(&self.memory_settings).await?;
442+
let OutputData { block, finish, .. } = spill_sort
443+
.on_restore(self.base.spiller.memory_settings())
444+
.await?;
455445
self.output_data.extend(block);
456446
if finish {
457447
self.state = State::Finish

0 commit comments

Comments
 (0)