Skip to content

Commit c0640f7

Browse files
Ammu Parvathy
Ammu Parvathy
authored and
Ammu Parvathy
committed
initial commit
0 parents  commit c0640f7

23 files changed

+319
-0
lines changed

.idea/.gitignore

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/compiler.xml

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/encodings.xml

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/inspectionProfiles/Project_Default.xml

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/jarRepositories.xml

+20
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pom.xml

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the learn-flink examples project.
  All Flink artifacts are pinned to the single ${flink.version} property:
  mixing Flink versions on one classpath leads to runtime linkage errors
  (NoSuchMethodError / NoClassDefFoundError).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>learn-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single source of truth for all Flink dependency versions. -->
        <flink.version>1.14.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <!-- FIX: was pinned to 1.15.2 while every other Flink artifact was
                 1.14.0. In the 1.14 line this module still carries the Scala
                 binary suffix (the suffix was dropped in 1.15, which is likely
                 how the mismatched coordinate crept in). -->
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import org.apache.flink.table.api.EnvironmentSettings;
2+
import org.apache.flink.table.api.Table;
3+
import org.apache.flink.table.api.TableEnvironment;
4+
import org.apache.flink.table.api.TableResult;
5+
6+
public class BatchTableApiExample {
7+
public static void main(String[] args) throws Exception {
8+
// Set up the batch environment
9+
EnvironmentSettings settings = EnvironmentSettings.newInstance()
10+
.inBatchMode()
11+
.build();
12+
TableEnvironment tableEnv = TableEnvironment.create(settings);
13+
14+
// Register a CSV table source
15+
String csvInputPath = "src/main/resources/data-in/timestamp.txt";
16+
tableEnv.executeSql(
17+
"CREATE TABLE InputTable (" +
18+
" id INT," +
19+
" name STRING," +
20+
" age INT" +
21+
") WITH (" +
22+
" 'connector' = 'filesystem'," +
23+
" 'path' = '" + csvInputPath + "'," +
24+
" 'format' = 'json'" +
25+
")"
26+
);
27+
28+
// Define a simple query on the table
29+
Table resultTable = tableEnv.sqlQuery("SELECT id, name FROM InputTable WHERE age > 30");
30+
31+
// Execute the query and collect the results
32+
TableResult result = resultTable.execute();
33+
34+
// Print the results to the console
35+
result.print();
36+
37+
// Write the result to a CSV file
38+
String csvOutputPath = "src/main/resources/data-out/output.txt";
39+
tableEnv.executeSql(
40+
"CREATE TABLE OutputTable (" +
41+
" id INT," +
42+
" name STRING" +
43+
") WITH (" +
44+
" 'connector' = 'filesystem'," +
45+
" 'path' = '" + csvOutputPath + "'," +
46+
" 'format' = 'json'" +
47+
")"
48+
);
49+
50+
resultTable.executeInsert("OutputTable").collect();
51+
}
52+
}

src/main/java/SavePointExample.java

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import org.apache.flink.table.api.EnvironmentSettings;
2+
import org.apache.flink.table.api.TableEnvironment;
3+
4+
import java.time.ZoneId;
5+
6+
public final class SavePointExample {
7+
8+
public static void main(String[] args) {
9+
10+
final String JOB_NAME = "FlinkJob";
11+
12+
final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
13+
final TableEnvironment tEnv = TableEnvironment.create(settings);
14+
tEnv.getConfig().set("pipeline.name", JOB_NAME);
15+
tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
16+
17+
tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
18+
" `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
19+
" `_partition` INT METADATA FROM 'partition' VIRTUAL," +
20+
" `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
21+
" `Data` STRING," +
22+
" `Action` STRING," +
23+
" `ProduceDateTime` TIMESTAMP_LTZ(6)," +
24+
" `OffSet` INT" +
25+
") WITH (" +
26+
" 'connector' = 'kafka'," +
27+
" 'topic' = 'api.log'," +
28+
" 'properties.group.id' = 'flink'," +
29+
" 'properties.bootstrap.servers' = '<mykafkahost...>'," +
30+
" 'format' = 'json'," +
31+
" 'json.timestamp-format.standard' = 'ISO-8601'" +
32+
")");
33+
34+
tEnv.executeSql("CREATE TABLE print_table (" +
35+
" `_timestamp` TIMESTAMP(3)," +
36+
" `_partition` INT," +
37+
" `_offset` BIGINT," +
38+
" `Data` STRING," +
39+
" `Action` STRING," +
40+
" `ProduceDateTime` TIMESTAMP(6)," +
41+
" `OffSet` INT" +
42+
") WITH ('connector' = 'print')");
43+
44+
tEnv.executeSql("INSERT INTO print_table" +
45+
" SELECT * FROM ApiLog");
46+
47+
}
48+
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import org.apache.flink.table.api.EnvironmentSettings;
2+
import org.apache.flink.table.api.Table;
3+
import org.apache.flink.table.api.TableEnvironment;
4+
import org.apache.flink.table.api.TableResult;
5+
6+
public class WindowAggregateBatchTableApiExample {
7+
public static void main(String[] args) throws Exception {
8+
// Set up the batch environment
9+
EnvironmentSettings settings = EnvironmentSettings.newInstance()
10+
.inBatchMode()
11+
.build();
12+
TableEnvironment tableEnv = TableEnvironment.create(settings);
13+
14+
// Register a CSV table source
15+
String csvInputPath = "src/main/resources/data-in/aggregate.txt";
16+
tableEnv.executeSql(
17+
"CREATE TABLE InputTable (" +
18+
" bidtime TIMESTAMP(3)," +
19+
" price DECIMAL(10,2)," +
20+
" item STRING," +
21+
" supplier_id STRING" +
22+
") WITH (" +
23+
" 'connector' = 'filesystem'," +
24+
" 'path' = '" + csvInputPath + "'," +
25+
" 'format' = 'json'" +
26+
")"
27+
);
28+
29+
//Describe table and print
30+
TableResult descTable = tableEnv.executeSql("desc InputTable");
31+
descTable.print();
32+
// Define a simple query on the table
33+
Table resultTable = tableEnv.sqlQuery("SELECT window_start, window_end, SUM(price) AS total_price\n" +
34+
" FROM TABLE(\n" +
35+
" TUMBLE(TABLE InputTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))\n" +
36+
" GROUP BY window_start, window_end");
37+
//working
38+
/* Table resultTable = tableEnv.sqlQuery("SELECT TUMBLE_START(bidtime, INTERVAL '10' MINUTES) AS window_start,\n" +
39+
" TUMBLE_END(bidtime, INTERVAL '10' MINUTES) AS window_end, SUM(price) AS total_price\n" +
40+
" FROM InputTable\n" +
41+
" GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES)");*/
42+
//testing
43+
/* Table resultTable = tableEnv.sqlQuery("SELECT window_start,\n" +
44+
" window_end, SUM(price) AS total_price\n" +
45+
" FROM TABLE(\n" +
46+
" TUMBLE(TABLE InputTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))\n" +
47+
" GROUP BY window_start, window_end");*/
48+
49+
// Execute the query and collect the results
50+
TableResult result = resultTable.execute();
51+
52+
// Print the results to the console
53+
result.print();
54+
55+
// Write the result to a CSV file
56+
String csvOutputPath = "src/main/resources/data-out/output.txt";
57+
/*tableEnv.executeSql(
58+
"CREATE TABLE OutputTable (" +
59+
" id INT," +
60+
" name STRING" +
61+
") WITH (" +
62+
" 'connector' = 'filesystem'," +
63+
" 'path' = '" + csvOutputPath + "'," +
64+
" 'format' = 'json'" +
65+
")"
66+
);
67+
68+
resultTable.executeInsert("OutputTable").collect();*/
69+
}
70+
}
+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{"bidtime": "2020-04-15 08:05:00", "price": 4.00, "item": "C", "supplier_id": "supplier1" }
2+
{"bidtime": "2020-04-15 08:07:00", "price": 2.00, "item": "A", "supplier_id": "supplier1" }
3+
{"bidtime": "2020-04-15 08:09:00", "price": 5.00, "item": "D", "supplier_id": "supplier2" }
4+
{"bidtime": "2020-04-15 08:11:00", "price": 3.00, "item": "B", "supplier_id": "supplier2" }
5+
{"bidtime": "2020-04-15 08:13:00", "price": 1.00, "item": "E", "supplier_id": "supplier1" }
6+
{"bidtime": "2020-04-15 08:17:00", "price": 6.00, "item": "F", "supplier_id": "supplier2" }
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"id": 0, "name":"Ammu", "age": 32 }
2+
{"id": 1, "name":"Anandu", "age": 34 }
3+
{"id": 2, "name":"Dhruvan", "age": 3 }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"id":0,"name":"Ammu"}
2+
{"id":1,"name":"Anandu"}
2.63 KB
Binary file not shown.
Binary file not shown.

target/classes/data-in/aggregate.txt

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{"bidtime": "2020-04-15 08:05:00", "price": 4.00, "item": "C", "supplier_id": "supplier1" }
2+
{"bidtime": "2020-04-15 08:07:00", "price": 2.00, "item": "A", "supplier_id": "supplier1" }
3+
{"bidtime": "2020-04-15 08:09:00", "price": 5.00, "item": "D", "supplier_id": "supplier2" }
4+
{"bidtime": "2020-04-15 08:11:00", "price": 3.00, "item": "B", "supplier_id": "supplier2" }
5+
{"bidtime": "2020-04-15 08:13:00", "price": 1.00, "item": "E", "supplier_id": "supplier1" }
6+
{"bidtime": "2020-04-15 08:17:00", "price": 6.00, "item": "F", "supplier_id": "supplier2" }

target/classes/data-in/timestamp.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"id": 0, "name":"Ammu", "age": 32 }
2+
{"id": 1, "name":"Anandu", "age": 34 }
3+
{"id": 2, "name":"Dhruvan", "age": 3 }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"id":0,"name":"Ammu"}
2+
{"id":1,"name":"Anandu"}

target/learn-flink-1.0-SNAPSHOT.jar

3.14 KB
Binary file not shown.

target/maven-archiver/pom.properties

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#Generated by Maven
2+
#Mon Jul 22 21:28:52 IST 2024
3+
artifactId=learn-flink
4+
groupId=org.example
5+
version=1.0-SNAPSHOT
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
BatchTableApiExample.class
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/Users/ammusparvathy/Documents/learning/learn-flink/src/main/java/BatchTableApiExample.java

target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst

Whitespace-only changes.

0 commit comments

Comments
 (0)