Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failing integration-test null selected fieled coalesced to zero #2466

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.google.cloud.bigquery.storage.v1.it.SimpleRowReader.AvroRowConsumer;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Timestamp;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -438,6 +439,7 @@ public void testReadAtSnapshot() throws InterruptedException, IOException {
/* table = */ table,
/* snapshotInMillis = */ firstJob.getStatistics().getEndTime(),
/* filter = */ null,
/* selectedFields = */ null,
/* consumer = */ new AvroRowConsumer() {
@Override
public void accept(GenericData.Record record) {
Expand All @@ -451,6 +453,7 @@ public void accept(GenericData.Record record) {
/* table = */ table,
/* snapshotInMillis = */ secondJob.getStatistics().getEndTime(),
/* filter = */ null,
/* selectedFields = */ null,
/* consumer = */ new AvroRowConsumer() {
@Override
public void accept(GenericData.Record record) {
Expand Down Expand Up @@ -579,7 +582,7 @@ public void testBasicSqlTypes() throws InterruptedException, IOException {
/* datasetId = */ DATASET,
/* tableId = */ tableName);

List<GenericData.Record> rows = ReadAllRows(/* table = */ table, /* filter = */ null);
List<GenericData.Record> rows = ReadAllRows(table);
assertEquals("Actual rows read: " + rows.toString(), 1, rows.size());

GenericData.Record record = rows.get(0);
Expand Down Expand Up @@ -678,7 +681,7 @@ public void testDateAndTimeSqlTypes() throws InterruptedException, IOException {
/* datasetId = */ DATASET,
/* tableId = */ tableName);

List<GenericData.Record> rows = ReadAllRows(/* table = */ table, /* filter = */ null);
List<GenericData.Record> rows = ReadAllRows(table);
assertEquals("Actual rows read: " + rows.toString(), 1, rows.size());

GenericData.Record record = rows.get(0);
Expand Down Expand Up @@ -778,7 +781,7 @@ public void testGeographySqlType() throws InterruptedException, IOException {
/* datasetId = */ DATASET,
/* tableId = */ tableName);

List<GenericData.Record> rows = ReadAllRows(/* table = */ table, /* filter = */ null);
List<GenericData.Record> rows = ReadAllRows(table);
assertEquals("Actual rows read: " + rows.toString(), 1, rows.size());

GenericData.Record record = rows.get(0);
Expand Down Expand Up @@ -811,14 +814,16 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
String.format(
" CREATE TABLE %s.%s "
+ " (array_field ARRAY<INT64>,"
+ " struct_field STRUCT<int_field INT64 NOT NULL, str_field STRING NOT NULL> NOT NULL)"
+ " struct_field STRUCT<int_field INT64 NOT NULL, str_field STRING NOT NULL> NOT NULL,"
+ " struct_field_opt STRUCT<int_field INT64 NOT NULL, str_field STRING NOT NULL>)"
+ " OPTIONS( "
+ " description=\"a table with array and time column types\" "
+ " ) "
+ "AS "
+ " SELECT "
+ " [1, 2, 3],"
+ " (10, 'abc')",
+ " (10, 'abc'),"
+ " null",
DATASET, tableName);

RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build());
Expand All @@ -829,7 +834,11 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
/* datasetId = */ DATASET,
/* tableId = */ tableName);

List<GenericData.Record> rows = ReadAllRows(/* table = */ table, /* filter = */ null);
List<GenericData.Record> rows = ReadAllRows(
/* table = */ table,
/* filter = */ null,
/* selectedFields = */ ImmutableList.of("array_field", "struct_field", "struct_field_opt.int_field")
);
assertEquals("Actual rows read: " + rows.toString(), 1, rows.size());

GenericData.Record record = rows.get(0);
Expand All @@ -838,11 +847,11 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
String actualSchemaMessage =
String.format(
"Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true));
String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString());
String rowAssertMessage = String.format("Row not matching expectations: %s", record);

assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType());
assertEquals(actualSchemaMessage, "__root__", avroSchema.getName());
assertEquals(actualSchemaMessage, 2, avroSchema.getFields().size());
assertEquals(actualSchemaMessage, 3, avroSchema.getFields().size());

assertEquals(
actualSchemaMessage,
Expand All @@ -859,6 +868,7 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio

// Validate the STRUCT field and its members.
Schema structSchema = avroSchema.getField("struct_field").schema();
assertEquals(actualSchemaMessage, 2, structSchema.getFields().size());
assertEquals(actualSchemaMessage, Schema.Type.RECORD, structSchema.getType());
GenericData.Record structRecord = (GenericData.Record) record.get("struct_field");

Expand All @@ -873,6 +883,16 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
Schema.Type.STRING,
structSchema.getField("str_field").schema().getType());
assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field"));

// Validate the projected STRUCT field and its members.
Schema unionSchema = avroSchema.getField("struct_field_opt").schema();
Schema structOpSchema = unionSchema.getTypes().get(1);
assertEquals(actualSchemaMessage, 1, structOpSchema.getFields().size());
assertEquals(
actualSchemaMessage,
Schema.Type.LONG,
structSchema.getField("int_field").schema().getType());
assertNull(rowAssertMessage, record.get("struct_field_opt"));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test fails here. whole record is null but returned value is {"int_field": 0}

}

@Test
Expand Down Expand Up @@ -1106,7 +1126,7 @@ private long ReadStreamToOffset(ReadStream readStream, long rowOffset) {
* @throws IOException
*/
private void ProcessRowsAtSnapshot(
String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer)
String table, Long snapshotInMillis, String filter, List<String> selectedFields, AvroRowConsumer consumer)
throws IOException {
Preconditions.checkNotNull(table);
Preconditions.checkNotNull(consumer);
Expand All @@ -1130,11 +1150,14 @@ private void ProcessRowsAtSnapshot(
TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
}

TableReadOptions.Builder tableReadOptionsBuilder = TableReadOptions.newBuilder();
if (filter != null && !filter.isEmpty()) {
createSessionRequestBuilder
.getReadSessionBuilder()
.setReadOptions(TableReadOptions.newBuilder().setRowRestriction(filter).build());
tableReadOptionsBuilder.setRowRestriction(filter);
}
if (selectedFields != null && !selectedFields.isEmpty()) {
tableReadOptionsBuilder.addAllSelectedFields(selectedFields);
}
createSessionRequestBuilder.getReadSessionBuilder().setReadOptions(tableReadOptionsBuilder.build());

ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
assertEquals(
Expand All @@ -1156,6 +1179,16 @@ private void ProcessRowsAtSnapshot(
}
}

/**
* Reads all the rows from the specified table and returns a list as generic Avro records.
*
* @param table
* @return
*/
List<GenericData.Record> ReadAllRows(String table) throws IOException {
return ReadAllRows(table, /* filter = */ null);
}

/**
* Reads all the rows from the specified table and returns a list as generic Avro records.
*
Expand All @@ -1164,11 +1197,24 @@ private void ProcessRowsAtSnapshot(
* @return
*/
List<GenericData.Record> ReadAllRows(String table, String filter) throws IOException {
return ReadAllRows(table, filter);
}

/**
* Reads all the rows from the specified table and returns a list as generic Avro records.
*
* @param table
* @param filter Optional. If specified, it will be used to restrict returned data.
* @param selectedFields Optional. If specified, it will be used to project data.
* @return
*/
List<GenericData.Record> ReadAllRows(String table, String filter, List<String> selectedFields) throws IOException {
final List<GenericData.Record> rows = new ArrayList<>();
ProcessRowsAtSnapshot(
/* table = */ table,
/* snapshotInMillis = */ null,
/* filter = */ filter,
/* selectedFields = */ selectedFields,
new AvroRowConsumer() {
@Override
public void accept(GenericData.Record record) {
Expand Down