Skip to content

Commit f387e5f

Browse files
[b/390590500] Airflow connector (#745)
* Add Airflow connector
* Add comments for Airflow versions
* Test and review fixes
* Replace array of drivers with ImmutableList
* Add TODO
* Make tables for Airflow 2.x optional
* Extract driver classes into enum
* Update Airflow tables
1 parent 2e3610b commit f387e5f

File tree

12 files changed

+356
-26
lines changed

12 files changed

+356
-26
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/ConnectorArguments.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,11 @@ public ImmutableList<String> getDatabases() {
764764
.collect(toImmutableList());
765765
}
766766

767+
@CheckForNull
768+
public String getSchema() {
769+
return getOptions().valueOf(optionSchema);
770+
}
771+
767772
@Nonnull
768773
public Predicate<String> getDatabasePredicate() {
769774
return toPredicate(getDatabases());

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/annotations/RespectsArgumentDriverClass.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,5 @@
4343
description = RespectsArgumentDriverClass.DESCRIPTION)
4444
public @interface RespectsArgumentDriverClass {
4545

46-
public static final String DESCRIPTION =
47-
"Optionally overrides the vendor-specified JDBC driver class name.";
46+
String DESCRIPTION = "Optionally overrides the vendor-specified JDBC driver class name.";
4847
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/annotations/RespectsArgumentSchemaPredicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,5 @@
3535
description = RespectsArgumentSchemaPredicate.DESCRIPTION)
3636
public @interface RespectsArgumentSchemaPredicate {
3737

38-
public static final String DESCRIPTION = "The list of schemas to dump, separated by commas.";
38+
String DESCRIPTION = "The list of schemas to dump, separated by commas.";
3939
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/AbstractJdbcConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public ClassLoader run() throws Exception {
6464
* Creates a new ClassLoader for loading the JDBC Driver.
6565
*
6666
* @param parentClassLoader The parent ClassLoader.
67-
* @param driverPath A comma-separated list of paths to JAR files for inclusion in the new
67+
* @param driverPaths A comma-separated list of paths to JAR files for inclusion in the new
6868
* ClassLoader.
6969
* @return The JDBC ClassLoader to use to load the Driver.
7070
* @throws PrivilegedActionException
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2022-2024 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.airflow;
18+
19+
import static com.google.edwmigration.dumper.application.dumper.connector.airflow.AirflowDatabaseDriverClasses.jdbcPrefixForClassName;
20+
21+
import com.google.auto.service.AutoService;
22+
import com.google.common.base.Preconditions;
23+
import com.google.common.collect.ImmutableList;
24+
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
25+
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsArgumentDriverRequired;
26+
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsArgumentPassword;
27+
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsArgumentUser;
28+
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsInput;
29+
import com.google.edwmigration.dumper.application.dumper.connector.AbstractJdbcConnector;
30+
import com.google.edwmigration.dumper.application.dumper.connector.Connector;
31+
import com.google.edwmigration.dumper.application.dumper.connector.MetadataConnector;
32+
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
33+
import com.google.edwmigration.dumper.application.dumper.handle.JdbcHandle;
34+
import com.google.edwmigration.dumper.application.dumper.task.DumpMetadataTask;
35+
import com.google.edwmigration.dumper.application.dumper.task.FormatTask;
36+
import com.google.edwmigration.dumper.application.dumper.task.JdbcSelectTask;
37+
import com.google.edwmigration.dumper.application.dumper.task.Task;
38+
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
39+
import com.google.edwmigration.dumper.application.dumper.utils.ArchiveNameUtil;
40+
import com.google.edwmigration.dumper.plugin.ext.jdk.annotation.Description;
41+
import java.sql.Driver;
42+
import java.time.Clock;
43+
import java.util.List;
44+
import javax.annotation.Nonnull;
45+
import javax.sql.DataSource;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
48+
49+
@AutoService(Connector.class)
50+
@Description("Dumps DAGs metadata from Airflow.")
51+
@RespectsInput(
52+
order = 100,
53+
arg = ConnectorArguments.OPT_HOST,
54+
description = "Airflow database host.")
55+
@RespectsInput(
56+
order = 200,
57+
arg = ConnectorArguments.OPT_PORT,
58+
description = "The port of the database.")
59+
@RespectsInput(
60+
order = 250,
61+
arg = ConnectorArguments.OPT_SCHEMA,
62+
defaultValue = "airflow_db",
63+
description = "Airflow database schema name")
64+
@RespectsArgumentDriverRequired
65+
@RespectsArgumentUser
66+
@RespectsArgumentPassword
67+
public class AirflowConnector extends AbstractJdbcConnector implements MetadataConnector {
68+
69+
private static final Logger LOG = LoggerFactory.getLogger(AirflowConnector.class);
70+
71+
private static final String FORMAT_NAME = "airflow.dump.zip";
72+
73+
private final ImmutableList<String> driverClasses =
74+
ImmutableList.of(
75+
// the order is important! The first class found will be used as a jdbc connection.
76+
AirflowDatabaseDriverClasses.MARIADB.getDriverClassName(),
77+
AirflowDatabaseDriverClasses.MYSQL.getDriverClassName(),
78+
AirflowDatabaseDriverClasses.MYSQL_OLD.getDriverClassName(),
79+
AirflowDatabaseDriverClasses.POSTGRESQL.getDriverClassName());
80+
81+
public AirflowConnector() {
82+
super("airflow");
83+
}
84+
85+
@Nonnull
86+
@Override
87+
public String getDefaultFileName(boolean isAssessment, Clock clock) {
88+
return ArchiveNameUtil.getFileNameWithTimestamp(getName(), clock);
89+
}
90+
91+
@Override
92+
public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArguments arguments)
93+
throws Exception {
94+
out.add(new DumpMetadataTask(arguments, FORMAT_NAME));
95+
out.add(new FormatTask(FORMAT_NAME));
96+
97+
// Airflow v1.5.0
98+
addFullTable(out, "dag.csv", "select * from dag;");
99+
// Airflow v1.6.0
100+
addFullTable(out, "dag_run.csv", "select * from dag_run;");
101+
102+
// Airflow v1.10.7
103+
// analog of DAG's python definition in json
104+
addFullTable(out, "serialized_dag.csv", "select * from serialized_dag;", TaskCategory.OPTIONAL);
105+
// Airflow v1.10.10
106+
addFullTable(out, "dag_code.csv", "select * from dag_code;", TaskCategory.OPTIONAL);
107+
}
108+
109+
private static void addFullTable(List<? super Task<?>> out, String filename, String sql) {
110+
addFullTable(out, filename, sql, TaskCategory.REQUIRED);
111+
}
112+
113+
private static void addFullTable(
114+
List<? super Task<?>> out, String filename, String sql, TaskCategory taskCategory) {
115+
out.add(new JdbcSelectTask(filename, sql, taskCategory));
116+
}
117+
118+
@Nonnull
119+
@Override
120+
public Handle open(@Nonnull ConnectorArguments arguments) throws Exception {
121+
// todo support jdbc string in --url
122+
Preconditions.checkNotNull(arguments.getHost(), "Database host must be provided.");
123+
Preconditions.checkNotNull(arguments.getPort(), "Database port must be provided.");
124+
Preconditions.checkState(
125+
arguments.getDriverPaths() != null && !arguments.getDriverPaths().isEmpty(),
126+
"Path to jdbc driver must be provided");
127+
128+
// todo fix warning logs, because no drivers is expected case
129+
Driver driver = loadFirstAvailableDriver(arguments.getDriverPaths());
130+
String host = arguments.getHost();
131+
int port = arguments.getPort();
132+
String schema = arguments.getSchema();
133+
134+
String jdbcString =
135+
jdbcPrefixForClassName(driver.getClass().getName()) + host + ":" + port + "/" + schema;
136+
LOG.info("Connecting to jdbc string [{}]...", jdbcString);
137+
138+
DataSource dataSource = newSimpleDataSource(driver, jdbcString, arguments);
139+
return JdbcHandle.newPooledJdbcHandle(dataSource, 1);
140+
}
141+
142+
private Driver loadFirstAvailableDriver(List<String> driverPaths) throws Exception {
143+
return newDriver(driverPaths, driverClasses.toArray(driverClasses.toArray(new String[0])));
144+
}
145+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2022-2024 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.airflow;
18+
19+
import com.google.common.base.Preconditions;
20+
import com.google.common.collect.ImmutableMap;
21+
22+
enum AirflowDatabaseDriverClasses {
23+
MYSQL_OLD("com.mysql.jdbc.Driver", "jdbc:mysql://"),
24+
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql://"),
25+
MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb://"),
26+
POSTGRESQL("org.postgresql.Driver", "jdbc:postgresql://");
27+
private final String driverClassName;
28+
private final String jdbcStringPrefix;
29+
static final ImmutableMap<String, AirflowDatabaseDriverClasses> classNameToEnum;
30+
31+
static {
32+
ImmutableMap.Builder<String, AirflowDatabaseDriverClasses> builder =
33+
new ImmutableMap.Builder<>();
34+
for (AirflowDatabaseDriverClasses driverClassName : AirflowDatabaseDriverClasses.values()) {
35+
builder.put(driverClassName.getDriverClassName(), driverClassName);
36+
}
37+
classNameToEnum = builder.build();
38+
}
39+
40+
AirflowDatabaseDriverClasses(String driverClassName, String jdbcStringPrefix) {
41+
this.driverClassName = driverClassName;
42+
this.jdbcStringPrefix = jdbcStringPrefix;
43+
}
44+
45+
public String getDriverClassName() {
46+
return driverClassName;
47+
}
48+
49+
public String getJdbcStringPrefix() {
50+
return jdbcStringPrefix;
51+
}
52+
53+
public static String jdbcPrefixForClassName(String driverClassName) {
54+
AirflowDatabaseDriverClasses driverClass = classNameToEnum.get(driverClassName);
55+
Preconditions.checkNotNull(driverClass, "Unsupported driver class name: " + driverClassName);
56+
return driverClass.jdbcStringPrefix;
57+
}
58+
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/task/AbstractJdbcTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ protected static <T> T doSelect(
247247
// Enables cursors in PostgreSQL.
248248
// Teradata says that this can reduce the fetch size below 1Mb, but not increase it.
249249
statement.setFetchSize(16384);
250-
LOG.debug("Statement preparation took " + stopwatch + ". Executing...");
250+
LOG.debug("Statement preparation took {}. Executing...", stopwatch);
251251
}
252252

253253
EXECUTE:

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/task/JdbcSelectTask.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,10 @@
2121
import java.sql.Connection;
2222
import java.sql.SQLException;
2323
import javax.annotation.Nonnull;
24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
2624
import org.springframework.jdbc.core.ResultSetExtractor;
2725

2826
/** @author shevek */
2927
public class JdbcSelectTask extends AbstractJdbcTask<Summary> {
30-
31-
@SuppressWarnings("UnusedVariable")
32-
private static final Logger LOG = LoggerFactory.getLogger(JdbcSelectTask.class);
33-
3428
@Nonnull private final String sql;
3529

3630
@Nonnull private final TaskCategory taskCategory;
@@ -65,7 +59,7 @@ protected Summary doInConnection(
6559
@Nonnull Connection connection)
6660
throws SQLException {
6761
ResultSetExtractor<Summary> rse = newCsvResultSetExtractor(sink);
68-
return doSelect(connection, rse, sql);
62+
return doSelect(connection, rse, getSql());
6963
}
7064

7165
@Override
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2022-2024 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.airflow;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.google.common.collect.ImmutableMap;
24+
import com.google.edwmigration.dumper.application.dumper.task.DumpMetadataTask;
25+
import com.google.edwmigration.dumper.application.dumper.task.FormatTask;
26+
import com.google.edwmigration.dumper.application.dumper.task.JdbcSelectTask;
27+
import com.google.edwmigration.dumper.application.dumper.task.Task;
28+
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Map.Entry;
33+
import java.util.stream.Collectors;
34+
import org.junit.Test;
35+
36+
public class AirflowConnectorTest {
37+
private final AirflowConnector connector = new AirflowConnector();
38+
39+
@Test
40+
public void addTasksTo_containsJdbcSelect_required() throws Exception {
41+
Map<String, String> expectedFilesToTables =
42+
ImmutableMap.of(
43+
"dag.csv", "dag",
44+
"dag_run.csv", "dag_run");
45+
46+
testJdbcSelectTasks(expectedFilesToTables, TaskCategory.REQUIRED);
47+
}
48+
49+
@Test
50+
public void addTasksTo_containsJdbcSelect_optional() throws Exception {
51+
Map<String, String> expectedFilesToTables =
52+
ImmutableMap.of("serialized_dag.csv", "serialized_dag", "dag_code.csv", "dag_code");
53+
54+
testJdbcSelectTasks(expectedFilesToTables, TaskCategory.OPTIONAL);
55+
}
56+
57+
private void testJdbcSelectTasks(
58+
Map<String, String> expectedFilesToTables, TaskCategory taskCategory) throws Exception {
59+
List<Task<?>> tasks = new ArrayList<>();
60+
61+
// Act
62+
connector.addTasksTo(tasks, null);
63+
64+
// Assert
65+
Map<String, String> existingFilesToTables =
66+
tasks.stream()
67+
.filter(t -> t instanceof JdbcSelectTask)
68+
.map(JdbcSelectTask.class::cast)
69+
.filter(t -> t.getCategory().equals(taskCategory))
70+
.collect(Collectors.toMap(JdbcSelectTask::getTargetPath, JdbcSelectTask::getSql));
71+
72+
assertEquals(expectedFilesToTables.size(), existingFilesToTables.size());
73+
for (Entry<String, String> entry : expectedFilesToTables.entrySet()) {
74+
String fileName = entry.getKey();
75+
String tableName = entry.getValue();
76+
77+
String sql = existingFilesToTables.get(fileName);
78+
assertNotNull("Query for file " + fileName + " doesn't exist", sql);
79+
80+
assertTrue(
81+
"file: " + fileName + " should be selected from table: " + tableName,
82+
sql.toLowerCase().contains("from " + tableName.toLowerCase()));
83+
}
84+
}
85+
86+
@Test
87+
public void addTasksTo_containsRequiredTasks() throws Exception {
88+
List<Task<?>> tasks = new ArrayList<>();
89+
90+
// Act
91+
connector.addTasksTo(tasks, null);
92+
93+
// Assert
94+
long dumpMetadataCount = tasks.stream().filter(t -> t instanceof DumpMetadataTask).count();
95+
long formatCount = tasks.stream().filter(t -> t instanceof FormatTask).count();
96+
97+
assertEquals("One DumpMetadataTask is expected", dumpMetadataCount, 1);
98+
assertEquals("One FormatTask is expected", formatCount, 1);
99+
}
100+
}

0 commit comments

Comments
 (0)