diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java index e18cb1f54b..a70422c8f6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java @@ -58,6 +58,7 @@ import com.google.cloud.bigquery.storage.v1.it.SimpleRowReader.AvroRowConsumer; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.protobuf.Timestamp; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -438,6 +439,7 @@ public void testReadAtSnapshot() throws InterruptedException, IOException { /* table = */ table, /* snapshotInMillis = */ firstJob.getStatistics().getEndTime(), /* filter = */ null, + /* selectedFields = */ null, /* consumer = */ new AvroRowConsumer() { @Override public void accept(GenericData.Record record) { @@ -451,6 +453,7 @@ public void accept(GenericData.Record record) { /* table = */ table, /* snapshotInMillis = */ secondJob.getStatistics().getEndTime(), /* filter = */ null, + /* selectedFields = */ null, /* consumer = */ new AvroRowConsumer() { @Override public void accept(GenericData.Record record) { @@ -579,7 +582,7 @@ public void testBasicSqlTypes() throws InterruptedException, IOException { /* datasetId = */ DATASET, /* tableId = */ tableName); - List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + List rows = ReadAllRows(table); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -678,7 +681,7 @@ public void testDateAndTimeSqlTypes() throws InterruptedException, IOException { /* datasetId = */ DATASET, /* tableId = */ tableName); - List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + List rows = ReadAllRows(table); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -778,7 +781,7 @@ public void testGeographySqlType() throws InterruptedException, IOException { /* datasetId = */ DATASET, /* tableId = */ tableName); - List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + List rows = ReadAllRows(table); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -811,14 +814,16 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio String.format( " CREATE TABLE %s.%s " + " (array_field ARRAY," - + " struct_field STRUCT NOT NULL)" + + " struct_field STRUCT NOT NULL," + + " struct_field_opt STRUCT)" + " OPTIONS( " + " description=\"a table with array and time column types\" " + " ) " + "AS " + " SELECT " + " [1, 2, 3]," - + " (10, 'abc')", + + " (10, 'abc')," + + " null", DATASET, tableName); RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); @@ -829,7 +834,11 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio /* datasetId = */ DATASET, /* tableId = */ tableName); - List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + List rows = ReadAllRows( + /* table = */ table, + /* filter = */ null, + /* selectedFields = */ ImmutableList.of("array_field", "struct_field", "struct_field_opt.int_field") + ); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); GenericData.Record record = rows.get(0); @@ -838,11 +847,11 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio String actualSchemaMessage = String.format( "Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); - String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); + String rowAssertMessage = String.format("Row not matching expectations: %s", record); assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); - assertEquals(actualSchemaMessage, 2, avroSchema.getFields().size()); + assertEquals(actualSchemaMessage, 3, avroSchema.getFields().size()); assertEquals( actualSchemaMessage, @@ -859,6 +868,7 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio // Validate the STRUCT field and its members. Schema structSchema = avroSchema.getField("struct_field").schema(); + assertEquals(actualSchemaMessage, 2, structSchema.getFields().size()); assertEquals(actualSchemaMessage, Schema.Type.RECORD, structSchema.getType()); GenericData.Record structRecord = (GenericData.Record) record.get("struct_field"); @@ -873,6 +883,16 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio Schema.Type.STRING, structSchema.getField("str_field").schema().getType()); assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field")); + + // Validate the projected STRUCT field and its members. + Schema unionSchema = avroSchema.getField("struct_field_opt").schema(); + Schema structOpSchema = unionSchema.getTypes().get(1); + assertEquals(actualSchemaMessage, 1, structOpSchema.getFields().size()); + assertEquals( + actualSchemaMessage, + Schema.Type.LONG, + structSchema.getField("int_field").schema().getType()); + assertNull(rowAssertMessage, record.get("struct_field_opt")); } @Test @@ -1106,7 +1126,7 @@ private long ReadStreamToOffset(ReadStream readStream, long rowOffset) { * @throws IOException */ private void ProcessRowsAtSnapshot( - String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer) + String table, Long snapshotInMillis, String filter, List selectedFields, AvroRowConsumer consumer) throws IOException { Preconditions.checkNotNull(table); Preconditions.checkNotNull(consumer); @@ -1130,11 +1150,14 @@ private void ProcessRowsAtSnapshot( TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build()); } + TableReadOptions.Builder tableReadOptionsBuilder = TableReadOptions.newBuilder(); if (filter != null && !filter.isEmpty()) { - createSessionRequestBuilder - .getReadSessionBuilder() - .setReadOptions(TableReadOptions.newBuilder().setRowRestriction(filter).build()); + tableReadOptionsBuilder.setRowRestriction(filter); } + if (selectedFields != null && !selectedFields.isEmpty()) { + tableReadOptionsBuilder.addAllSelectedFields(selectedFields); + } + createSessionRequestBuilder.getReadSessionBuilder().setReadOptions(tableReadOptionsBuilder.build()); ReadSession session = client.createReadSession(createSessionRequestBuilder.build()); assertEquals( @@ -1156,6 +1179,16 @@ private void ProcessRowsAtSnapshot( } } + /** + * Reads all the rows from the specified table and returns a list as generic Avro records. + * + * @param table + * @return + */ + List ReadAllRows(String table) throws IOException { + return ReadAllRows(table, /* filter = */ null); + } + /** * Reads all the rows from the specified table and returns a list as generic Avro records. * @@ -1164,11 +1197,24 @@ private void ProcessRowsAtSnapshot( * @return */ List ReadAllRows(String table, String filter) throws IOException { + return ReadAllRows(table, filter); + } + + /** + * Reads all the rows from the specified table and returns a list as generic Avro records. + * + * @param table + * @param filter Optional. If specified, it will be used to restrict returned data. + * @param selectedFields Optional. If specified, it will be used to project data. + * @return + */ + List ReadAllRows(String table, String filter, List selectedFields) throws IOException { final List rows = new ArrayList<>(); ProcessRowsAtSnapshot( /* table = */ table, /* snapshotInMillis = */ null, /* filter = */ filter, + /* selectedFields = */ selectedFields, new AvroRowConsumer() { @Override public void accept(GenericData.Record record) {