Skip to content

Commit

Permalink
CDK module changes for destination (airbytehq#36588)
Browse files Browse the repository at this point in the history
### TL;DR
Updated JDBC and S3 destination to compile destination-redshift
  • Loading branch information
gisripa authored Mar 29, 2024
1 parent 7216e92 commit 219c194
Show file tree
Hide file tree
Showing 33 changed files with 141 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object ConnectionFactory {
* @param jdbcConnectionString The JDBC connection string.
* @return The configured [Connection]
*/
@JvmStatic
fun create(
username: String?,
password: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
package io.airbyte.cdk.db.factory

/** Collection of JDBC driver class names and the associated JDBC URL format string. */
enum class DatabaseDriver(
@JvmField val driverClassName: String,
@JvmField val urlFormatString: String
) {
enum class DatabaseDriver(val driverClassName: String, val urlFormatString: String) {
CLICKHOUSE("com.clickhouse.jdbc.ClickHouseDriver", "jdbc:clickhouse:%s://%s:%d/%s"),
DATABRICKS(
"com.databricks.client.jdbc.Driver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,20 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putTime(node: ObjectNode, columnName: String?, resultSet: ResultSet, index: Int) {
protected open fun putTime(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
index: Int
) {
node.put(
columnName,
DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java))
)
}

@Throws(SQLException::class)
protected fun putTimestamp(
protected open fun putTimestamp(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand Down Expand Up @@ -419,7 +424,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putTimeWithTimezone(
protected open fun putTimeWithTimezone(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand All @@ -430,7 +435,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putTimestampWithTimezone(
protected open fun putTimestampWithTimezone(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

/** Implementation of source operations with standard JDBC types. */
class JdbcSourceOperations :
open class JdbcSourceOperations :
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
return try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ constructor(

const val TIMEOUT_MILLIS: Int = 15000 // 15 seconds

@JvmStatic
fun getInstance(config: JsonNode, hostKey: List<String>, portKey: List<String>): SshTunnel {
val tunnelMethod =
Jsons.getOptional(config, "tunnel_method", "tunnel_method")
Expand Down Expand Up @@ -515,6 +516,7 @@ constructor(
)
}

@JvmStatic
@Throws(Exception::class)
fun sshWrap(
config: JsonNode,
Expand All @@ -528,6 +530,7 @@ constructor(
}
}

@JvmStatic
@Throws(Exception::class)
fun sshWrap(
config: JsonNode,
Expand All @@ -540,6 +543,7 @@ constructor(
}
}

@JvmStatic
@Throws(Exception::class)
fun <T> sshWrap(
config: JsonNode,
Expand All @@ -552,6 +556,7 @@ constructor(
}
}

@JvmStatic
@Throws(Exception::class)
fun <T> sshWrap(
config: JsonNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory

abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationState>(
driverClass: String,
protected val namingResolver: NamingConventionTransformer,
protected open val namingResolver: NamingConventionTransformer,
protected val sqlOperations: SqlOperations
) : JdbcConnector(driverClass), Destination {
protected val configSchemaKey: String
Expand Down Expand Up @@ -106,14 +106,14 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
* @throws Exception
*/
@Throws(Exception::class)
protected fun destinationSpecificTableOperations(database: JdbcDatabase?) {}
protected open fun destinationSpecificTableOperations(database: JdbcDatabase?) {}

/**
* Subclasses which need to modify the DataSource should override [.modifyDataSourceBuilder]
* rather than this method.
*/
@VisibleForTesting
fun getDataSource(config: JsonNode): DataSource {
open fun getDataSource(config: JsonNode): DataSource {
val jdbcConfig = toJdbcConfig(config)
val connectionProperties = getConnectionProperties(config)
val builder =
Expand All @@ -137,7 +137,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
}

@VisibleForTesting
fun getDatabase(dataSource: DataSource): JdbcDatabase {
open fun getDatabase(dataSource: DataSource): JdbcDatabase {
return DefaultJdbcDatabase(dataSource)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
return config[JdbcUtils.DATABASE_KEY].asText()
}

protected fun getDataTransformer(
protected open fun getDataTransformer(
parsedCatalog: ParsedCatalog?,
defaultNamespace: String?
): StreamAwareDataTransformer {
Expand Down Expand Up @@ -343,6 +343,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
* - set true if need to make attempt to insert dummy records to newly created table. Set
* false to skip insert step.
*/
@JvmStatic
@Throws(Exception::class)
fun attemptTableOperations(
outputSchema: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ abstract class JdbcSqlOperations : SqlOperations {
return listOf()
}

protected fun createTableQueryV1(schemaName: String?, tableName: String?): String {
protected open fun createTableQueryV1(schemaName: String?, tableName: String?): String {
return String.format(
"""
CREATE TABLE IF NOT EXISTS %s.%s (
Expand All @@ -104,7 +104,7 @@ abstract class JdbcSqlOperations : SqlOperations {
)
}

protected fun createTableQueryV2(schemaName: String?, tableName: String?): String {
protected open fun createTableQueryV2(schemaName: String?, tableName: String?): String {
// Note that Meta is the last column in order, there was a time when tables didn't have
// meta,
// we issued Alter to add that column so it should be the last column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ object SqlOperationsUtils {
* @param records records to write
* @throws SQLException exception
*/
@JvmStatic
@Throws(SQLException::class)
fun insertRawRecordsInSingleQuery(
insertQueryComponent: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory
* This class exists to make it easy to define a destination in terms of multiple other destination
* implementations, switching between them based on the config provided.
*/
class SwitchingDestination<T : Enum<T>>(
open class SwitchingDestination<T : Enum<T>>(
enumClass: Class<T>,
configToType: Function<JsonNode, T>,
typeToDestination: Map<T, Destination>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,24 +263,21 @@ abstract class JdbcDestinationHandler<DestinationState>(
// We can't directly call record.set here, because that will raise a
// ConcurrentModificationException on the fieldnames iterator.
// Instead, build up a map of new fields and set them all at once.
newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName])
newFields[fieldName.lowercase(Locale.getDefault())] = record[fieldName]
}

record.setAll<JsonNode>(newFields)
}
.collect(
toMap(
{ record ->
val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)
val namespaceNode: JsonNode =
record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)
AirbyteStreamNameNamespacePair(
if (nameNode != null) nameNode.asText() else null,
if (namespaceNode != null) namespaceNode.asText() else null
record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)?.asText(),
record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)?.asText()
)
},
{ record ->
val stateNode: JsonNode =
val stateNode: JsonNode? =
record.get(DESTINATION_STATE_TABLE_COLUMN_STATE)
val state =
if (stateNode != null) Jsons.deserialize(stateNode.asText())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
cursorField: Optional<ColumnId>
): Field<Int>

protected val dslContext: DSLContext
protected open val dslContext: DSLContext
get() = DSL.using(dialect)

/**
Expand Down Expand Up @@ -596,7 +596,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
.getSQL(ParamType.INLINED)
}

protected fun castedField(
protected open fun castedField(
field: Field<*>?,
type: AirbyteType,
alias: String?,
Expand Down Expand Up @@ -625,7 +625,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
return DSL.cast(field, toDialectType(type))
}

protected fun currentTimestamp(): Field<Timestamp> {
protected open fun currentTimestamp(): Field<Timestamp> {
return DSL.currentTimestamp()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ abstract class DestinationAcceptanceTest {
protected var localRoot: Path? = null
open protected var _testDataComparator: TestDataComparator = getTestDataComparator()

open fun getTestDataComparator(): TestDataComparator {
protected open fun getTestDataComparator(): TestDataComparator {
return BasicTestDataComparator { this.resolveIdentifier(it) }
}

Expand All @@ -102,7 +102,7 @@ abstract class DestinationAcceptanceTest {
*/
get

protected fun supportsInDestinationNormalization(): Boolean {
protected open fun supportsInDestinationNormalization(): Boolean {
return false
}

Expand Down Expand Up @@ -208,7 +208,7 @@ abstract class DestinationAcceptanceTest {
/**
* Override to return true if a destination implements namespaces and should be tested as such.
*/
protected fun implementsNamespaces(): Boolean {
protected open fun implementsNamespaces(): Boolean {
return false
}

Expand Down Expand Up @@ -308,7 +308,7 @@ abstract class DestinationAcceptanceTest {
* - can throw any exception, test framework will handle.
*/
@Throws(Exception::class)
protected fun retrieveNormalizedRecords(
protected open fun retrieveNormalizedRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?
Expand Down Expand Up @@ -1036,7 +1036,7 @@ abstract class DestinationAcceptanceTest {
}
}

protected val maxRecordValueLimit: Int
protected open val maxRecordValueLimit: Int
/** @return the max limit length allowed for values in the destination. */
get() = 1000000000

Expand Down Expand Up @@ -1953,7 +1953,7 @@ abstract class DestinationAcceptanceTest {
return false
}

protected fun supportIncrementalSchemaChanges(): Boolean {
protected open fun supportIncrementalSchemaChanges(): Boolean {
return false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object TestingNamespaces {
*
* @return convention-compliant namespace
*/
@JvmStatic
@JvmOverloads
fun generate(prefix: String? = null): String {
val userDefinedPrefix = if (prefix != null) prefix + "_" else ""
Expand All @@ -58,6 +59,7 @@ object TestingNamespaces {
* @param namespace to check
* @return true if the namespace is older than 2 days, otherwise false
*/
@JvmStatic
fun isOlderThan2Days(namespace: String): Boolean {
return isOlderThan(namespace, 2, ChronoUnit.DAYS)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
}
}

protected fun resolveIdentifier(identifier: String?): List<String?> {
protected open fun resolveIdentifier(identifier: String?): List<String?> {
return java.util.List.of(identifier)
}

Expand Down Expand Up @@ -174,7 +174,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
.withZoneSameInstant(ZoneOffset.UTC)
}

protected fun compareDateTimeWithTzValues(
protected open fun compareDateTimeWithTzValues(
airbyteMessageValue: String,
destinationValue: String
): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {

protected abstract fun getDataSource(config: JsonNode?): DataSource?

protected val sourceOperations: JdbcCompatibleSourceOperations<*>
protected open val sourceOperations: JdbcCompatibleSourceOperations<*>
/**
* Subclasses may need to return a custom source operations if the default one does not
* handle vendor-specific types correctly. For example, you most likely need to override
* this method to deserialize JSON columns to JsonNode.
*/
get() = JdbcUtils.defaultSourceOperations

protected val rawSchema: String
protected open val rawSchema: String
/**
* Subclasses using a config with a nonstandard raw table schema should override this
* method.
Expand Down
Loading

0 comments on commit 219c194

Please sign in to comment.