Skip to content

Commit 32092a9

Browse files
author
hoonki
committed
Fixed a bug in Parquet column selection and added a 'column count' variable
1 parent 6f78021 commit 32092a9

File tree

4 files changed

+80
-66
lines changed

4 files changed

+80
-66
lines changed

common/data/src/main/java/com/samsung/sds/brightics/common/data/client/ParquetClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public static ObjectTable readParquet(String path, long min, long max, int[] fil
125125
filteredColumns);
126126
List<FileIndex> indexes = info.getLimitedFiles(min, max);
127127
Column[] schema = info.getSchema();
128+
int[] columnIndex = info.getColumnIndex();
128129

129130
List<Object[]> data = new ArrayList<>();
130131
for (FileIndex index : indexes) {
@@ -145,13 +146,13 @@ public static ObjectTable readParquet(String path, long min, long max, int[] fil
145146
}
146147
long numRowCount = endIndex - startIndex - leftSkip - rightSkip;
147148
reader = (BrighticsParquetReader<DefaultRecord>) BrighticsParquetUtils.getReader(new Path(filePath),
148-
filteredColumns);
149+
columnIndex);
149150
while (leftSkip > 0) {
150-
reader.read(filteredColumns);
151+
reader.read(columnIndex);
151152
leftSkip--;
152153
}
153154
DefaultRecord record;
154-
while ((record = reader.read(filteredColumns)) != null && numRowCount > 0) {
155+
while ((record = reader.read(columnIndex)) != null && numRowCount > 0) {
155156
data.add(record.getValues());
156157
numRowCount--;
157158
}
@@ -163,7 +164,7 @@ public static ObjectTable readParquet(String path, long min, long max, int[] fil
163164

164165
}
165166
//send the filtered schema and row data together with the total row count and byte size
166-
return new ObjectTable(info.getCount(), info.getBytes(), schema, data);
167+
return new ObjectTable(info.getCount(), schema.length, info.getBytes(), schema, data);
167168
}
168169

169170
public static ObjectTable readParquet(String path, long min, long max) throws IllegalArgumentException, IOException {
Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,46 @@
1-
/*
2-
Copyright 2019 Samsung SDS
3-
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
15-
*/
16-
1+
/*
2+
Copyright 2019 Samsung SDS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
1717
package com.samsung.sds.brightics.common.data.parquet.reader.info;
1818

1919
import java.util.List;
2020
import java.util.stream.Collectors;
21+
import java.util.stream.IntStream;
2122

2223
import com.samsung.sds.brightics.common.data.view.Column;
2324

2425
public class ParquetInformation {
25-
26+
2627
Column[] schema;
2728
long count;
2829
long bytes;
2930
List<FileIndex> page;
30-
31+
int[] columnIndex;
32+
3133
public ParquetInformation(Column[] schema, long count, long bytes, List<FileIndex> page) {
34+
this(schema, count, bytes, page, new int[0]);
35+
}
36+
37+
public ParquetInformation(Column[] schema, long count, long bytes, List<FileIndex> page, int[] columnIndex) {
3238
super();
3339
this.schema = schema;
3440
this.count = count;
3541
this.bytes = bytes;
3642
this.page = page;
43+
this.columnIndex = columnIndex;
3744
}
3845

3946
public List<FileIndex> getLimitedFiles(long lowerOffset, long upperOffset) {
@@ -46,6 +53,10 @@ public Column[] getSchema() {
4653
return schema;
4754
}
4855

56+
public int[] getColumnIndex() {
57+
return columnIndex;
58+
}
59+
4960
public long getCount() {
5061
return count;
5162
}
@@ -57,7 +68,5 @@ public long getBytes() {
5768
public List<FileIndex> getPage() {
5869
return page;
5970
}
60-
61-
62-
71+
6372
}

common/data/src/main/java/com/samsung/sds/brightics/common/data/parquet/reader/util/BrighticsParquetUtils.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,11 @@
1616

1717
package com.samsung.sds.brightics.common.data.parquet.reader.util;
1818

19-
import java.io.IOException;
20-
import java.util.ArrayList;
21-
import java.util.Arrays;
22-
import java.util.Iterator;
23-
import java.util.List;
24-
import java.util.stream.Collectors;
25-
import java.util.stream.IntStream;
26-
import java.util.stream.Stream;
27-
19+
import com.samsung.sds.brightics.common.data.parquet.reader.BrighticsParquetReadSupport;
20+
import com.samsung.sds.brightics.common.data.parquet.reader.DefaultRecord;
21+
import com.samsung.sds.brightics.common.data.parquet.reader.info.FileIndex;
22+
import com.samsung.sds.brightics.common.data.parquet.reader.info.ParquetInformation;
23+
import com.samsung.sds.brightics.common.data.view.Column;
2824
import org.apache.hadoop.conf.Configuration;
2925
import org.apache.hadoop.fs.FileStatus;
3026
import org.apache.hadoop.fs.FileSystem;
@@ -36,23 +32,23 @@
3632
import org.apache.parquet.hadoop.metadata.BlockMetaData;
3733
import org.apache.parquet.hadoop.metadata.FileMetaData;
3834
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
39-
import org.apache.parquet.schema.DecimalMetadata;
40-
import org.apache.parquet.schema.GroupType;
41-
import org.apache.parquet.schema.MessageType;
42-
import org.apache.parquet.schema.OriginalType;
43-
import org.apache.parquet.schema.Type;
35+
import org.apache.parquet.schema.*;
4436
import org.apache.parquet.schema.Type.Repetition;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
4539

46-
import com.samsung.sds.brightics.common.core.exception.BrighticsCoreException;
47-
import com.samsung.sds.brightics.common.data.parquet.reader.BrighticsParquetReadSupport;
48-
import com.samsung.sds.brightics.common.data.parquet.reader.DefaultRecord;
49-
import com.samsung.sds.brightics.common.data.parquet.reader.info.FileIndex;
50-
import com.samsung.sds.brightics.common.data.parquet.reader.info.ParquetInformation;
51-
import com.samsung.sds.brightics.common.data.view.Column;
40+
import java.io.IOException;
41+
import java.util.ArrayList;
42+
import java.util.Arrays;
43+
import java.util.Iterator;
44+
import java.util.List;
45+
import java.util.stream.Collectors;
46+
import java.util.stream.IntStream;
47+
import java.util.stream.Stream;
5248

5349
public class BrighticsParquetUtils {
5450

55-
// private static final Logger LOGGER = LoggerFactory.getLogger("ParquetClient");
51+
private static final Logger LOGGER = LoggerFactory.getLogger("ParquetClient");
5652

5753
public static ParquetInformation getParquetInformation(Path path, Configuration conf, int[] filteredColumnIndex) throws IOException {
5854
FileStatus directory = FileSystem.get(conf).getFileStatus(path);
@@ -68,7 +64,8 @@ public static ParquetInformation getParquetInformation(Path path, Configuration
6864

6965
// set schema
7066
FileMetaData fileMetaData = footers.get(0).getParquetMetadata().getFileMetaData();
71-
List<Type> filteredColumns = getFilteredColumns(fileMetaData.getSchema(), filteredColumnIndex);
67+
int[] validatedColumnIndexArray = Arrays.stream(filteredColumnIndex).filter(i -> fileMetaData.getSchema().getColumns().size() > i).toArray();
68+
List<Type> filteredColumns = getFilteredColumns(fileMetaData.getSchema(), validatedColumnIndexArray);
7269
Column[] schema = filteredColumns.stream().map(c -> new Column(c.getName(), convertTypeName(c))).toArray(Column[]::new);
7370

7471
//set count, buffer size
@@ -86,7 +83,7 @@ public static ParquetInformation getParquetInformation(Path path, Configuration
8683
buf.add(new FileIndex(footer.getFile().toString(), previousTotalCount, previousTotalCount + currentCount));
8784
previousTotalCount += currentCount;
8885
}
89-
return new ParquetInformation(schema, previousTotalCount, totalBytes, buf);
86+
return new ParquetInformation(schema, previousTotalCount, totalBytes, buf, validatedColumnIndexArray);
9087
}
9188

9289
public static ParquetInformation getParquetInformation(Path path, Configuration conf) throws IOException {
@@ -194,7 +191,8 @@ public static List<Type> getFilteredColumns(MessageType schema, int[] filteredCo
194191
List<Type> copyFields = new ArrayList<Type>();
195192
//if filteredColumns is null or empty, skip filtering
196193
if(schema.getColumns().size() < filteredColumns.length) {
197-
throw new BrighticsCoreException("3102", "The column size used in the query is larger than the number of existing data columns.");
194+
LOGGER.warn("The column size used in the query is larger than the number of existing data columns.");
195+
// throw new BrighticsCoreException("3102", "The column size used in the query is larger than the number of existing data columns.");
198196
}
199197

200198
if (filteredColumns != null && filteredColumns.length > 0) {
@@ -220,7 +218,7 @@ public static int[] combineFilteredColumnIndexArray(int start, int end, int[] se
220218
Stream<Integer> range = IntStream.range(start, end + 1).boxed();
221219
return Stream.concat(range, selected).distinct().sorted().mapToInt(i -> i).toArray();
222220
} else {
223-
return selected.mapToInt(i -> i).toArray();
221+
return selected.distinct().sorted().mapToInt(i -> i).toArray();
224222
}
225223
}
226224

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
/*
2-
Copyright 2019 Samsung SDS
3-
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
15-
*/
16-
1+
/*
2+
Copyright 2019 Samsung SDS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
1717
package com.samsung.sds.brightics.common.data.view;
1818

1919
import java.util.List;
@@ -32,4 +32,10 @@ public ObjectTable(long count, long bytes, Column[] schema, List<Object[]> data)
3232
this.data.put("data", data);
3333
}
3434

35+
public ObjectTable(long count, long columnCount, long bytes, Column[] schema, List<Object[]> data) {
36+
super(count, bytes, schema);
37+
this.data.put("columnCount", columnCount);
38+
this.data.put("data", data);
39+
}
40+
3541
}

0 commit comments

Comments
 (0)