|
35 | 35 | import java.time.LocalDateTime;
|
36 | 36 | import java.time.LocalTime;
|
37 | 37 | import java.time.ZonedDateTime;
|
| 38 | +import java.time.format.DateTimeParseException; |
38 | 39 | import java.util.ArrayList;
|
39 | 40 | import java.util.List;
|
40 | 41 | import java.util.Map;
|
@@ -112,17 +113,34 @@ public StructuredRecord decodeRecord(Row row, Schema schema) throws RecordConver
|
112 | 113 | private Object decode(String name, Object object, Schema schema) throws RecordConvertorException {
|
113 | 114 | // Extract the type of the field.
|
114 | 115 | Schema.Type type = schema.getType();
|
115 |
| - Schema.LogicalType logicalType = schema.getLogicalType(); |
| 116 | + Schema.LogicalType logicalType = |
| 117 | + schema.isNullable() ? schema.getNonNullable().getLogicalType() : |
| 118 | + schema.getLogicalType(); |
116 | 119 |
|
117 | 120 | if (logicalType != null) {
|
118 | 121 | switch (logicalType) {
|
| 122 | + case DATETIME: |
| 123 | + if (schema.isNullable() && object == null || object instanceof LocalDateTime) { |
| 124 | + return object; |
| 125 | + } |
| 126 | + if (object == null) { |
| 127 | + throw new UnexpectedFormatException( |
| 128 | + String.format("Datetime field %s should have a non null value", name)); |
| 129 | + } |
| 130 | + try { |
| 131 | + LocalDateTime.parse((String) object); |
| 132 | + } catch (DateTimeParseException exception) { |
| 133 | + throw new UnexpectedFormatException( |
| 134 | + String.format("Datetime field '%s' with value '%s' is not in ISO-8601 format.", |
| 135 | + name, object), exception); |
| 136 | + } |
| 137 | + return object; |
119 | 138 | case DATE:
|
120 | 139 | case TIME_MILLIS:
|
121 | 140 | case TIME_MICROS:
|
122 | 141 | case TIMESTAMP_MILLIS:
|
123 | 142 | case TIMESTAMP_MICROS:
|
124 | 143 | case DECIMAL:
|
125 |
| - case DATETIME: |
126 | 144 | return object;
|
127 | 145 | default:
|
128 | 146 | throw new UnexpectedFormatException("field type " + logicalType + " is not supported.");
|
|
0 commit comments