-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Description
Summary
We need to switch some things over to the stream writer APIs, and it mostly looks great! The only issue is that it's awkward to model rows as protobufs in some places that would nonetheless benefit from streams.
Thankfully there are examples of using JSON in both the tests and documentation — but they make surprising decisions w.r.t. `TIMESTAMP` columns, which specifically go against similar decisions made by `bigquery.InferSchema`.
Converting a `bigquery.Schema` -> `protoreflect.MessageDescriptor` models `TIMESTAMP` with `descriptorpb.FieldDescriptorProto_TYPE_INT64`. I would expect this to be a `timestamppb.Timestamp` message.
See the team's recommended example in the integration tests:
google-cloud-go/bigquery/storage/managedwriter/integration_test.go
Lines 426 to 437 in 7a46b54
message := dynamicpb.NewMessage(md) | |
// First, json->proto message | |
err = protojson.Unmarshal(v, message) | |
if err != nil { | |
t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err) | |
} | |
// Then, proto message -> bytes. | |
b, err := proto.Marshal(message) | |
if err != nil { | |
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) | |
} |
Because of this decision, `protojson.Unmarshal(b, msg)` fails to parse RFC 3339 timestamps, as it expects an `INT64`. This would work if the field were a `timestamppb.Timestamp`.
Repro
package main
import (
"encoding/json"
"fmt"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// main drives the reproduction. It infers a BigQuery schema from a struct that
// has a time.Time field, prints the schema as JSON, adapts the schema into a
// dynamic protobuf message descriptor, and finally tries to load a
// JSON-encoded row into a message built from that descriptor. Every failure
// along the way is reported by panicking.
func main() {
	type Row struct {
		Inserted time.Time `bigquery:"inserted" json:"inserted"`
		Msg      string    `bigquery:"msg" json:"msg"`
	}

	schema, err := bigquery.InferSchema(Row{})
	if err != nil {
		panic(fmt.Errorf("inferring bq schema: %w", err))
	}

	// Print the inferred schema so the column types are visible in the output.
	encodedSchema, err := schema.ToJSONFields()
	if err != nil {
		panic(fmt.Errorf("encoding schema: %w", err))
	}
	fmt.Printf("schema: %s\n", string(encodedSchema))

	// The normalized DescriptorProto (second result) is not used here.
	md, _, err := dynamicTableDescriptor(schema)
	if err != nil {
		panic(err)
	}

	target := dynamicpb.NewMessage(md)
	row := Row{Inserted: time.Now(), Msg: "hello"}
	if _, encodeErr := dynamicProtobuf(row, target); encodeErr != nil {
		panic(fmt.Errorf("encoding row: %w", encodeErr))
	}
}
// dynamicTableDescriptor adapts a BigQuery schema into protobuf descriptors.
//
// The schema is first converted to a storage table schema, then to a proto2
// descriptor whose scope is "test". The result must be a message descriptor;
// it is returned together with the DescriptorProto that
// adapt.NormalizeDescriptor produces for it. Each failing step yields a
// wrapped error and nil descriptors.
func dynamicTableDescriptor(schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto, error) {
	storageSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
	if err != nil {
		return nil, nil, fmt.Errorf("converting schema to protobuf: %w", err)
	}

	adapted, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "test")
	if err != nil {
		return nil, nil, fmt.Errorf("adapting converted schema to descriptor: %w", err)
	}

	switch md := adapted.(type) {
	case protoreflect.MessageDescriptor:
		normalized, normErr := adapt.NormalizeDescriptor(md)
		if normErr != nil {
			return nil, nil, fmt.Errorf("normalizing descriptor: %w", normErr)
		}
		return md, normalized, nil
	default:
		// Anything other than a message descriptor cannot back a dynamic message.
		return nil, nil, fmt.Errorf("adapted descriptor is not a message descriptor: %T", adapted)
	}
}
// dynamicProtobuf encodes row as protobuf wire bytes by way of JSON.
//
// row is marshaled with encoding/json, the resulting JSON is loaded into msg
// with protojson, and msg is then marshaled to the binary protobuf format.
// msg is populated as a side effect. On any failure the returned slice is nil
// and the error wraps the underlying cause.
func dynamicProtobuf[T any](row T, msg *dynamicpb.Message) ([]byte, error) {
	scratch, err := json.Marshal(row)
	if err != nil {
		return nil, fmt.Errorf("failed json encoding: %w", err)
	}

	if err = protojson.Unmarshal(scratch, msg); err != nil {
		return nil, fmt.Errorf("failed protojson decoding: %w", err)
	}

	// Reuse the JSON buffer as the destination for the binary encoding.
	// Calling [proto.Size] to pre-size a buffer is expensive, and the binary
	// form is generally smaller than the JSON form, so the existing capacity
	// should usually suffice; if it does not, MarshalAppend allocates.
	wire, err := proto.MarshalOptions{}.MarshalAppend(scratch[:0], msg)
	if err != nil {
		return nil, fmt.Errorf("failed protobuf encoding: %w", err)
	}
	return wire, nil
}
Output:
schema: [
{
"mode": "REQUIRED",
"name": "inserted",
"type": "TIMESTAMP"
},
{
"mode": "REQUIRED",
"name": "msg",
"type": "STRING"
}
]
panic: encoding row: failed protojson decoding: proto: (line 1:13): invalid value for int64 field inserted: "2025-07-15T17:50:04.252229-04:00"
go.mod
module scratch
go 1.24.2
require (
cloud.google.com/go/bigquery v1.69.0
google.golang.org/protobuf v1.36.6
)
require (
cloud.google.com/go v0.121.0 // indirect
cloud.google.com/go/auth v0.16.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
cloud.google.com/go/iam v1.5.2 // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.23.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.30.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/api v0.232.0 // indirect
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
google.golang.org/grpc v1.72.0 // indirect
)