Skip to content

Commit f5142fe

Browse files
authored
[To dev/1.3] Pipe: Fixed the bug that the default "enable-send-tsfile-limit" is set to false for historical pipes split by full sync (#17264) (#17265)
* may-comp * Avoid someone merge with name 'may-comp'
1 parent cfad276 commit f5142fe

File tree

6 files changed

+38
-27
lines changed

6 files changed

+38
-27
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ private void checkConflict(
210210
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
211211

212212
if (enableSendTsFileLimit == null) {
213-
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
213+
sinkParameters.addAttribute(
214+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, Boolean.TRUE.toString());
214215
LOGGER.info(
215216
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending.");
216217
} else if (!enableSendTsFileLimit) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
5252
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
5353
import org.apache.iotdb.commons.pipe.config.PipeConfig;
54+
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
5455
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
5556
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
5657
import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
@@ -1817,9 +1818,9 @@ public SettableFuture<ConfigTaskResult> createPipe(
18171818
PipeDataNodeAgent.plugin()
18181819
.validate(
18191820
pipeName,
1820-
createPipeStatement.getExtractorAttributes(),
1821+
createPipeStatement.getSourceAttributes(),
18211822
createPipeStatement.getProcessorAttributes(),
1822-
createPipeStatement.getConnectorAttributes());
1823+
createPipeStatement.getSinkAttributes());
18231824
} catch (final Exception e) {
18241825
LOGGER.info("Failed to validate create pipe statement, because {}", e.getMessage(), e);
18251826
future.setException(
@@ -1830,7 +1831,9 @@ public SettableFuture<ConfigTaskResult> createPipe(
18301831
// Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime
18311832
// and history are true), the pipe is split into history-only and realtime–only modes.
18321833
final PipeParameters sourcePipeParameters =
1833-
new PipeParameters(createPipeStatement.getExtractorAttributes());
1834+
new PipeParameters(createPipeStatement.getSourceAttributes());
1835+
final PipeParameters sinkPipeParameters =
1836+
new PipeParameters(createPipeStatement.getSinkAttributes());
18341837
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
18351838
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
18361839
try (final ConfigNodeClient configNodeClient =
@@ -1854,7 +1857,7 @@ public SettableFuture<ConfigTaskResult> createPipe(
18541857
Boolean.toString(false))))
18551858
.getAttribute())
18561859
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
1857-
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
1860+
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
18581861

18591862
final TSStatus realtimeTsStatus = configNodeClient.createPipe(realtimeReq);
18601863
// If creation fails, immediately return with exception
@@ -1888,7 +1891,14 @@ public SettableFuture<ConfigTaskResult> createPipe(
18881891
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
18891892
.getAttribute())
18901893
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
1891-
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
1894+
.setConnectorAttributes(
1895+
sinkPipeParameters
1896+
.addOrReplaceEquivalentAttributesWithClone(
1897+
new PipeParameters(
1898+
Collections.singletonMap(
1899+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
1900+
Boolean.TRUE.toString())))
1901+
.getAttribute());
18921902

18931903
final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
18941904
// If creation fails, immediately return with exception
@@ -1912,9 +1922,9 @@ public SettableFuture<ConfigTaskResult> createPipe(
19121922
new TCreatePipeReq()
19131923
.setPipeName(pipeName)
19141924
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
1915-
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
1925+
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
19161926
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
1917-
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
1927+
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
19181928
TSStatus tsStatus = configNodeClient.createPipe(req);
19191929
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
19201930
LOGGER.warn(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class CreatePipeTask implements IConfigTask {
3737

3838
public CreatePipeTask(CreatePipeStatement createPipeStatement) {
3939
// support now() function
40-
applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes());
40+
applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes());
4141
this.createPipeStatement = createPipeStatement;
4242
}
4343

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3822,11 +3822,11 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) {
38223822
ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
38233823

38243824
if (ctx.extractorAttributesClause() != null) {
3825-
createPipeStatement.setExtractorAttributes(
3825+
createPipeStatement.setSourceAttributes(
38263826
parseExtractorAttributesClause(
38273827
ctx.extractorAttributesClause().extractorAttributeClause()));
38283828
} else {
3829-
createPipeStatement.setExtractorAttributes(new HashMap<>());
3829+
createPipeStatement.setSourceAttributes(new HashMap<>());
38303830
}
38313831
if (ctx.processorAttributesClause() != null) {
38323832
createPipeStatement.setProcessorAttributes(
@@ -3836,11 +3836,11 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) {
38363836
createPipeStatement.setProcessorAttributes(new HashMap<>());
38373837
}
38383838
if (ctx.connectorAttributesClause() != null) {
3839-
createPipeStatement.setConnectorAttributes(
3839+
createPipeStatement.setSinkAttributes(
38403840
parseConnectorAttributesClause(
38413841
ctx.connectorAttributesClause().connectorAttributeClause()));
38423842
} else {
3843-
createPipeStatement.setConnectorAttributes(
3843+
createPipeStatement.setSinkAttributes(
38443844
parseConnectorAttributesClause(
38453845
ctx.connectorAttributesWithoutWithSinkClause().connectorAttributeClause()));
38463846
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public class CreatePipeStatement extends Statement implements IConfigStatement {
3838

3939
private String pipeName;
4040
private boolean ifNotExistsCondition;
41-
private Map<String, String> extractorAttributes;
41+
private Map<String, String> sourceAttributes;
4242
private Map<String, String> processorAttributes;
43-
private Map<String, String> connectorAttributes;
43+
private Map<String, String> sinkAttributes;
4444

4545
public CreatePipeStatement(StatementType createPipeStatement) {
4646
this.statementType = createPipeStatement;
@@ -54,16 +54,16 @@ public boolean hasIfNotExistsCondition() {
5454
return ifNotExistsCondition;
5555
}
5656

57-
public Map<String, String> getExtractorAttributes() {
58-
return extractorAttributes;
57+
public Map<String, String> getSourceAttributes() {
58+
return sourceAttributes;
5959
}
6060

6161
public Map<String, String> getProcessorAttributes() {
6262
return processorAttributes;
6363
}
6464

65-
public Map<String, String> getConnectorAttributes() {
66-
return connectorAttributes;
65+
public Map<String, String> getSinkAttributes() {
66+
return sinkAttributes;
6767
}
6868

6969
public void setPipeName(String pipeName) {
@@ -74,16 +74,16 @@ public void setIfNotExists(boolean ifNotExistsCondition) {
7474
this.ifNotExistsCondition = ifNotExistsCondition;
7575
}
7676

77-
public void setExtractorAttributes(Map<String, String> extractorAttributes) {
78-
this.extractorAttributes = extractorAttributes;
77+
public void setSourceAttributes(Map<String, String> sourceAttributes) {
78+
this.sourceAttributes = sourceAttributes;
7979
}
8080

8181
public void setProcessorAttributes(Map<String, String> processorAttributes) {
8282
this.processorAttributes = processorAttributes;
8383
}
8484

85-
public void setConnectorAttributes(Map<String, String> connectorAttributes) {
86-
this.connectorAttributes = connectorAttributes;
85+
public void setSinkAttributes(Map<String, String> sinkAttributes) {
86+
this.sinkAttributes = sinkAttributes;
8787
}
8888

8989
@Override

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ public void testCreatePipeStatement() {
4242
CreatePipeStatement statement = new CreatePipeStatement(StatementType.CREATE_PIPE);
4343

4444
statement.setPipeName("test");
45-
statement.setExtractorAttributes(extractorAttributes);
45+
statement.setSourceAttributes(extractorAttributes);
4646
statement.setProcessorAttributes(processorAttributes);
47-
statement.setConnectorAttributes(connectorAttributes);
47+
statement.setSinkAttributes(connectorAttributes);
4848

4949
Assert.assertEquals("test", statement.getPipeName());
50-
Assert.assertEquals(extractorAttributes, statement.getExtractorAttributes());
50+
Assert.assertEquals(extractorAttributes, statement.getSourceAttributes());
5151
Assert.assertEquals(processorAttributes, statement.getProcessorAttributes());
52-
Assert.assertEquals(connectorAttributes, statement.getConnectorAttributes());
52+
Assert.assertEquals(connectorAttributes, statement.getSinkAttributes());
5353

5454
Assert.assertEquals(QueryType.WRITE, statement.getQueryType());
5555
}

0 commit comments

Comments
 (0)