
Commit 46068dc

Set Avro schema configuration in format bundle
Rather than managing the Avro reader schema configuration in the input format's getSplits method, it needs to be managed when the format bundle is created. Otherwise a Crunch pipeline that has multiple inputs (Kite views) with different schemas will not see the correct reader schema for each input.

Note that the test only demonstrates the problem when Crunch is also upgraded to 0.13.0 (which is not part of this commit). This is because of CRUNCH-551, a fix for a Crunch problem that, in versions before 0.13.0, hides the current issue (at least in the scenario covered by the test).

A test was also added to verify the behaviour with plain MapReduce, to ensure that this continues to work as expected.
1 parent 1070ff9 commit 46068dc
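
To make the failure mode concrete, here is a minimal sketch (not part of the commit; the class name ReaderSchemaIsolationSketch is invented for illustration) contrasting one shared job Configuration, which is roughly what setting the schema in getSplits amounts to, with one Configuration per input that is captured into each source's format bundle, which is what this change does:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.kitesdk.data.Formats;
import org.kitesdk.data.spi.filesystem.AvroConfigurationUtil;

public class ReaderSchemaIsolationSketch {
  public static void main(String[] args) {
    Schema userNameOnly = SchemaBuilder.record("userNameOnlyRecord")
        .fields().requiredString("username").endRecord();
    Schema emailOnly = SchemaBuilder.record("emailOnlyRecord")
        .fields().requiredString("email").endRecord();

    // One shared Configuration: both inputs write their reader schema under the
    // same key, so the second set() simply overwrites the first.
    Configuration shared = new Configuration(false);
    shared.set("avro.schema.input.key", userNameOnly.toString());
    shared.set("avro.schema.input.key", emailOnly.toString());

    // One Configuration per input, as captured into each source's format bundle:
    // every input keeps its own reader schema.
    Configuration confA = new Configuration(false);
    AvroConfigurationUtil.configure(confA, Formats.AVRO, userNameOnly, GenericData.Record.class);
    Configuration confB = new Configuration(false);
    AvroConfigurationUtil.configure(confB, Formats.AVRO, emailOnly, GenericData.Record.class);

    System.out.println("shared reader schema:   " + shared.get("avro.schema.input.key"));
    System.out.println("bundle A reader schema: " + confA.get("avro.schema.input.key"));
    System.out.println("bundle B reader schema: " + confB.get("avro.schema.input.key"));
  }
}

With the shared Configuration the last write wins and both inputs would be read with the emailOnly schema; with one Configuration per format bundle, each input is read with its own schema.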

5 files changed: +222 additions, -86 deletions
kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/AvroConfigurationUtil.java (new file)

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2014 Cloudera Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kitesdk.data.spi.filesystem;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.hadoop.io.AvroSerialization;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.kitesdk.compat.DynMethods;
+import org.kitesdk.data.Format;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.spi.DataModelUtil;
+
+public class AvroConfigurationUtil {
+
+  // Constant from AvroJob copied here so we can set it on the Configuration
+  // given to this class.
+  private static final String AVRO_SCHEMA_INPUT_KEY = "avro.schema.input.key";
+
+  // this is required for 1.7.4 because setDataModelClass is not available
+  private static final DynMethods.StaticMethod setModel =
+      new DynMethods.Builder("setDataModelClass")
+          .impl(AvroSerialization.class, Configuration.class, Class.class)
+          .defaultNoop()
+          .buildStatic();
+
+  public static void configure(Configuration conf, Format format, Schema schema, Class<?> type) {
+    GenericData model = DataModelUtil.getDataModelForType(type);
+    if (Formats.AVRO.equals(format)) {
+      setModel.invoke(conf, model.getClass());
+      conf.set(AVRO_SCHEMA_INPUT_KEY, schema.toString());
+
+    } else if (Formats.PARQUET.equals(format)) {
+      // TODO: update to a version of Parquet with setAvroDataSupplier
+      //AvroReadSupport.setAvroDataSupplier(conf,
+      //    DataModelUtil.supplierClassFor(model));
+      AvroReadSupport.setAvroReadSchema(conf, schema);
+    }
+  }
+
+}

kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/FileSystemViewKeyInputFormat.java

Lines changed: 9 additions & 47 deletions
@@ -17,67 +17,45 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.hadoop.io.AvroSerialization;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.kitesdk.compat.DynMethods;
+import org.apache.parquet.avro.AvroParquetInputFormat;
 import org.kitesdk.compat.Hadoop;
 import org.kitesdk.data.Format;
 import org.kitesdk.data.Formats;
 import org.kitesdk.data.spi.AbstractKeyRecordReaderWrapper;
 import org.kitesdk.data.spi.AbstractRefinableView;
-import org.kitesdk.data.spi.DataModelUtil;
 import org.kitesdk.data.spi.FilteredRecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.parquet.avro.AvroParquetInputFormat;
-import org.apache.parquet.avro.AvroReadSupport;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
 class FileSystemViewKeyInputFormat<E> extends InputFormat<E, Void> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(FileSystemViewKeyInputFormat.class);
 
-  // Constant from AvroJob copied here so we can set it on the Configuration
-  // given to this class.
-  private static final String AVRO_SCHEMA_INPUT_KEY = "avro.schema.input.key";
-
-  // this is required for 1.7.4 because setDataModelClass is not available
-  private static final DynMethods.StaticMethod setModel =
-      new DynMethods.Builder("setDataModelClass")
-          .impl(AvroSerialization.class, Configuration.class, Class.class)
-          .defaultNoop()
-          .buildStatic();
-
   private FileSystemDataset<E> dataset;
   private FileSystemView<E> view;
 
   public FileSystemViewKeyInputFormat(FileSystemDataset<E> dataset,
-      Configuration conf) {
+                                      Configuration conf) {
     this.dataset = dataset;
     this.view = null;
     LOG.debug("Dataset: {}", dataset);
 
     Format format = dataset.getDescriptor().getFormat();
 
-    setConfigProperties(conf, format, dataset.getSchema(), dataset.getType());
+    AvroConfigurationUtil.configure(conf, format, dataset.getSchema(), dataset.getType());
   }
 
   public FileSystemViewKeyInputFormat(FileSystemView<E> view, Configuration conf) {
@@ -87,22 +65,7 @@ public FileSystemViewKeyInputFormat(FileSystemView<E> view, Configuration conf)
 
     Format format = dataset.getDescriptor().getFormat();
 
-    setConfigProperties(conf, format, view.getSchema(), view.getType());
-  }
-
-  private static void setConfigProperties(Configuration conf, Format format,
-      Schema schema, Class<?> type) {
-    GenericData model = DataModelUtil.getDataModelForType(type);
-    if (Formats.AVRO.equals(format)) {
-      setModel.invoke(conf, model.getClass());
-      conf.set(AVRO_SCHEMA_INPUT_KEY, schema.toString());
-
-    } else if (Formats.PARQUET.equals(format)) {
-      // TODO: update to a version of Parquet with setAvroDataSupplier
-      //AvroReadSupport.setAvroDataSupplier(conf,
-      //    DataModelUtil.supplierClassFor(model));
-      AvroReadSupport.setAvroReadSchema(conf, schema);
-    }
+    AvroConfigurationUtil.configure(conf, format, view.getSchema(), view.getType());
   }
 
   @Override
@@ -114,7 +77,6 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
 
     if (setInputPaths(jobContext, job)) {
       if (Formats.AVRO.equals(format)) {
-        AvroJob.setInputKeySchema(job, dataset.getDescriptor().getSchema());
        AvroCombineInputFormat<E> delegate = new AvroCombineInputFormat<E>();
        return delegate.getSplits(jobContext);
      } else if (Formats.PARQUET.equals(format)) {

kite-data/kite-data-crunch/src/main/java/org/kitesdk/data/crunch/DatasetSourceTarget.java

Lines changed: 10 additions & 6 deletions
@@ -15,10 +15,6 @@
  */
 package org.kitesdk.data.crunch;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
 import com.google.common.collect.ImmutableSet;
 import org.apache.avro.generic.GenericData;
 import org.apache.crunch.ReadableData;
@@ -41,9 +37,15 @@
 import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
 import org.kitesdk.data.spi.LastModifiedAccessor;
 import org.kitesdk.data.spi.SizeAccessor;
+import org.kitesdk.data.spi.filesystem.AvroConfigurationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
 class DatasetSourceTarget<E> extends DatasetTarget<E> implements ReadableSourceTarget<E> {
 
   private static final Logger LOG = LoggerFactory
@@ -72,8 +74,10 @@ public DatasetSourceTarget(View<E> view, AvroType<E> avroType) {
     this.view = view;
     this.avroType = avroType;
 
-    Configuration temp = new Configuration(false /* use an empty conf */ );
+    Configuration temp = new Configuration(false /* use an empty conf */);
     DatasetKeyInputFormat.configure(temp).readFrom(view);
+    AvroConfigurationUtil.configure(temp, view.getDataset().getDescriptor().getFormat(),
+        view.getSchema(), view.getDataset().getType());
     this.formatBundle = inputBundle(temp);
   }
 
@@ -85,7 +89,7 @@ public DatasetSourceTarget(URI uri, AvroType<E> avroType) {
   private static <E> AvroType<E> toAvroType(View<E> view, Class<E> type) {
     if (type.isAssignableFrom(GenericData.Record.class)) {
       return (AvroType<E>) Avros.generics(
-          view.getDataset().getDescriptor().getSchema());
+              view.getDataset().getDescriptor().getSchema());
     } else {
       return Avros.records(type);
     }
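
For reference, the per-view configuration built in the constructor above, written out as a small self-contained helper (a sketch only; PerViewConfSketch and confFor are invented names, the calls themselves are the ones used in the diff). Each view gets its own empty Configuration, DatasetKeyInputFormat and AvroConfigurationUtil write into it, and that Configuration is what becomes the source's format bundle, so two sources with different reader schemas no longer share a key space:

import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.kitesdk.data.View;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import org.kitesdk.data.spi.filesystem.AvroConfigurationUtil;

class PerViewConfSketch {
  // Builds the per-view Configuration that DatasetSourceTarget turns into a FormatBundle.
  static Configuration confFor(View<GenericData.Record> view) {
    Configuration temp = new Configuration(false /* use an empty conf */);
    DatasetKeyInputFormat.configure(temp).readFrom(view);
    AvroConfigurationUtil.configure(temp, view.getDataset().getDescriptor().getFormat(),
        view.getSchema(), view.getDataset().getType());
    return temp; // one isolated conf per input; no input can clobber another's reader schema
  }
}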

kite-data/kite-data-crunch/src/test/java/org/kitesdk/data/crunch/TestCrunchDatasets.java

Lines changed: 99 additions & 33 deletions
@@ -16,54 +16,32 @@
 package org.kitesdk.data.crunch;
 
 import com.google.common.io.Files;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
+import org.apache.crunch.*;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.kitesdk.compat.Hadoop;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.MiniDFSTest;
-import org.kitesdk.data.Signalable;
-import org.kitesdk.data.spi.PartitionKey;
-import org.kitesdk.data.PartitionStrategy;
+import org.kitesdk.data.*;
 import org.kitesdk.data.spi.DatasetRepository;
-import org.kitesdk.data.spi.PartitionedDataset;
-import org.kitesdk.data.View;
 import org.kitesdk.data.spi.LastModifiedAccessor;
-import org.kitesdk.data.URIBuilder;
+import org.kitesdk.data.spi.PartitionKey;
+import org.kitesdk.data.spi.PartitionedDataset;
 import org.kitesdk.data.user.NewUserRecord;
 
-import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.USER_SCHEMA;
-import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.checkTestUsers;
-import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.datasetSize;
-import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.writeTestUsers;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.*;
 
 @RunWith(Parameterized.class)
 public abstract class TestCrunchDatasets extends MiniDFSTest {
@@ -654,4 +632,92 @@ public void testMultipleFileReadingFromCrunch() throws IOException {
 
     checkTestUsers(outputDataset, 10);
   }
+
+  @Test
+  public void testMultipleFileReadingFromCrunchWithDifferentReaderWriterSchemas() {
+    Schema userNameOnlySchema = SchemaBuilder.record("userNameOnlyRecord")
+        .fields()
+        .requiredString("username")
+        .endRecord();
+
+    Schema emailOnlySchema = SchemaBuilder.record("emailOnlyRecord")
+        .fields()
+        .requiredString("email")
+        .endRecord();
+
+    // write two files, each of 5 records, using the original schema (username and email)
+    Dataset<GenericData.Record> writeDatasetA = repo.create("ns", "inA", new DatasetDescriptor.Builder()
+        .schema(USER_SCHEMA).build());
+    Dataset<GenericData.Record> writeDatasetB = repo.create("ns", "inB", new DatasetDescriptor.Builder()
+        .schema(USER_SCHEMA).build());
+    writeTestUsers(writeDatasetA, 5, 0);
+    writeTestUsers(writeDatasetB, 5, 5);
+
+    // update the schema of the datasets (using a schema with only the username or email field)
+    repo.update("ns", "inA", new DatasetDescriptor.Builder(repo.load("ns", "inA").getDescriptor())
+        .schema(userNameOnlySchema).build());
+    repo.update("ns", "inB", new DatasetDescriptor.Builder(repo.load("ns", "inB").getDescriptor())
+        .schema(emailOnlySchema).build());
+
+    // run a crunch pipeline to read/write the records using the reduced schemas
+    Dataset<GenericData.Record> inputA = repo.load("ns", "inA");
+    Dataset<GenericData.Record> inputB = repo.load("ns", "inB");
+
+    Dataset<GenericData.Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
+        .schema(userNameOnlySchema).build());
+
+    Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
+    PCollection<GenericData.Record> dataA = pipeline.read(CrunchDatasets.asSource(inputA))
+        .filter("remove records that don't have the correct schema",
+            new FilterRecordsWithExpectedSchemaFn(userNameOnlySchema.toString()));
+    PCollection<GenericData.Record> dataB = pipeline.read(CrunchDatasets.asSource(inputB))
+        .filter("remove records that don't have the correct schema",
+            new FilterRecordsWithExpectedSchemaFn(emailOnlySchema.toString()));
+    pipeline.write(dataA.union(dataB), CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
+    pipeline.run();
+
+    // If the records did not have the correct schema, they would have been filtered,
+    // so this checks that they all had the expected schema indeed.
+    checkReaderIteration(outputDataset.newReader(), 10, new NopRecordValidator());
+
+    // Repeat the same test with only a single input, to ensure that the simple case also works
+    Dataset<GenericData.Record> singleInputOutputDataset = repo.create("ns", "out2", new DatasetDescriptor.Builder()
+        .schema(userNameOnlySchema).build());
+
+    Pipeline singleInputPipeline = new MRPipeline(TestCrunchDatasets.class);
+    PCollection<GenericData.Record> singleInputFiltered = singleInputPipeline.read(CrunchDatasets.asSource(inputA))
+        .filter("remove records that don't have the correct schema",
+            new FilterRecordsWithExpectedSchemaFn(userNameOnlySchema.toString()));
+    singleInputPipeline.write(singleInputFiltered, CrunchDatasets.asTarget(singleInputOutputDataset), Target.WriteMode.APPEND);
+    singleInputPipeline.run();
+
+    checkReaderIteration(singleInputOutputDataset.newReader(), 5, new NopRecordValidator());
+  }
+
+  private static final class FilterRecordsWithExpectedSchemaFn extends FilterFn<Record> {
+
+    private final String expectedSchemaString;
+    private transient Schema expectedSchema;
+
+    private FilterRecordsWithExpectedSchemaFn(String expectedSchemaString) {
+      this.expectedSchemaString = expectedSchemaString;
+    }
+
+    @Override
+    public void initialize() {
+      this.expectedSchema = new Schema.Parser().parse(expectedSchemaString);
+    }
+
+    @Override
+    public boolean accept(GenericData.Record record) {
+      return expectedSchema.equals(record.getSchema());
+    }
+  }
+
+  private static class NopRecordValidator implements RecordValidator<Record> {
+    @Override
+    public void validate(Record record, int recordNum) {
+      // nop
+    }
+  }
 }
