Skip to content

Commit 381c766

Browse files
pamitmpamitm-uber
andauthored
Compatible Record In Union Lookup Should Use Unqualified Name (#574)
* Compatible record in union should use unqualified name * Fix logic based on avro version * add codegen * Fix some additional union lookup logic which we missed * Fix test * add codegen files --------- Co-authored-by: Amit Patil <[email protected]>
1 parent a56f7ae commit 381c766

File tree

7 files changed

+431
-11
lines changed

7 files changed

+431
-11
lines changed

fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java

Lines changed: 148 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,13 @@
3333
import org.apache.avro.generic.GenericDatumReader;
3434
import org.apache.avro.generic.GenericRecord;
3535
import org.apache.avro.io.BinaryDecoder;
36-
import org.apache.avro.io.BinaryEncoder;
3736
import org.apache.avro.io.Decoder;
38-
import org.apache.avro.specific.SpecificDatumWriter;
3937
import org.apache.avro.util.Utf8;
4038
import org.testng.Assert;
4139
import org.testng.SkipException;
4240
import org.testng.annotations.BeforeTest;
4341
import org.testng.annotations.DataProvider;
4442
import org.testng.annotations.Test;
45-
import org.testng.collections.Lists;
4643
import org.testng.internal.collections.Pair;
4744

4845
import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.*;
@@ -256,6 +253,154 @@ public void shouldReadFixed(Implementation implementation) {
256253
((List<GenericData.Fixed>) record.get("testFixedUnionArray")).get(0).bytes());
257254
}
258255

256+
@Test(groups = {"deserializationTest"}, dataProvider = "Implementation")
257+
public void shouldHandleDiffNamepspaceInRecords(Implementation implementation) {
258+
//record with two fields, first optional string second a record with namespace "a.b.c" with name "innerRecordName". Inner record has one int field
259+
Schema writerSchema = Schema.parse("{\n" +
260+
" \"type\": \"record\",\n" +
261+
" \"name\": \"OuterRecord\",\n" +
262+
" \"fields\": [\n" +
263+
" {\n" +
264+
" \"name\": \"optionalString\",\n" +
265+
" \"type\": [\"null\", \"string\"],\n" +
266+
" \"default\": null\n" +
267+
" },\n" +
268+
" {\n" +
269+
" \"name\": \"innerRecord\",\n" +
270+
" \"type\": {\n" +
271+
" \"type\": \"record\",\n" +
272+
" \"name\": \"innerRecordName\",\n" +
273+
" \"namespace\": \"a.b.c\",\n" +
274+
" \"fields\": [\n" +
275+
" {\n" +
276+
" \"name\": \"intField\",\n" +
277+
" \"type\": \"int\"\n" +
278+
" }\n" +
279+
" ]\n" +
280+
" }\n" +
281+
" }\n" +
282+
" ]\n" +
283+
"}");
284+
285+
//change namespace on inner record to "d.e.f"
286+
Schema readerSchema = Schema.parse("{\n" +
287+
" \"type\": \"record\",\n" +
288+
" \"name\": \"OuterRecord\",\n" +
289+
" \"fields\": [\n" +
290+
" {\n" +
291+
" \"name\": \"optionalString\",\n" +
292+
" \"type\": [\"null\", \"string\"],\n" +
293+
" \"default\": null\n" +
294+
" },\n" +
295+
" {\n" +
296+
" \"name\": \"innerRecord\",\n" +
297+
" \"type\": [\"null\", {\n" +
298+
" \"type\": \"record\",\n" +
299+
" \"name\": \"innerRecordName\",\n" +
300+
" \"namespace\": \"d.e.f\",\n" +
301+
" \"fields\": [\n" +
302+
" {\n" +
303+
" \"name\": \"intField\",\n" +
304+
" \"type\": \"int\"\n" +
305+
" }\n" +
306+
" ]\n" +
307+
" }],\n" +
308+
" \"default\": null\n" +
309+
" }\n" +
310+
" ]\n" +
311+
"}");
312+
313+
GenericRecord originalRecord = new GenericData.Record(writerSchema);
314+
originalRecord.put("optionalString", "abc");
315+
GenericRecord innerRecord = new GenericData.Record(writerSchema.getField("innerRecord").schema());
316+
innerRecord.put("intField", 1);
317+
originalRecord.put("innerRecord", innerRecord);
318+
319+
// when
320+
try{
321+
GenericRecord record = implementation.decode(writerSchema, readerSchema, genericDataAsDecoder(originalRecord));
322+
// then
323+
if(Utils.usesQualifiedNameForNamedTypedMatching()){
324+
Assert.fail("1.5-1.7 don't support unqualified name for named type matching so we should have failed");
325+
}
326+
Assert.assertEquals(new Utf8("abc"), record.get("optionalString"));
327+
GenericRecord innerRecordDecoded = (GenericRecord) record.get("innerRecord");
328+
Assert.assertEquals(1, innerRecordDecoded.get("intField"));
329+
} catch (Exception e){
330+
if(!Utils.usesQualifiedNameForNamedTypedMatching()) {
331+
Assert.fail("1.4, and 1.8+ support unqualified name for named type matching");
332+
}
333+
}
334+
}
335+
336+
337+
@Test(groups = {"deserializationTest"}, dataProvider = "Implementation")
338+
public void shouldNotFailOnNamespaceMismatch(Implementation implementation) throws IOException {
339+
// writer-side schema: "metadata" has NO namespace
340+
String writerSchemaStr = "{\n" +
341+
" \"type\": \"record\",\n" +
342+
" \"name\": \"wrapper\",\n" +
343+
" \"fields\": [\n" +
344+
" {\n" +
345+
" \"name\": \"metadata\",\n" +
346+
" \"type\": [\"null\", {\n" +
347+
" \"type\": \"record\",\n" +
348+
" \"name\": \"metadata\",\n" +
349+
" \"fields\": [\n" +
350+
" {\"name\": \"fieldName\", \"type\": [\"null\", \"string\"], \"default\": null}\n" +
351+
" ]\n" +
352+
" }],\n" +
353+
" \"default\": null\n" +
354+
" }\n" +
355+
" ]\n" +
356+
"}";
357+
358+
// reader-side schema: same nested record but WITH namespace "rtapi.surge"
359+
String readerSchemaStr = "{\n" +
360+
" \"type\": \"record\",\n" +
361+
" \"name\": \"Wrapper\",\n" +
362+
" \"fields\": [\n" +
363+
" {\n" +
364+
" \"name\": \"metadata\",\n" +
365+
" \"type\": [\"null\", {\n" +
366+
" \"type\": \"record\",\n" +
367+
" \"name\": \"metadata\",\n" +
368+
" \"namespace\": \"some.other.namespace\",\n" +
369+
" \"fields\": [\n" +
370+
" {\"name\": \"fieldName\", \"type\": [\"null\", \"string\"], \"default\": null}\n" +
371+
" ]\n" +
372+
" }],\n" +
373+
" \"default\": null\n" +
374+
" }\n" +
375+
" ]\n" +
376+
"}";
377+
378+
Schema writerSchema = AvroCompatibilityHelper.parse(writerSchemaStr);
379+
Schema readerSchema = AvroCompatibilityHelper.parse(readerSchemaStr);
380+
381+
// Build a writer-side record instance
382+
GenericRecord wrapper = new GenericData.Record(writerSchema);
383+
Schema metadataSchema = writerSchema.getField("metadata").schema().getTypes().get(1);
384+
GenericRecord metadataRecord = new GenericData.Record(metadataSchema);
385+
metadataRecord.put("fieldName", "abc-123");
386+
wrapper.put("metadata", metadataRecord);
387+
388+
// Attempt to deserialize – should throw AvroTypeException because of namespace mismatch
389+
try{
390+
GenericRecord record = implementation.decode(writerSchema, readerSchema, genericDataAsDecoder(wrapper));
391+
if(Utils.usesQualifiedNameForNamedTypedMatching()){
392+
Assert.fail("1.5-1.7 don't support unqualified name for named type matching so we should have failed");
393+
}
394+
Assert.assertEquals(((GenericRecord)record.get("metadata")).get("fieldName").toString(), "abc-123");
395+
} catch (AvroTypeException e){
396+
if(!Utils.usesQualifiedNameForNamedTypedMatching()) {
397+
Assert.fail("1.4, and 1.8+ support unqualified name for named type matching");
398+
}
399+
// expected exception
400+
}
401+
402+
}
403+
259404
@Test(groups = {"deserializationTest"}, dataProvider = "Implementation")
260405
public void shouldReadEnum(Implementation implementation) {
261406
// given
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
2+
package com.linkedin.avro.fastserde.generated.deserialization.AVRO_1_11;
3+
4+
import java.io.IOException;
5+
import com.linkedin.avro.fastserde.FastDeserializer;
6+
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
7+
import org.apache.avro.Schema;
8+
import org.apache.avro.generic.IndexedRecord;
9+
import org.apache.avro.io.Decoder;
10+
import org.apache.avro.util.Utf8;
11+
12+
public class OuterRecord_GenericDeserializer_998347834_1261326440
13+
implements FastDeserializer<IndexedRecord>
14+
{
15+
16+
private final Schema readerSchema;
17+
private final Schema optionalString0;
18+
private final Schema innerRecord0;
19+
private final Schema innerRecordNameRecordSchema0;
20+
21+
public OuterRecord_GenericDeserializer_998347834_1261326440(Schema readerSchema) {
22+
this.readerSchema = readerSchema;
23+
this.optionalString0 = readerSchema.getField("optionalString").schema();
24+
this.innerRecord0 = readerSchema.getField("innerRecord").schema();
25+
this.innerRecordNameRecordSchema0 = innerRecord0 .getTypes().get(1);
26+
}
27+
28+
public IndexedRecord deserialize(IndexedRecord reuse, Decoder decoder, DatumReaderCustomization customization)
29+
throws IOException
30+
{
31+
return deserializeOuterRecord0((reuse), (decoder), (customization));
32+
}
33+
34+
public IndexedRecord deserializeOuterRecord0(Object reuse, Decoder decoder, DatumReaderCustomization customization)
35+
throws IOException
36+
{
37+
IndexedRecord outerRecord0;
38+
if ((((reuse)!= null)&&((reuse) instanceof IndexedRecord))&&(((IndexedRecord)(reuse)).getSchema() == readerSchema)) {
39+
outerRecord0 = ((IndexedRecord)(reuse));
40+
} else {
41+
outerRecord0 = new org.apache.avro.generic.GenericData.Record(readerSchema);
42+
}
43+
int unionIndex0 = (decoder.readIndex());
44+
if (unionIndex0 == 0) {
45+
decoder.readNull();
46+
outerRecord0 .put(0, null);
47+
} else {
48+
if (unionIndex0 == 1) {
49+
Utf8 charSequence0;
50+
Object oldString0 = outerRecord0 .get(0);
51+
if (oldString0 instanceof Utf8) {
52+
charSequence0 = (decoder).readString(((Utf8) oldString0));
53+
} else {
54+
charSequence0 = (decoder).readString(null);
55+
}
56+
outerRecord0 .put(0, charSequence0);
57+
} else {
58+
throw new RuntimeException(("Illegal union index for 'optionalString': "+ unionIndex0));
59+
}
60+
}
61+
populate_OuterRecord0((outerRecord0), (customization), (decoder));
62+
return outerRecord0;
63+
}
64+
65+
private void populate_OuterRecord0(IndexedRecord outerRecord0, DatumReaderCustomization customization, Decoder decoder)
66+
throws IOException
67+
{
68+
outerRecord0 .put(1, deserializeinnerRecordName0(outerRecord0 .get(1), (decoder), (customization)));
69+
}
70+
71+
public IndexedRecord deserializeinnerRecordName0(Object reuse, Decoder decoder, DatumReaderCustomization customization)
72+
throws IOException
73+
{
74+
IndexedRecord innerRecordName0;
75+
if ((((reuse)!= null)&&((reuse) instanceof IndexedRecord))&&(((IndexedRecord)(reuse)).getSchema() == innerRecordNameRecordSchema0)) {
76+
innerRecordName0 = ((IndexedRecord)(reuse));
77+
} else {
78+
innerRecordName0 = new org.apache.avro.generic.GenericData.Record(innerRecordNameRecordSchema0);
79+
}
80+
innerRecordName0 .put(0, (decoder.readInt()));
81+
return innerRecordName0;
82+
}
83+
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
2+
package com.linkedin.avro.fastserde.generated.deserialization.AVRO_1_11;
3+
4+
import java.io.IOException;
5+
import com.linkedin.avro.fastserde.FastDeserializer;
6+
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
7+
import org.apache.avro.Schema;
8+
import org.apache.avro.generic.IndexedRecord;
9+
import org.apache.avro.io.Decoder;
10+
import org.apache.avro.util.Utf8;
11+
12+
public class Wrapper_GenericDeserializer_1221519218_203960580
13+
implements FastDeserializer<IndexedRecord>
14+
{
15+
16+
private final Schema readerSchema;
17+
private final Schema metadata0;
18+
private final Schema metadataOptionSchema0;
19+
private final Schema productTypeUUID0;
20+
21+
public Wrapper_GenericDeserializer_1221519218_203960580(Schema readerSchema) {
22+
this.readerSchema = readerSchema;
23+
this.metadata0 = readerSchema.getField("metadata").schema();
24+
this.metadataOptionSchema0 = metadata0 .getTypes().get(1);
25+
this.productTypeUUID0 = metadataOptionSchema0 .getField("productTypeUUID").schema();
26+
}
27+
28+
public IndexedRecord deserialize(IndexedRecord reuse, Decoder decoder, DatumReaderCustomization customization)
29+
throws IOException
30+
{
31+
return deserializeWrapper0((reuse), (decoder), (customization));
32+
}
33+
34+
public IndexedRecord deserializeWrapper0(Object reuse, Decoder decoder, DatumReaderCustomization customization)
35+
throws IOException
36+
{
37+
IndexedRecord wrapper0;
38+
if ((((reuse)!= null)&&((reuse) instanceof IndexedRecord))&&(((IndexedRecord)(reuse)).getSchema() == readerSchema)) {
39+
wrapper0 = ((IndexedRecord)(reuse));
40+
} else {
41+
wrapper0 = new org.apache.avro.generic.GenericData.Record(readerSchema);
42+
}
43+
int unionIndex0 = (decoder.readIndex());
44+
if (unionIndex0 == 0) {
45+
decoder.readNull();
46+
wrapper0 .put(0, null);
47+
} else {
48+
if (unionIndex0 == 1) {
49+
wrapper0 .put(0, deserializemetadata0(wrapper0 .get(0), (decoder), (customization)));
50+
} else {
51+
throw new RuntimeException(("Illegal union index for 'metadata': "+ unionIndex0));
52+
}
53+
}
54+
return wrapper0;
55+
}
56+
57+
public IndexedRecord deserializemetadata0(Object reuse, Decoder decoder, DatumReaderCustomization customization)
58+
throws IOException
59+
{
60+
IndexedRecord metadata1;
61+
if ((((reuse)!= null)&&((reuse) instanceof IndexedRecord))&&(((IndexedRecord)(reuse)).getSchema() == metadataOptionSchema0)) {
62+
metadata1 = ((IndexedRecord)(reuse));
63+
} else {
64+
metadata1 = new org.apache.avro.generic.GenericData.Record(metadataOptionSchema0);
65+
}
66+
int unionIndex1 = (decoder.readIndex());
67+
if (unionIndex1 == 0) {
68+
decoder.readNull();
69+
metadata1 .put(0, null);
70+
} else {
71+
if (unionIndex1 == 1) {
72+
Utf8 charSequence0;
73+
Object oldString0 = metadata1 .get(0);
74+
if (oldString0 instanceof Utf8) {
75+
charSequence0 = (decoder).readString(((Utf8) oldString0));
76+
} else {
77+
charSequence0 = (decoder).readString(null);
78+
}
79+
metadata1 .put(0, charSequence0);
80+
} else {
81+
throw new RuntimeException(("Illegal union index for 'productTypeUUID': "+ unionIndex1));
82+
}
83+
}
84+
return metadata1;
85+
}
86+
87+
}

0 commit comments

Comments
 (0)