Skip to content

Commit 9424778

Browse files
authored
Pipe: Fixed the bug that the default "enable-send-tsfile-limit" is set to false for historical pipes split by full sync (#17264)
* may-comp * Avoid someone merge with name 'may-comp'
1 parent cd8f3db commit 9424778

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ private void checkConflict(
213213
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
214214

215215
if (enableSendTsFileLimit == null) {
216-
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
216+
sinkParameters.addAttribute(
217+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, Boolean.TRUE.toString());
217218
LOGGER.info(
218219
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending.");
219220
} else if (!enableSendTsFileLimit) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2203,6 +2203,8 @@ public SettableFuture<ConfigTaskResult> createPipe(
22032203
// and history are true), the pipe is split into history-only and realtime–only modes.
22042204
final PipeParameters sourcePipeParameters =
22052205
new PipeParameters(createPipeStatement.getSourceAttributes());
2206+
final PipeParameters sinkPipeParameters =
2207+
new PipeParameters(createPipeStatement.getSinkAttributes());
22062208
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
22072209
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
22082210
try (final ConfigNodeClient configNodeClient =
@@ -2260,7 +2262,14 @@ public SettableFuture<ConfigTaskResult> createPipe(
22602262
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
22612263
.getAttribute())
22622264
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
2263-
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
2265+
.setConnectorAttributes(
2266+
sinkPipeParameters
2267+
.addOrReplaceEquivalentAttributesWithClone(
2268+
new PipeParameters(
2269+
Collections.singletonMap(
2270+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
2271+
Boolean.TRUE.toString())))
2272+
.getAttribute());
22642273

22652274
final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
22662275
// If creation fails, immediately return with exception

0 commit comments

Comments
 (0)