Skip to content

Commit a8d0d4c

Browse files
authored
[Hotfix][CDC] Fix occasional database connection leak when read snapshot split (#7918)
1 parent d7798a6 commit a8d0d4c

File tree

3 files changed

+108
-9
lines changed

3 files changed

+108
-9
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,15 @@ private void checkReadException() {
222222
@Override
223223
public void close() {
224224
try {
225-
if (taskContext != null) {
226-
taskContext.close();
227-
}
225+
// 1. try close the split task
228226
if (snapshotSplitReadTask != null) {
229-
snapshotSplitReadTask.shutdown();
227+
try {
228+
snapshotSplitReadTask.shutdown();
229+
} catch (Exception e) {
230+
log.error("Close snapshot split read task error", e);
231+
}
230232
}
233+
// 2. close the fetcher thread
231234
if (executorService != null) {
232235
executorService.shutdown();
233236
if (!executorService.awaitTermination(
@@ -240,6 +243,11 @@ public void close() {
240243
}
241244
} catch (Exception e) {
242245
log.error("Close scan fetcher error", e);
246+
} finally {
247+
// 3. close the task context
248+
if (taskContext != null) {
249+
taskContext.close();
250+
}
243251
}
244252
}
245253

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,15 @@ private void checkReadException() {
187187
@Override
188188
public void close() {
189189
try {
190-
if (taskContext != null) {
191-
taskContext.close();
192-
}
190+
// 1. try close the split task
193191
if (streamFetchTask != null) {
194-
streamFetchTask.shutdown();
192+
try {
193+
streamFetchTask.shutdown();
194+
} catch (Exception e) {
195+
log.error("Close stream split read task error", e);
196+
}
195197
}
198+
// 2. close the fetcher thread
196199
if (executorService != null) {
197200
executorService.shutdown();
198201
if (!executorService.awaitTermination(
@@ -205,6 +208,11 @@ public void close() {
205208
}
206209
} catch (Exception e) {
207210
log.error("Close stream fetcher error", e);
211+
} finally {
212+
// 3. close the task context
213+
if (taskContext != null) {
214+
taskContext.close();
215+
}
208216
}
209217
}
210218

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ public void testMultiTableWithRestore(TestContainer container)
328328
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
329329
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
330330

331+
// init
332+
initSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
333+
331334
Long jobId = JobIdGenerator.newJobId();
332335
CompletableFuture.supplyAsync(
333336
() -> {
@@ -341,8 +344,32 @@ public void testMultiTableWithRestore(TestContainer container)
341344
}
342345
});
343346

347+
// wait for data written to sink
348+
await().atMost(60000, TimeUnit.MILLISECONDS)
349+
.untilAsserted(
350+
() ->
351+
Assertions.assertTrue(
352+
query(getSourceQuerySQL(MYSQL_DATABASE2, SOURCE_TABLE_1))
353+
.size()
354+
> 1));
355+
356+
// Restore job with snapshot read phase
357+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
358+
CompletableFuture.supplyAsync(
359+
() -> {
360+
try {
361+
container.restoreJob(
362+
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
363+
String.valueOf(jobId));
364+
} catch (Exception e) {
365+
log.error("Commit task exception :" + e.getMessage());
366+
throw new RuntimeException(e);
367+
}
368+
return null;
369+
});
370+
344371
// insert update delete
345-
upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
372+
changeSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
346373

347374
// stream stage
348375
await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -521,6 +548,62 @@ private void executeSql(String sql) {
521548
}
522549
}
523550

551+
private void initSourceTable(String database, String tableName) {
552+
for (int i = 1; i < 100; i++) {
553+
executeSql(
554+
"INSERT INTO "
555+
+ database
556+
+ "."
557+
+ tableName
558+
+ " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
559+
+ " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
560+
+ " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
561+
+ " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
562+
+ " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
563+
+ " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
564+
+ "VALUES ( "
565+
+ i
566+
+ ", 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
567+
+ " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
568+
+ " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
569+
+ " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
570+
+ " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
571+
+ " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
572+
+ " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
573+
+ " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )");
574+
}
575+
}
576+
577+
private void changeSourceTable(String database, String tableName) {
578+
for (int i = 100; i < 110; i++) {
579+
executeSql(
580+
"INSERT INTO "
581+
+ database
582+
+ "."
583+
+ tableName
584+
+ " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
585+
+ " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
586+
+ " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
587+
+ " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
588+
+ " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
589+
+ " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
590+
+ "VALUES ( "
591+
+ i
592+
+ ", 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
593+
+ " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
594+
+ " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
595+
+ " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
596+
+ " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
597+
+ " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
598+
+ " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
599+
+ " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )");
600+
}
601+
602+
executeSql("DELETE FROM " + database + "." + tableName + " where id > 100");
603+
604+
executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 10000 where id < 10");
605+
}
606+
524607
private void upsertDeleteSourceTable(String database, String tableName) {
525608

526609
executeSql(

0 commit comments

Comments
 (0)