-
-
Notifications
You must be signed in to change notification settings - Fork 838
one collector per agg request instead per bucket #2759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
a16d0ff to
d97bda9
Compare
59e9f5a to
856e5d5
Compare
In this refactoring a collector knows in which bucket of the parent their data is in. This allows to convert the previous approach of one collector per bucket to one collector per request. low card bucket optimization
use paged term map in term agg use special no sub agg term map impl
3356fc8 to
d76b315
Compare
increase cache to 2048
1f717da to
29add85
Compare
29add85 to
411587a
Compare
411587a to
1591344
Compare
| /// Only used when LOWCARD is true. | ||
| /// Cache doc ids per bucket for sub-aggregations. | ||
| /// | ||
| /// The outer Vec is indexed by BucketId. | ||
| per_bucket_docs: Vec<Vec<DocId>>, | ||
| /// Only used when LOWCARD is false. | ||
| /// For higher cardinalities we use a partitioned approach to store | ||
| /// | ||
| /// partitioned Vec<(BucketId, DocId)> pairs to improve grouping locality. | ||
| partitions: [PartitionEntry; NUM_PARTITIONS], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!? why use a boolean for this. I don't understand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What boolean? Do you mean array?
It's done as a cheap inexact group_by on bucket_id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no I meant LOWCARD.
It seems strange to have both partitions and per_bucket_docs, and switch usage basedon per_bucket_docs.
(Also why not Box<[Vec; NUM_PARTITIONS]>?)
8a4286c to
71dc084
Compare
| // Note: We need to make sure that we don't lock ourselves into a situation where we hit | ||
| // the FLUSH_THRESHOLD, but never flush any buckets. (except the final flush) | ||
| let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2); | ||
| const _: () = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's to make sure this is checked at compile time? That's fun.
| // The threshold above which we flush buckets individually. | ||
| // Note: We need to make sure that we don't lock ourselves into a situation where we hit | ||
| // the FLUSH_THRESHOLD, but never flush any buckets. (except the final flush) | ||
| let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2); | |
| let mut bucket_threshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2); |
| let mut bucket_treshold = FLUSH_THRESHOLD / (self.per_bucket_docs.len().max(1) * 2); | ||
| const _: () = { | ||
| // MAX_NUM_TERMS_FOR_VEC == LOWCARD threshold | ||
| let bucket_treshold = FLUSH_THRESHOLD / (MAX_NUM_TERMS_FOR_VEC as usize * 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| let bucket_treshold = FLUSH_THRESHOLD / (MAX_NUM_TERMS_FOR_VEC as usize * 2); | |
| let bucket_threshold = FLUSH_THRESHOLD / (MAX_NUM_TERMS_FOR_VEC as usize * 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you pick another name here?
| // MAX_NUM_TERMS_FOR_VEC == LOWCARD threshold | ||
| let bucket_treshold = FLUSH_THRESHOLD / (MAX_NUM_TERMS_FOR_VEC as usize * 2); | ||
| assert!( | ||
| bucket_treshold > 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| bucket_treshold > 0, | |
| bucket_threshold > 0, |
| for doc_id in docs { | ||
| let doc_id = *doc_id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| for doc_id in docs { | |
| let doc_id = *doc_id; | |
| for &doc_id in docs { |
| // TODO: this is terrible, a new vec is allocated for every doc | ||
| // We can fetch blocks instead | ||
| // We don't need to store the order for every value | ||
| let sorts: Vec<DocValueAndOrder> = req |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just have a Vec buffer on the Self?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, currently the vec is stored in the TopNComputer
|
Do you know if the high card case is overall a regression or not? |
In this refactoring a collector knows in which bucket of the parent their data is in. This allows to convert the previous approach of one collector per bucket to one collector per request.
Add
PagedTermMapas anotherTermAggregationMapto reduce memory usage compared to aHashMapIt contains an optimization for low cardinality bucket id
Remove Clone on the collector (we only have one instance now)
Future Work
PerRequestAggSegCtx, we can store now everything in the collectorPerformance
The heavy hitters are drastically reduced in terms of memory and CPU.
For term aggs with many terms, we use a lot less memory.
We use some more buffers to pass docs, which increases memory consumption for some aggs.
Biggest regression is
terms_zipf_1000_with_avg_sub_agg Avg: 9.1190ms (+39.79%)Which should be fixed when we fetch all values for all buckets at once.