Skip to content

Commit f13a52e

Browse files
committed
Replace stream::iter implementation for fixing the Send issue in DQ.
1 parent 33284df commit f13a52e

File tree

1 file changed

+52
-44
lines changed

1 file changed

+52
-44
lines changed

mp2-v1/src/query/batching_planner.rs

+52-44
Original file line numberDiff line numberDiff line change
@@ -119,54 +119,62 @@ async fn generate_chunks<const CHUNK_SIZE: usize, C: ContextProvider>(
119119
.cloned()
120120
.collect::<BTreeSet<_>>();
121121

122-
Ok(stream::iter(sorted_index_values.into_iter())
123-
.then(async |index_value| {
124-
let index_path = index_cache
125-
.compute_path(&index_value, current_epoch)
122+
let prove_rows = async |index_value| {
123+
let index_path = index_cache
124+
.compute_path(&index_value, current_epoch)
125+
.await
126+
.unwrap_or_else(|| panic!("node with key {index_value} not found in index tree cache"));
127+
let proven_rows = if let Some(matching_rows) =
128+
row_keys_by_epochs.get(&(index_value as Epoch))
129+
{
130+
let sorted_rows = matching_rows.iter().collect::<BTreeSet<_>>();
131+
stream::iter(sorted_rows.iter())
132+
.then(async |&row_key| {
133+
compute_input_for_row(&row_cache, row_key, index_value, &index_path, column_ids)
134+
.await
135+
})
136+
.collect::<Vec<RowInput>>()
137+
.await
138+
} else {
139+
let proven_node = non_existence_inputs
140+
.find_row_node_for_non_existence(index_value)
126141
.await
127-
.unwrap_or_else(|| {
128-
panic!("node with key {index_value} not found in index tree cache")
142+
.unwrap_or_else(|_| {
143+
panic!("node for non-existence not found for index value {index_value}")
129144
});
130-
let proven_rows =
131-
if let Some(matching_rows) = row_keys_by_epochs.get(&(index_value as Epoch)) {
132-
let sorted_rows = matching_rows.iter().collect::<BTreeSet<_>>();
133-
stream::iter(sorted_rows.iter())
134-
.then(async |&row_key| {
135-
compute_input_for_row(
136-
&row_cache,
137-
row_key,
138-
index_value,
139-
&index_path,
140-
column_ids,
141-
)
142-
.await
143-
})
144-
.collect::<Vec<RowInput>>()
145-
.await
146-
} else {
147-
let proven_node = non_existence_inputs
148-
.find_row_node_for_non_existence(index_value)
149-
.await
150-
.unwrap_or_else(|_| {
151-
panic!("node for non-existence not found for index value {index_value}")
152-
});
153-
let row_input = compute_input_for_row(
154-
non_existence_inputs.row_tree,
155-
&proven_node,
156-
index_value,
157-
&index_path,
158-
column_ids,
159-
)
160-
.await;
161-
vec![row_input]
162-
};
163-
proven_rows
164-
})
165-
.concat()
166-
.await
145+
let row_input = compute_input_for_row(
146+
non_existence_inputs.row_tree,
147+
&proven_node,
148+
index_value,
149+
&index_path,
150+
column_ids,
151+
)
152+
.await;
153+
vec![row_input]
154+
};
155+
proven_rows
156+
};
157+
158+
// TODO: This implementation causes an error in DQ:
159+
// `implementation of `std::marker::Send` is not general enough`
160+
/*
161+
let chunks = stream::iter(sorted_index_values.into_iter())
162+
.then(prove_rows)
163+
.concat()
164+
.await
165+
*/
166+
let mut chunks = vec![];
167+
for index_value in sorted_index_values {
168+
let chunk = prove_rows(index_value).await;
169+
chunks.extend(chunk);
170+
}
171+
172+
let chunks = chunks
167173
.chunks(CHUNK_SIZE)
168174
.map(|chunk| chunk.to_vec())
169-
.collect_vec())
175+
.collect_vec();
176+
177+
Ok(chunks)
170178
}
171179

172180
/// Key for nodes of the `UTForChunks<NUM_CHUNKS>` employed to

0 commit comments

Comments
 (0)