Skip to content

Commit cfad276

Browse files
authored
Pipe: Configured the air gap timeout to avoid packet loss (#17231) (#17253)
* good-game * ifx * fxi * xif
1 parent eaaf53f commit cfad276

File tree

4 files changed

+37
-0
lines changed

4 files changed

+37
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
9595
final AirGapSocket socket = sockets.get(socketIndex);
9696

9797
try {
98+
// When receiver encountered packet loss, the transfer will time out
99+
// We need to restore the transfer quickly by retry under this circumstance
100+
socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
98101
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
99102
doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
100103
} else {
@@ -108,6 +111,8 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
108111
"Network error when transfer tablet insertion event %s, because %s.",
109112
((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
110113
e);
114+
} finally {
115+
socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
111116
}
112117
}
113118

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ public class CommonConfig {
266266
private long pipeSourceMatcherCacheSize = 1024;
267267

268268
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
269+
private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
269270
private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
270271
private int pipeSinkReadFileBufferSize = 5242880; // 5MB
271272
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1019,6 +1020,26 @@ public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
10191020
}
10201021
}
10211022

1023+
public int getPipeAirGapSinkTabletTimeoutMs() {
1024+
return pipeAirGapSinkTabletTimeoutMs;
1025+
}
1026+
1027+
public void setPipeAirGapSinkTabletTimeoutMs(long pipeAirGapSinkTabletTimeoutMs) {
1028+
final int fPipeAirGapSinkTabletTimeoutMs = this.pipeAirGapSinkTabletTimeoutMs;
1029+
try {
1030+
this.pipeAirGapSinkTabletTimeoutMs = Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
1031+
} catch (ArithmeticException e) {
1032+
this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
1033+
logger.warn(
1034+
"Given pipe air gap sink tablet timeout is too large, set to {} ms.", Integer.MAX_VALUE);
1035+
} finally {
1036+
if (fPipeAirGapSinkTabletTimeoutMs != this.pipeAirGapSinkTabletTimeoutMs) {
1037+
logger.info(
1038+
"pipeAirGapSinkTabletTimeoutMs is set to {}.", this.pipeAirGapSinkTabletTimeoutMs);
1039+
}
1040+
}
1041+
}
1042+
10221043
public int getPipeSinkTransferTimeoutMs() {
10231044
return pipeSinkTransferTimeoutMs;
10241045
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ public int getPipeSinkHandshakeTimeoutMs() {
179179
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
180180
}
181181

182+
public int getPipeAirGapSinkTabletTimeoutMs() {
183+
return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
184+
}
185+
182186
public int getPipeSinkTransferTimeoutMs() {
183187
return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
184188
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,13 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
360360
properties.getProperty(
361361
"pipe_connector_handshake_timeout_ms",
362362
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
363+
config.setPipeAirGapSinkTabletTimeoutMs(
364+
Long.parseLong(
365+
Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
366+
.orElse(
367+
properties.getProperty(
368+
"pipe_air_gap_connector_tablet_timeout_ms",
369+
String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
363370
config.setPipeSinkReadFileBufferSize(
364371
Integer.parseInt(
365372
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))

0 commit comments

Comments
 (0)