Skip to content

Commit 5a21321

Browse files
kaushiksrini and Fokko authored
Add unknown type (#1681)
Support for new type `Unknown`, part of Iceberg V3 spec. Closes #1553. --------- Co-authored-by: Fokko Driesprong <[email protected]>
1 parent a275ce5 commit 5a21321

17 files changed

+174
-33
lines changed

pyiceberg/avro/encoder.py

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from typing import Any
1718
from uuid import UUID
1819

1920
from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
@@ -74,3 +75,6 @@ def write_uuid(self, uuid: UUID) -> None:
7475
if len(uuid.bytes) != 16:
7576
raise ValueError(f"Expected UUID to have 16 bytes, got: len({uuid.bytes!r})")
7677
return self.write(uuid.bytes)
78+
79+
def write_unknown(self, _: Any) -> None:
80+
"""Nulls are written as 0 bytes in avro, so we do nothing."""

pyiceberg/avro/reader.py

+8
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,14 @@ def skip(self, decoder: BinaryDecoder) -> None:
201201
decoder.skip(16)
202202

203203

204+
class UnknownReader(Reader):
205+
def read(self, decoder: BinaryDecoder) -> None:
206+
return None
207+
208+
def skip(self, decoder: BinaryDecoder) -> None:
209+
pass
210+
211+
204212
@dataclass(frozen=True)
205213
class FixedReader(Reader):
206214
_len: int = dataclassfield()

pyiceberg/avro/resolver.py

+12
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
TimeReader,
4747
TimestampReader,
4848
TimestamptzReader,
49+
UnknownReader,
4950
UUIDReader,
5051
)
5152
from pyiceberg.avro.writer import (
@@ -66,6 +67,7 @@
6667
TimestamptzWriter,
6768
TimestampWriter,
6869
TimeWriter,
70+
UnknownWriter,
6971
UUIDWriter,
7072
Writer,
7173
)
@@ -100,6 +102,7 @@
100102
TimestampType,
101103
TimestamptzType,
102104
TimeType,
105+
UnknownType,
103106
UUIDType,
104107
)
105108

@@ -193,6 +196,9 @@ def visit_uuid(self, uuid_type: UUIDType) -> Writer:
193196
def visit_binary(self, binary_type: BinaryType) -> Writer:
194197
return BinaryWriter()
195198

199+
def visit_unknown(self, unknown_type: UnknownType) -> Writer:
200+
return UnknownWriter()
201+
196202

197203
CONSTRUCT_WRITER_VISITOR = ConstructWriter()
198204

@@ -341,6 +347,9 @@ def visit_fixed(self, fixed_type: FixedType, partner: Optional[IcebergType]) ->
341347
def visit_binary(self, binary_type: BinaryType, partner: Optional[IcebergType]) -> Writer:
342348
return BinaryWriter()
343349

350+
def visit_unknown(self, unknown_type: UnknownType, partner: Optional[IcebergType]) -> Writer:
351+
return UnknownWriter()
352+
344353

345354
class ReadSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
346355
__slots__ = ("read_types", "read_enums", "context")
@@ -471,6 +480,9 @@ def visit_fixed(self, fixed_type: FixedType, partner: Optional[IcebergType]) ->
471480
def visit_binary(self, binary_type: BinaryType, partner: Optional[IcebergType]) -> Reader:
472481
return BinaryReader()
473482

483+
def visit_unknown(self, unknown_type: UnknownType, partner: Optional[IcebergType]) -> Reader:
484+
return UnknownReader()
485+
474486

475487
class SchemaPartnerAccessor(PartnerAccessor[IcebergType]):
476488
def schema_partner(self, partner: Optional[IcebergType]) -> Optional[IcebergType]:

pyiceberg/avro/writer.py

+6
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ def write(self, encoder: BinaryEncoder, val: UUID) -> None:
113113
encoder.write(val.bytes)
114114

115115

116+
@dataclass(frozen=True)
117+
class UnknownWriter(Writer):
118+
def write(self, encoder: BinaryEncoder, val: Any) -> None:
119+
encoder.write_unknown(val)
120+
121+
116122
@dataclass(frozen=True)
117123
class FixedWriter(Writer):
118124
_len: int = dataclassfield()

pyiceberg/catalog/hive.py

+2
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
TimestampType,
111111
TimestamptzType,
112112
TimeType,
113+
UnknownType,
113114
UUIDType,
114115
)
115116
from pyiceberg.utils.properties import property_as_bool, property_as_float
@@ -236,6 +237,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD
236237
UUIDType: "string",
237238
BinaryType: "binary",
238239
FixedType: "binary",
240+
UnknownType: "void",
239241
}
240242

241243

pyiceberg/conversions.py

+12
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
TimestampType,
5757
TimestamptzType,
5858
TimeType,
59+
UnknownType,
5960
UUIDType,
6061
strtobool,
6162
)
@@ -154,6 +155,12 @@ def _(_: DecimalType, value_str: str) -> Decimal:
154155
return Decimal(value_str)
155156

156157

158+
@partition_to_py.register(UnknownType)
159+
@handle_none
160+
def _(type_: UnknownType, _: str) -> None:
161+
return None
162+
163+
157164
@singledispatch
158165
def to_bytes(
159166
primitive_type: PrimitiveType, _: Union[bool, bytes, Decimal, date, datetime, float, int, str, time, uuid.UUID]
@@ -324,3 +331,8 @@ def _(_: PrimitiveType, b: bytes) -> bytes:
324331
def _(primitive_type: DecimalType, buf: bytes) -> Decimal:
325332
unscaled = int.from_bytes(buf, "big", signed=True)
326333
return unscaled_to_decimal(unscaled, primitive_type.scale)
334+
335+
336+
@from_bytes.register(UnknownType)
337+
def _(type_: UnknownType, buf: bytes) -> None:
338+
return None

pyiceberg/io/pyarrow.py

+9
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@
166166
TimestampType,
167167
TimestamptzType,
168168
TimeType,
169+
UnknownType,
169170
UUIDType,
170171
)
171172
from pyiceberg.utils.concurrent import ExecutorFactory
@@ -670,6 +671,9 @@ def visit_string(self, _: StringType) -> pa.DataType:
670671
def visit_uuid(self, _: UUIDType) -> pa.DataType:
671672
return pa.binary(16)
672673

674+
def visit_unknown(self, _: UnknownType) -> pa.DataType:
675+
return pa.null()
676+
673677
def visit_binary(self, _: BinaryType) -> pa.DataType:
674678
return pa.large_binary()
675679

@@ -1220,6 +1224,8 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
12201224
elif pa.types.is_fixed_size_binary(primitive):
12211225
primitive = cast(pa.FixedSizeBinaryType, primitive)
12221226
return FixedType(primitive.byte_width)
1227+
elif pa.types.is_null(primitive):
1228+
return UnknownType()
12231229

12241230
raise TypeError(f"Unsupported type: {primitive}")
12251231

@@ -1900,6 +1906,9 @@ def visit_uuid(self, uuid_type: UUIDType) -> str:
19001906
def visit_binary(self, binary_type: BinaryType) -> str:
19011907
return "BYTE_ARRAY"
19021908

1909+
def visit_unknown(self, unknown_type: UnknownType) -> str:
1910+
return "UNKNOWN"
1911+
19031912

19041913
_PRIMITIVE_TO_PHYSICAL_TYPE_VISITOR = PrimitiveToPhysicalType()
19051914

pyiceberg/partitioning.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ def _(type: IcebergType, value: Optional[Union[int, datetime]]) -> Optional[int]
444444
elif isinstance(value, datetime):
445445
return datetime_to_micros(value)
446446
else:
447-
raise ValueError(f"Unknown type: {value}")
447+
raise ValueError(f"Type not recognized: {value}")
448448

449449

450450
@_to_partition_representation.register(DateType)
@@ -456,7 +456,7 @@ def _(type: IcebergType, value: Optional[Union[int, date]]) -> Optional[int]:
456456
elif isinstance(value, date):
457457
return date_to_days(value)
458458
else:
459-
raise ValueError(f"Unknown type: {value}")
459+
raise ValueError(f"Type not recognized: {value}")
460460

461461

462462
@_to_partition_representation.register(TimeType)

pyiceberg/schema.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
TimestampType,
6161
TimestamptzType,
6262
TimeType,
63+
UnknownType,
6364
UUIDType,
6465
)
6566

@@ -531,8 +532,10 @@ def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) ->
531532
return self.visit_fixed(primitive, primitive_partner)
532533
elif isinstance(primitive, BinaryType):
533534
return self.visit_binary(primitive, primitive_partner)
535+
elif isinstance(primitive, UnknownType):
536+
return self.visit_unknown(primitive, primitive_partner)
534537
else:
535-
raise ValueError(f"Unknown type: {primitive}")
538+
raise ValueError(f"Type not recognized: {primitive}")
536539

537540
@abstractmethod
538541
def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P]) -> T:
@@ -590,6 +593,10 @@ def visit_fixed(self, fixed_type: FixedType, partner: Optional[P]) -> T:
590593
def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
591594
"""Visit a BinaryType."""
592595

596+
@abstractmethod
597+
def visit_unknown(self, unknown_type: UnknownType, partner: Optional[P]) -> T:
598+
"""Visit a UnknownType."""
599+
593600

594601
class PartnerAccessor(Generic[P], ABC):
595602
@abstractmethod
@@ -707,8 +714,10 @@ def primitive(self, primitive: PrimitiveType) -> T:
707714
return self.visit_uuid(primitive)
708715
elif isinstance(primitive, BinaryType):
709716
return self.visit_binary(primitive)
717+
elif isinstance(primitive, UnknownType):
718+
return self.visit_unknown(primitive)
710719
else:
711-
raise ValueError(f"Unknown type: {primitive}")
720+
raise ValueError(f"Type not recognized: {primitive}")
712721

713722
@abstractmethod
714723
def visit_fixed(self, fixed_type: FixedType) -> T:
@@ -766,6 +775,10 @@ def visit_uuid(self, uuid_type: UUIDType) -> T:
766775
def visit_binary(self, binary_type: BinaryType) -> T:
767776
"""Visit a BinaryType."""
768777

778+
@abstractmethod
779+
def visit_unknown(self, unknown_type: UnknownType) -> T:
780+
"""Visit a UnknownType."""
781+
769782

770783
@dataclass(init=True, eq=True, frozen=True)
771784
class Accessor:

pyiceberg/types.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,15 @@ def handle_primitive_type(cls, v: Any, handler: ValidatorFunctionWrapHandler) ->
148148
return UUIDType()
149149
if v == "binary":
150150
return BinaryType()
151+
if v == "unknown":
152+
return UnknownType()
151153
if v.startswith("fixed"):
152154
return FixedType(_parse_fixed_type(v))
153155
if v.startswith("decimal"):
154156
precision, scale = _parse_decimal_type(v)
155157
return DecimalType(precision, scale)
156158
else:
157-
raise ValueError(f"Unknown type: {v}")
159+
raise ValueError(f"Type not recognized: {v}")
158160
if isinstance(v, dict) and cls == IcebergType:
159161
complex_type = v.get("type")
160162
if complex_type == "list":
@@ -747,3 +749,19 @@ class BinaryType(PrimitiveType):
747749
"""
748750

749751
root: Literal["binary"] = Field(default="binary")
752+
753+
754+
class UnknownType(PrimitiveType):
755+
"""An unknown data type in Iceberg can be represented using an instance of this class.
756+
757+
Unknowns in Iceberg are used to represent data types that are not known at the time of writing.
758+
759+
Example:
760+
>>> column_foo = UnknownType()
761+
>>> isinstance(column_foo, UnknownType)
762+
True
763+
>>> column_foo
764+
UnknownType()
765+
"""
766+
767+
root: Literal["unknown"] = Field(default="unknown")

pyiceberg/utils/schema_conversion.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
TimestampType,
4848
TimestamptzType,
4949
TimeType,
50+
UnknownType,
5051
UUIDType,
5152
)
5253
from pyiceberg.utils.decimal import decimal_required_bytes
@@ -62,6 +63,7 @@
6263
"long": LongType(),
6364
"string": StringType(),
6465
"enum": StringType(),
66+
"null": UnknownType(),
6567
}
6668

6769
LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
@@ -209,9 +211,9 @@ def _convert_schema(self, avro_type: Union[str, Dict[str, Any]]) -> IcebergType:
209211
elif isinstance(type_identifier, str) and type_identifier in PRIMITIVE_FIELD_TYPE_MAPPING:
210212
return PRIMITIVE_FIELD_TYPE_MAPPING[type_identifier]
211213
else:
212-
raise TypeError(f"Unknown type: {avro_type}")
214+
raise TypeError(f"Type not recognized: {avro_type}")
213215
else:
214-
raise TypeError(f"Unknown type: {avro_type}")
216+
raise TypeError(f"Type not recognized: {avro_type}")
215217

216218
def _convert_field(self, field: Dict[str, Any]) -> NestedField:
217219
"""Convert an Avro field into an Iceberg equivalent field.
@@ -618,3 +620,6 @@ def visit_uuid(self, uuid_type: UUIDType) -> AvroType:
618620

619621
def visit_binary(self, binary_type: BinaryType) -> AvroType:
620622
return "bytes"
623+
624+
def visit_unknown(self, unknown_type: UnknownType) -> AvroType:
625+
return "null"

tests/avro/test_reader.py

+3-8
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
TimeReader,
3838
TimestampReader,
3939
TimestamptzReader,
40+
UnknownReader,
4041
UUIDReader,
4142
)
4243
from pyiceberg.avro.resolver import construct_reader
@@ -55,12 +56,12 @@
5556
IntegerType,
5657
LongType,
5758
NestedField,
58-
PrimitiveType,
5959
StringType,
6060
StructType,
6161
TimestampType,
6262
TimestamptzType,
6363
TimeType,
64+
UnknownType,
6465
UUIDType,
6566
)
6667

@@ -325,13 +326,7 @@ def test_binary_reader() -> None:
325326

326327

327328
def test_unknown_type() -> None:
328-
class UnknownType(PrimitiveType):
329-
root: str = "UnknownType"
330-
331-
with pytest.raises(ValueError) as exc_info:
332-
construct_reader(UnknownType())
333-
334-
assert "Unknown type:" in str(exc_info.value)
329+
assert construct_reader(UnknownType()) == UnknownReader()
335330

336331

337332
def test_uuid_reader() -> None:

0 commit comments

Comments
 (0)