Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-6328] Add optimization for aggregation query in align by device with template situation #12513

Merged
merged 35 commits into from
May 28, 2024
Merged
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2b799e7
add temp impl
Beyyes May 9, 2024
304f68f
add select max_time(s1), last_value(s1), last_value(s2) impl
Beyyes May 11, 2024
a63cb4e
fix normal query error
Beyyes May 11, 2024
b732c75
add distributed serialize
Beyyes May 11, 2024
8699ede
fix sourcenode
Beyyes May 13, 2024
e7985fa
fix template serizlize error; fix filter can not push down bug
Beyyes May 13, 2024
1592507
process ProjectNode
liuminghui233 May 13, 2024
8ba6959
fix UT
liuminghui233 May 13, 2024
d1924f3
add more UT
liuminghui233 May 13, 2024
41f0b02
fix IT
liuminghui233 May 14, 2024
884dcca
not support agg(*) or agg(s1+1) now; perfect serialize method of RawD…
Beyyes May 14, 2024
613a344
fix extend aligned path
liuminghui233 May 14, 2024
6a69442
merge master
Beyyes May 14, 2024
303df4b
fix ut
Beyyes May 14, 2024
a70fc78
Merge branch 'lmh/fixPushDownProject' into beyyes/agg_template_alignb…
Beyyes May 14, 2024
c8199d2
fix descending aggregator problem; add more comments
Beyyes May 14, 2024
c1b9f3a
fix group by time; fix select agg1() having agg2()
Beyyes May 15, 2024
bd76f67
fix typeProvider.getTemplatedInfo().groupByTimeParameter problem
Beyyes May 15, 2024
135c808
merge with master
Beyyes May 15, 2024
7d8f8f8
add IoTDBAlignByDeviceWithTemplateAggregationIT
Beyyes May 15, 2024
8854da1
support sliding window, fix visitSlidingWindow method of OperatorGene…
Beyyes May 16, 2024
569d6ed
add ascending and descending aggregation descriptors support
Beyyes May 24, 2024
3d42a2a
merge master
Beyyes May 25, 2024
ef77dc5
fix merge error in TemplatedAnalyze
Beyyes May 26, 2024
d16c17e
Merge branch 'master' into beyyes/agg_template_alignbydevice
Beyyes May 26, 2024
2a1b868
fix outputEndTime serialize in templateInfo. remove redundant judgeme…
Beyyes May 27, 2024
126f856
add count_time support, add more its
Beyyes May 27, 2024
d86728d
fix select agg where s1>1 bug which is aggreagtion pushdown situation
Beyyes May 27, 2024
d596aa7
perfect logic of deviceToMeasurementIndexes, remove analyzeDeviceInpu…
Beyyes May 28, 2024
29caf67
fix deviceToMeasurementIdx error in SingleDeviceViewNode
Beyyes May 28, 2024
8f0fbe3
Merge branch 'master' into beyyes/agg_template_alignbydevice
Beyyes May 28, 2024
30b041e
remove useless code
Beyyes May 28, 2024
d5dbbaf
add more having case
Beyyes May 28, 2024
c5cad39
merge master, fix conflict
Beyyes May 28, 2024
b6acca4
fix smell
Beyyes May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add distributed serialize
Beyyes committed May 11, 2024
commit b732c751c68ad715d8d40ddeb62429893bed0c43
Original file line number Diff line number Diff line change
@@ -104,6 +104,7 @@ public TemplatedInfo(
Map<String, IMeasurementSchema> schemaMap,
Map<String, List<InputLocation>> layoutMap,
Expression pushDownPredicate,
List<AggregationDescriptor> aggregationDescriptorList,
GroupByTimeParameter groupByTimeParameter,
boolean outputEndTime) {
this.measurementList = measurementList;
@@ -123,6 +124,7 @@ public TemplatedInfo(
}
this.pushDownPredicate = pushDownPredicate;

this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
this.outputEndTime = outputEndTime;
}
@@ -259,6 +261,13 @@ public void serialize(ByteBuffer byteBuffer) {
} else {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
}

if (aggregationDescriptorList != null) {
ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
aggregationDescriptorList.forEach(d -> d.serialize(byteBuffer));
} else {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
}
}

public void serialize(DataOutputStream stream) throws IOException {
@@ -302,6 +311,15 @@ public void serialize(DataOutputStream stream) throws IOException {
} else {
ReadWriteIOUtils.write((byte) 0, stream);
}

if (aggregationDescriptorList != null) {
ReadWriteIOUtils.write(aggregationDescriptorList.size(), stream);
for (AggregationDescriptor descriptor : aggregationDescriptorList) {
descriptor.serialize(stream);
}
} else {
ReadWriteIOUtils.write((byte) 0, stream);
}
}

public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
@@ -367,6 +385,15 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
pushDownPredicate = Expression.deserialize(byteBuffer);
}

List<AggregationDescriptor> aggregationDescriptorList = null;
listSize = ReadWriteIOUtils.readInt(byteBuffer);
if (listSize > 0) {
aggregationDescriptorList = new ArrayList<>(listSize);
while (listSize-- > 0) {
aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
}
}

// TODO add groupByTimeParameter, outputEndTime serialization and deserialization

return new TemplatedInfo(
@@ -384,6 +411,7 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
currentSchemaMap,
layoutMap,
pushDownPredicate,
aggregationDescriptorList,
null,
false);
}
Original file line number Diff line number Diff line change
@@ -155,6 +155,7 @@ private void initAggQueryCommonVariables() {
analysis.getDeviceTemplate().getSchemaMap(),
filterLayoutMap,
null,
null,
analysis.getGroupByTimeParameter(),
queryStatement.isOutputEndTime()));
}
@@ -212,6 +213,7 @@ private void initNonAggQueryCommonVariables() {
analysis.getDeviceTemplate().getSchemaMap(),
filterLayoutMap,
null,
null,
analysis.getGroupByTimeParameter(),
queryStatement.isOutputEndTime()));
}
Original file line number Diff line number Diff line change
@@ -462,6 +462,8 @@ public static PlanNode deserializeWithTemplate(ByteBuffer buffer, TypeProvider t
return FilterNode.deserializeUseTemplate(buffer, typeProvider);
case 33:
return AlignedSeriesScanNode.deserializeUseTemplate(buffer, typeProvider);
case 34:
return AlignedSeriesAggregationScanNode.deserializeUseTemplate(buffer, typeProvider);
case 65:
return SingleDeviceViewNode.deserializeUseTemplate(buffer, typeProvider);
case 32:
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -255,6 +256,41 @@ public static AlignedSeriesAggregationScanNode deserialize(ByteBuffer byteBuffer
null);
}

@Override
public void serializeUseTemplate(DataOutputStream stream, TypeProvider typeProvider)
throws IOException {
PlanNodeType.ALIGNED_SERIES_AGGREGATE_SCAN.serialize(stream);
id.serialize(stream);
ReadWriteIOUtils.write(alignedPath.getNodes().length, stream);
for (String node : alignedPath.getNodes()) {
ReadWriteIOUtils.write(node, stream);
}
}

public static AlignedSeriesAggregationScanNode deserializeUseTemplate(
ByteBuffer byteBuffer, TypeProvider typeProvider) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);

int nodeSize = ReadWriteIOUtils.readInt(byteBuffer);
String[] nodes = new String[nodeSize];
for (int i = 0; i < nodeSize; i++) {
nodes[i] = ReadWriteIOUtils.readString(byteBuffer);
}
AlignedPath alignedPath = new AlignedPath(new PartialPath(nodes));
alignedPath.setMeasurementList(typeProvider.getTemplatedInfo().getMeasurementList());
alignedPath.addSchemas(typeProvider.getTemplatedInfo().getSchemaList());

return new AlignedSeriesAggregationScanNode(
planNodeId,
alignedPath,
typeProvider.getTemplatedInfo().aggregationDescriptorList,
typeProvider.getTemplatedInfo().getScanOrder(),
typeProvider.getTemplatedInfo().outputEndTime,
typeProvider.getTemplatedInfo().getPushDownPredicate(),
typeProvider.getTemplatedInfo().groupByTimeParameter,
null);
}

@Override
public boolean equals(Object o) {
if (this == o) {