-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1665420 add logic to parse Iceberg schema #996
SNOW-1665420 add logic to parse Iceberg schema #996
Conversation
|
||
private final Type schema; | ||
|
||
private final String columnName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to have a column field. I can't retrieve it from schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this comment. Could you elaborate on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When channel.getTableSchema().get("COLUMN_NAME").getIcebergSchema()
in return we receive schema like in my Tree test. There is no info about column name.
Hence I will have to get column name from InsertError. I am going to pass both into this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I got it
import org.apache.kafka.connect.data.Date; | ||
import org.apache.kafka.connect.data.Decimal; | ||
import org.apache.kafka.connect.data.Schema; | ||
import org.apache.kafka.connect.data.Time; | ||
import org.apache.kafka.connect.data.Timestamp; | ||
|
||
public class IcebergColumnTypeMapper extends ColumnTypeMapper { | ||
|
||
/** | ||
* See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return "TIME(6)"; | ||
case TIMESTAMP: | ||
Types.TimestampType timestamp = (Types.TimestampType) apacheIcebergType; | ||
return timestamp.shouldAdjustToUTC() ? "TIMESTAMP_LTZ" : "TIMESTAMP"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I didn't type precision of a timestamp like ex. TIMESTAMP_LTZ(6). I want server to resolve it. It's confusing because docs mentions that timestamp -> TIMESTAMP_LTZ/NTZ(6) but when I manually altered Iceberg table it creates a column with ...(9) precision. I must come back to it when It will start to work end 2 end.
* GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg | ||
* /IcebergDataTypeParser.java | ||
*/ | ||
public class IcebergDataTypeParser { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By now I only use it in test. However It's gonna be needed later. It's copied from ingest-sdk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states that it is copied from the monorepo, but I guess it is the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So they copied it from GS and I copied from them, whatever. I don't remember.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we shouldn't include the path in the javadoc comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment removed.
@MethodSource("prepareData") | ||
@Disabled | ||
// Schema evolution for structured types is not yet supported | ||
void shouldEvolveSchemaAndInsertRecords_structuredData( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's going to evolve. I am gonna need it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but please extract the common parts of both tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will just save it to my notes. Maybe I shouldn't have add it yet - it confuses reviewer and it's not yet needed. The code will evolve anyway.
2754d3c
to
a5870be
Compare
* See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types for | ||
* Apache Iceberg™ tables</a> | ||
*/ | ||
public static String mapToSnowflakeDataType(Type apacheIcebergType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this static but I want it to stay as it is for now.
"TEST_COLUMN_NAME OBJECT(K1 NUMBER(10,0), K2 NUMBER(10,0), NESTED_OBJECT" | ||
+ " OBJECT(NESTED_KEY1 VARCHAR(16777216), NESTED_KEY2 VARCHAR(16777216)))"), | ||
arguments( | ||
"{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't made me format it manually...
a5870be
to
61b0e74
Compare
pom.xml
Outdated
@@ -56,6 +56,7 @@ | |||
<confluent.version>7.7.0</confluent.version> | |||
<!--Compatible protobuf version https://github.com/confluentinc/common/blob/v7.7.0/pom.xml#L91 --> | |||
<protobuf.version>3.25.5</protobuf.version> | |||
<iceberg.version>1.5.2</iceberg.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's 1.6.1 in the ingest-sdk. Let's try to align.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pom.xml
Outdated
@@ -338,7 +339,7 @@ | |||
<dependency> | |||
<groupId>net.snowflake</groupId> | |||
<artifactId>snowflake-ingest-sdk</artifactId> | |||
<version>2.3.0</version> | |||
<version>3.0.0</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please align pom.xml
with pom_confluent.xml
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sry, I forgot about pom_confluent.xml
, however I must revert back to 2.3.0. , 3.0.0 is not yet available.
public final LinkedHashMap<String, IcebergFieldNode> children; | ||
|
||
public IcebergFieldNode(String name, Type apacheIcebergSchema) { | ||
this.name = name.toUpperCase(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about this toUpperCase()
. At least fields inside the nested structures are case sensitive.
Even though we can wait for testing details like that once we set everything e2e.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least fields inside the nested structures are case sensitive.
So then It won't work. I'll remove this toUpperCase
method.
|
||
public final LinkedHashMap<String, IcebergFieldNode> children; | ||
|
||
public IcebergFieldNode(String name, Type apacheIcebergSchema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it's pretty heavy for the constructor. Perhaps we could move this logic to some kind of Factory class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really feel moving this logic somewhere else. It's easy to construct nodes inside this class.
Assertions.assertEquals(expectedQuery, tree.buildQuery()); | ||
} | ||
|
||
static Stream<Arguments> icebergSchemas() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job!
059939c
to
197770f
Compare
Overview
SNOW-1665420
The goal of this PR is to implement logic to parse plain iceberg schema. That schema will be retrieved from a channel during schema evolution.
I generate IcebergColumnTree from Iceberg schema.
I added logic to generate part of the query that will be used to alter the column. Generating query is out of scope but It's the best way to test the logic. I didn't handle nullability of the columns yet.
Pre-review checklist
snowflake.streaming.iceberg.enabled
Yes
- Added end to end and Unit Tests.No
- Suggest why it is not param protected