Skip to content

Commit 3eeaa8f

Browse files
committed
Add testInsertSelectFromHudi case
1 parent c321fc5 commit 3eeaa8f

File tree

4 files changed

+65
-13
lines changed

4 files changed

+65
-13
lines changed

lineage-flink1.16.x/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
<hadoop.version>3.2.2</hadoop.version>
1616
<hive.version>3.1.2</hive.version>
1717
<mysql.cdc.version>2.2.1</mysql.cdc.version>
18+
<hudi.version>0.13.1</hudi.version>
1819
<paimon.version>0.4-SNAPSHOT</paimon.version>
1920
</properties>
2021

@@ -84,6 +85,12 @@
8485
<version>${flink.version}</version>
8586
</dependency>
8687

88+
<dependency>
89+
<groupId>org.apache.hudi</groupId>
90+
<artifactId>hudi-flink1.16-bundle</artifactId>
91+
<version>${hudi.version}</version>
92+
</dependency>
93+
8794
<dependency>
8895
<groupId>org.apache.flink</groupId>
8996
<artifactId>flink-json</artifactId>

lineage-flink1.16.x/src/test/java/com/hw/lineage/flink/basic/AbstractBasicTest.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,7 @@ protected void createTableOfDimMysqlCompany() {
169169
}
170170

171171
/**
172-
* Create Hudi sink table dwd_hudi_user.
173-
* <p>
174-
* Since flink1.16 does not support hudi, temporarily change the connector to print,
175-
* but the table name remains unchanged
172+
* Create Hudi sink table dwd_hudi_users
176173
*/
177174
protected void createTableOfDwdHudiUsers() {
178175
context.execute("DROP TABLE IF EXISTS dwd_hudi_users");
@@ -185,7 +182,27 @@ protected void createTableOfDwdHudiUsers() {
185182
" ts TIMESTAMP(3) ," +
186183
" `partition` VARCHAR(20) " +
187184
") PARTITIONED BY (`partition`) WITH ( " +
188-
" 'connector' = 'print' " +
185+
" 'connector' = 'hudi' ," +
186+
" 'table.type' = 'COPY_ON_WRITE' ," +
187+
" 'path' = '/hudi/users' ," +
188+
" 'read.streaming.enabled' = 'true' ," +
189+
" 'read.streaming.check-interval' = '1' " +
190+
")");
191+
}
192+
193+
/**
194+
* Create Hudi sink table dws_users_cnt
195+
*/
196+
protected void createTableOfDwsHudiUsersCnt() {
197+
context.execute("DROP TABLE IF EXISTS dws_users_cnt");
198+
199+
context.execute("CREATE TABLE IF NOT EXISTS dws_users_cnt ( " +
200+
" id BIGINT PRIMARY KEY NOT ENFORCED ," +
201+
" name_cnt BIGINT ," +
202+
" company_name_cnt BIGINT " +
203+
") WITH ( " +
204+
" 'connector' = 'hudi' ," +
205+
" 'table.type' = 'MERGE_ON_READ' " +
189206
")");
190207
}
191208

lineage-flink1.16.x/src/test/java/com/hw/lineage/flink/lookup/join/LookupJoinTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public void createTable() {
3939

4040
// create hudi sink table dwd_hudi_users
4141
createTableOfDwdHudiUsers();
42+
43+
// Create Hudi sink table dws_users_cnt
44+
createTableOfDwsHudiUsersCnt();
4245
}
4346

4447
/**
@@ -79,4 +82,27 @@ public void testInsertSelectTwoLookupJoin() {
7982
analyzeLineage(sql, expectedArray);
8083
}
8184

85+
/**
86+
* insert-select-from-hudi
87+
*/
88+
@Test
89+
public void testInsertSelectFromHudi() {
90+
String sql = "INSERT into dws_users_cnt " +
91+
"SELECT " +
92+
" id," +
93+
" COUNT(DISTINCT name), " +
94+
" COUNT(DISTINCT company_name) " +
95+
"FROM" +
96+
" dwd_hudi_users " +
97+
"GROUP BY " +
98+
" id";
99+
100+
String[][] expectedArray = {
101+
{"dwd_hudi_users", "id", "dws_users_cnt", "id"},
102+
{"dwd_hudi_users", "name", "dws_users_cnt", "name_cnt", "COUNT(DISTINCT name)"},
103+
{"dwd_hudi_users", "company_name", "dws_users_cnt", "company_name_cnt", "COUNT(DISTINCT company_name)"}
104+
};
105+
106+
analyzeLineage(sql, expectedArray);
107+
}
82108
}

0 commit comments

Comments
 (0)