Skip to content

Commit 973e6fc

Browse files
authored
Add ArrowError::AvroError, remaining types and roundtrip tests to arrow-avro, (#8595)
# Which issue does this PR close? - Closes #4886 - Stacked on #8584 # Rationale for this change This PR brings Arrow-Avro round‑trip coverage up to date with modern Arrow types and the latest Avro logical types. In particular, Avro 1.12 adds `timestamp-nanos` and `local-timestamp-nanos`. Enabling these logical types and filling in missing Avro writer encoders for Arrow’s newer *view* and list families allows lossless read/write and simpler pipelines. It also hardens timestamp/time scaling in the writer to avoid silent overflow when converting seconds to milliseconds, surfacing a clear error instead. # What changes are included in this PR? * **Nanosecond timestamps**: Introduces a `TimestampNanos(bool)` codec in `arrow-avro` that maps Avro `timestamp-nanos` / `local-timestamp-nanos` to Arrow `Timestamp(Nanosecond, tz)`. The reader/decoder, union field kinds, and Arrow `DataType` mapping are all extended accordingly. Logical type detection is wired through both `logicalType` and the `arrowTimeUnit="nanosecond"` attribute. * **UUID logical type round‑trip fix**: When reading Avro `logicalType="uuid"` fields, preserve that logical type in Arrow field metadata so writers can round‑trip it back to Avro. * **Avro writer encoders**: Add the missing array encoders and coverage for Arrow’s `ListView`, `LargeListView`, and `FixedSizeList`, and extend array encoder support to `BinaryView` and `Utf8View`. (See large additions in `writer/encoder.rs`.) * **Safer time/timestamp scaling**: Guard second to millisecond conversions in `Time32`/`Timestamp` encoders to prevent overflow; encoding now returns a clear `InvalidArgument` error in those cases. * **Schema utilities**: Add `AvroSchemaOptions` with `null_order` and `strip_metadata` flags so Avro JSON can be built while optionally omitting internal Arrow keys during round‑trip schema generation. * **Tests & round‑trip coverage**: Add unit tests for nanosecond timestamp decoding (UTC, local, and with nulls) and additional end‑to‑end/round‑trip tests for the updated writer paths. # Are these changes tested? Yes. * New decoder tests validate `Timestamp(Nanosecond, tz)` behavior for UTC and local timestamps and for nullable unions. * Writer tests validate the nanosecond encoder and exercise an overflow path for second→millisecond conversion that now returns an error. * Additional round‑trip tests were added alongside the new encoders. # Are there any user-facing changes? N/A since `arrow-avro` is not public yet.
1 parent 161adba commit 973e6fc

File tree

8 files changed

+1234
-86
lines changed

8 files changed

+1234
-86
lines changed

arrow-avro/src/codec.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,8 @@ impl AvroDataType {
349349
Codec::Int64
350350
| Codec::TimeMicros
351351
| Codec::TimestampMillis(_)
352-
| Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
352+
| Codec::TimestampMicros(_)
353+
| Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
353354
#[cfg(feature = "avro_custom_types")]
354355
Codec::DurationNanos
355356
| Codec::DurationMicros
@@ -652,6 +653,11 @@ pub(crate) enum Codec {
652653
/// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
653654
/// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
654655
TimestampMicros(bool),
656+
/// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
657+
///
658+
/// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
659+
/// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
660+
TimestampNanos(bool),
655661
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
656662
/// The i32 parameter indicates the fixed binary size
657663
Fixed(i32),
@@ -715,6 +721,9 @@ impl Codec {
715721
Self::TimestampMicros(is_utc) => {
716722
DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
717723
}
724+
Self::TimestampNanos(is_utc) => {
725+
DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
726+
}
718727
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
719728
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
720729
Self::Decimal(precision, scale, _size) => {
@@ -917,6 +926,8 @@ enum UnionFieldKind {
917926
TimestampMillisLocal,
918927
TimestampMicrosUtc,
919928
TimestampMicrosLocal,
929+
TimestampNanosUtc,
930+
TimestampNanosLocal,
920931
Duration,
921932
Fixed,
922933
Decimal,
@@ -946,6 +957,8 @@ impl From<&Codec> for UnionFieldKind {
946957
Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
947958
Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
948959
Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
960+
Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
961+
Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
949962
Codec::Interval => Self::Duration,
950963
Codec::Fixed(_) => Self::Fixed,
951964
Codec::Decimal(..) => Self::Decimal,
@@ -1399,7 +1412,17 @@ impl<'a> Maker<'a> {
13991412
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
14001413
*c = Codec::TimestampMicros(false)
14011414
}
1402-
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
1415+
(Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1416+
(Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1417+
*c = Codec::TimestampNanos(false)
1418+
}
1419+
(Some("uuid"), c @ Codec::Utf8) => {
1420+
// Map Avro string+logicalType=uuid into the UUID Codec,
1421+
// and preserve the logicalType in Arrow field metadata
1422+
// so writers can round-trip it correctly.
1423+
*c = Codec::Uuid;
1424+
field.metadata.insert("logicalType".into(), "uuid".into());
1425+
}
14031426
#[cfg(feature = "avro_custom_types")]
14041427
(Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
14051428
#[cfg(feature = "avro_custom_types")]
@@ -1437,6 +1460,18 @@ impl<'a> Maker<'a> {
14371460
}
14381461
(None, _) => {}
14391462
}
1463+
if matches!(field.codec, Codec::Int64) {
1464+
if let Some(unit) = t
1465+
.attributes
1466+
.additional
1467+
.get("arrowTimeUnit")
1468+
.and_then(|v| v.as_str())
1469+
{
1470+
if unit == "nanosecond" {
1471+
field.codec = Codec::TimestampNanos(false);
1472+
}
1473+
}
1474+
}
14401475
if !t.attributes.additional.is_empty() {
14411476
for (k, v) in &t.attributes.additional {
14421477
field.metadata.insert(k.to_string(), v.to_string());

arrow-avro/src/reader/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7437,7 +7437,6 @@ mod test {
74377437
"entire RecordBatch mismatch (schema, all columns, all rows)"
74387438
);
74397439
}
7440-
74417440
#[test]
74427441
fn comprehensive_e2e_resolution_test() {
74437442
use serde_json::Value;
@@ -7593,14 +7592,20 @@ mod test {
75937592
let batch = read_alltypes_with_reader_schema(path, reader_schema.clone());
75947593

75957594
const UUID_EXT_KEY: &str = "ARROW:extension:name";
7595+
const UUID_LOGICAL_KEY: &str = "logicalType";
75967596

75977597
let uuid_md_top: Option<HashMap<String, String>> = batch
75987598
.schema()
75997599
.field_with_name("uuid_str")
76007600
.ok()
76017601
.and_then(|f| {
76027602
let md = f.metadata();
7603-
if md.get(UUID_EXT_KEY).is_some() {
7603+
let has_ext = md.get(UUID_EXT_KEY).is_some();
7604+
let is_uuid_logical = md
7605+
.get(UUID_LOGICAL_KEY)
7606+
.map(|v| v.trim_matches('"') == "uuid")
7607+
.unwrap_or(false);
7608+
if has_ext || is_uuid_logical {
76047609
Some(md.clone())
76057610
} else {
76067611
None
@@ -7617,7 +7622,12 @@ mod test {
76177622
.find(|(_, child)| child.name() == "uuid")
76187623
.and_then(|(_, child)| {
76197624
let md = child.metadata();
7620-
if md.get(UUID_EXT_KEY).is_some() {
7625+
let has_ext = md.get(UUID_EXT_KEY).is_some();
7626+
let is_uuid_logical = md
7627+
.get(UUID_LOGICAL_KEY)
7628+
.map(|v| v.trim_matches('"') == "uuid")
7629+
.unwrap_or(false);
7630+
if has_ext || is_uuid_logical {
76217631
Some(md.clone())
76227632
} else {
76237633
None

arrow-avro/src/reader/record.rs

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ enum Decoder {
206206
TimeMicros(Vec<i64>),
207207
TimestampMillis(bool, Vec<i64>),
208208
TimestampMicros(bool, Vec<i64>),
209+
TimestampNanos(bool, Vec<i64>),
209210
Int32ToInt64(Vec<i64>),
210211
Int32ToFloat32(Vec<f32>),
211212
Int32ToFloat64(Vec<f64>),
@@ -324,6 +325,9 @@ impl Decoder {
324325
(Codec::TimestampMicros(is_utc), _) => {
325326
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
326327
}
328+
(Codec::TimestampNanos(is_utc), _) => {
329+
Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
330+
}
327331
#[cfg(feature = "avro_custom_types")]
328332
(Codec::DurationNanos, _) => {
329333
Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
@@ -530,7 +534,8 @@ impl Decoder {
530534
| Self::Int32ToInt64(v)
531535
| Self::TimeMicros(v)
532536
| Self::TimestampMillis(_, v)
533-
| Self::TimestampMicros(_, v) => v.push(0),
537+
| Self::TimestampMicros(_, v)
538+
| Self::TimestampNanos(_, v) => v.push(0),
534539
#[cfg(feature = "avro_custom_types")]
535540
Self::DurationSecond(v)
536541
| Self::DurationMillisecond(v)
@@ -643,7 +648,8 @@ impl Decoder {
643648
| Self::Int32ToInt64(v)
644649
| Self::TimeMicros(v)
645650
| Self::TimestampMillis(_, v)
646-
| Self::TimestampMicros(_, v) => match lit {
651+
| Self::TimestampMicros(_, v)
652+
| Self::TimestampNanos(_, v) => match lit {
647653
AvroLiteral::Long(i) => {
648654
v.push(*i);
649655
Ok(())
@@ -854,7 +860,8 @@ impl Decoder {
854860
Self::Int64(values)
855861
| Self::TimeMicros(values)
856862
| Self::TimestampMillis(_, values)
857-
| Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
863+
| Self::TimestampMicros(_, values)
864+
| Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
858865
#[cfg(feature = "avro_custom_types")]
859866
Self::DurationSecond(values)
860867
| Self::DurationMillisecond(values)
@@ -1070,6 +1077,10 @@ impl Decoder {
10701077
flush_primitive::<TimestampMicrosecondType>(values, nulls)
10711078
.with_timezone_opt(is_utc.then(|| "+00:00")),
10721079
),
1080+
Self::TimestampNanos(is_utc, values) => Arc::new(
1081+
flush_primitive::<TimestampNanosecondType>(values, nulls)
1082+
.with_timezone_opt(is_utc.then(|| "+00:00")),
1083+
),
10731084
#[cfg(feature = "avro_custom_types")]
10741085
Self::DurationSecond(values) => {
10751086
Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
@@ -1959,6 +1970,7 @@ enum Skipper {
19591970
TimeMicros,
19601971
TimestampMillis,
19611972
TimestampMicros,
1973+
TimestampNanos,
19621974
Fixed(usize),
19631975
Decimal(Option<usize>),
19641976
UuidString,
@@ -1983,6 +1995,7 @@ impl Skipper {
19831995
Codec::TimeMicros => Self::TimeMicros,
19841996
Codec::TimestampMillis(_) => Self::TimestampMillis,
19851997
Codec::TimestampMicros(_) => Self::TimestampMicros,
1998+
Codec::TimestampNanos(_) => Self::TimestampNanos,
19861999
#[cfg(feature = "avro_custom_types")]
19872000
Codec::DurationNanos
19882001
| Codec::DurationMicros
@@ -2044,7 +2057,11 @@ impl Skipper {
20442057
buf.get_int()?;
20452058
Ok(())
20462059
}
2047-
Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
2060+
Self::Int64
2061+
| Self::TimeMicros
2062+
| Self::TimestampMillis
2063+
| Self::TimestampMicros
2064+
| Self::TimestampNanos => {
20482065
buf.get_long()?;
20492066
Ok(())
20502067
}
@@ -4647,4 +4664,93 @@ mod tests {
46474664
.expect("Int32Array");
46484665
assert_eq!(a.values(), &[1, 2, 3]);
46494666
}
4667+
4668+
#[test]
4669+
fn test_timestamp_nanos_decoding_utc() {
4670+
let avro_type = avro_from_codec(Codec::TimestampNanos(true));
4671+
let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4672+
let mut data = Vec::new();
4673+
for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
4674+
data.extend_from_slice(&encode_avro_long(v));
4675+
}
4676+
let mut cur = AvroCursor::new(&data);
4677+
for _ in 0..4 {
4678+
decoder.decode(&mut cur).expect("decode nanos ts");
4679+
}
4680+
let array = decoder.flush(None).expect("flush nanos ts");
4681+
let ts = array
4682+
.as_any()
4683+
.downcast_ref::<TimestampNanosecondArray>()
4684+
.expect("TimestampNanosecondArray");
4685+
assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
4686+
match ts.data_type() {
4687+
DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4688+
assert_eq!(tz.as_deref(), Some("+00:00"));
4689+
}
4690+
other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
4691+
}
4692+
}
4693+
4694+
#[test]
4695+
fn test_timestamp_nanos_decoding_local() {
4696+
let avro_type = avro_from_codec(Codec::TimestampNanos(false));
4697+
let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4698+
let mut data = Vec::new();
4699+
for v in [10_i64, 20_i64, -30_i64] {
4700+
data.extend_from_slice(&encode_avro_long(v));
4701+
}
4702+
let mut cur = AvroCursor::new(&data);
4703+
for _ in 0..3 {
4704+
decoder.decode(&mut cur).expect("decode nanos ts");
4705+
}
4706+
let array = decoder.flush(None).expect("flush nanos ts");
4707+
let ts = array
4708+
.as_any()
4709+
.downcast_ref::<TimestampNanosecondArray>()
4710+
.expect("TimestampNanosecondArray");
4711+
assert_eq!(ts.values(), &[10, 20, -30]);
4712+
match ts.data_type() {
4713+
DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4714+
assert_eq!(tz.as_deref(), None);
4715+
}
4716+
other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4717+
}
4718+
}
4719+
4720+
#[test]
4721+
fn test_timestamp_nanos_decoding_with_nulls() {
4722+
let avro_type = AvroDataType::new(
4723+
Codec::TimestampNanos(false),
4724+
Default::default(),
4725+
Some(Nullability::NullFirst),
4726+
);
4727+
let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
4728+
let mut data = Vec::new();
4729+
data.extend_from_slice(&encode_avro_long(1));
4730+
data.extend_from_slice(&encode_avro_long(42));
4731+
data.extend_from_slice(&encode_avro_long(0));
4732+
data.extend_from_slice(&encode_avro_long(1));
4733+
data.extend_from_slice(&encode_avro_long(-7));
4734+
let mut cur = AvroCursor::new(&data);
4735+
for _ in 0..3 {
4736+
decoder.decode(&mut cur).expect("decode nullable nanos ts");
4737+
}
4738+
let array = decoder.flush(None).expect("flush nullable nanos ts");
4739+
let ts = array
4740+
.as_any()
4741+
.downcast_ref::<TimestampNanosecondArray>()
4742+
.expect("TimestampNanosecondArray");
4743+
assert_eq!(ts.len(), 3);
4744+
assert!(ts.is_valid(0));
4745+
assert!(ts.is_null(1));
4746+
assert!(ts.is_valid(2));
4747+
assert_eq!(ts.value(0), 42);
4748+
assert_eq!(ts.value(2), -7);
4749+
match ts.data_type() {
4750+
DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4751+
assert_eq!(tz.as_deref(), None);
4752+
}
4753+
other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4754+
}
4755+
}
46504756
}

0 commit comments

Comments
 (0)