Skip to content

Commit 29ffa42

Browse files
committed
[receiver/googlecloudpubsub] Add support for encoding extensions (open-telemetry#37109)
Added support for encoding extensions. Setting the encoding field in the config now references the extension. If it didn't find the extension it will fall back to searching the internal encoders. To make the build in encoders consistent with the extensions they now have the same interface. The README is adapted accordingly.
1 parent 2956bb9 commit 29ffa42

11 files changed

+415
-200
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
change_type: enhancement
2+
3+
component: googlecloudpubsubreceiver
4+
5+
note: Added support for encoding extensions.
6+
7+
issues: [37109]
8+
9+
subtext:
10+
11+
change_logs: [user]

receiver/googlecloudpubsubreceiver/README.md

+39-15
Original file line numberDiff line numberDiff line change
@@ -41,29 +41,53 @@ receivers:
4141
4242
## Encoding
4343
44-
You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data
45-
by looking at the `ce-type` and `content-type` attributes of the message. Only when those attributes are not set
46-
must the `encoding` field in the configuration be set.
44+
The `encoding` options allows you to specify Encoding Extensions for decoding messages on the subscription. An
45+
extension need to be configured in the `extensions` section, and added to pipeline in the collectors configuration file.
4746

48-
| ce-type | ce-datacontenttype | encoding | description |
49-
|-----------------------------------|----------------------|-------------------|------------------------------------------------|
50-
| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message |
51-
| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message |
52-
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message |
53-
| - | - | otlp_proto_trace | Decode OTLP trace message |
54-
| - | - | otlp_proto_metric | Decode OTLP trace message |
55-
| - | - | otlp_proto_log | Decode OTLP trace message |
56-
| - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type |
57-
| - | - | raw_text | Wrap in an OTLP log message |
47+
The following example shows how to use the text encoding extension for ingesting arbitrary text message on a
48+
subscription, wrapping them in OTLP Log messages. Note that not all extensions support all signals.
49+
50+
```yaml
51+
extensions:
52+
text_encoding:
53+
encoding: utf8
54+
unmarshaling_separator: "\r?\n"
55+
56+
service:
57+
extensions: [text_encoding]
58+
pipelines:
59+
logs:
60+
receivers: [googlecloudpubsub]
61+
processors: []
62+
exporters: [debug]
63+
```
5864

59-
When the `encoding` configuration is set, the attributes on the message are ignored.
65+
The receiver also supports build in encodings for the native OTLP encodings, without the need to specify an Encoding
66+
Extensions. The non OTLP build in encodings will be deprecated as soon as extensions for the formats are available.
67+
68+
| encoding | description |
69+
|-------------------|------------------------------------------------|
70+
| otlp_proto_trace | Decode OTLP trace message |
71+
| otlp_proto_metric | Decode OTLP trace message |
72+
| otlp_proto_log | Decode OTLP trace message |
73+
| cloud_logging | Decode [Cloud Logging] [LogEntry] message type |
74+
| raw_text | Wrap in an OTLP log message |
6075

6176
With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. You'll
6277
first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. Note that the `cloud_logging`
6378
integration is considered **alpha** as the semantic convention on some of the conversion are not stabilized yet.
6479

6580
With `raw_text`, the receiver can be used for ingesting arbitrary text message on a Pubsub subscription, wrapping them
66-
in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub.
81+
in OTLP Log messages.
82+
83+
When no encoding is specified, the receiver will try to discover the type of the data by looking at the `ce-type` and
84+
`content-type` attributes of the message. These message attributes are set by the `googlepubsubexporter`.
85+
86+
| ce-type | ce-datacontenttype | encoding | description |
87+
|-----------------------------------|----------------------|-------------------|------------------------------------------------|
88+
| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message |
89+
| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message |
90+
| org.opentelemetry.otlp.logs.v1 | application/protobuf | | Decode OTLP log message |
6791

6892
[Cloud Logging]: https://cloud.google.com/logging
6993
[LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry

receiver/googlecloudpubsubreceiver/config.go

-45
Original file line numberDiff line numberDiff line change
@@ -35,51 +35,6 @@ type Config struct {
3535
ClientID string `mapstructure:"client_id"`
3636
}
3737

38-
func (config *Config) validateForLog() error {
39-
err := config.validate()
40-
if err != nil {
41-
return err
42-
}
43-
switch config.Encoding {
44-
case "":
45-
case "otlp_proto_log":
46-
case "raw_text":
47-
case "raw_json":
48-
case "cloud_logging":
49-
default:
50-
return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding)
51-
}
52-
return nil
53-
}
54-
55-
func (config *Config) validateForTrace() error {
56-
err := config.validate()
57-
if err != nil {
58-
return err
59-
}
60-
switch config.Encoding {
61-
case "":
62-
case "otlp_proto_trace":
63-
default:
64-
return fmt.Errorf("trace encoding %v is not supported. supported encoding formats include [otlp_proto_trace]", config.Encoding)
65-
}
66-
return nil
67-
}
68-
69-
func (config *Config) validateForMetric() error {
70-
err := config.validate()
71-
if err != nil {
72-
return err
73-
}
74-
switch config.Encoding {
75-
case "":
76-
case "otlp_proto_metric":
77-
default:
78-
return fmt.Errorf("metric encoding %v is not supported. supported encoding formats include [otlp_proto_metric]", config.Encoding)
79-
}
80-
return nil
81-
}
82-
8338
func (config *Config) validate() error {
8439
if !subscriptionMatcher.MatchString(config.Subscription) {
8540
return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", config.Subscription)

receiver/googlecloudpubsubreceiver/config_test.go

-60
Original file line numberDiff line numberDiff line change
@@ -63,70 +63,10 @@ func TestLoadConfig(t *testing.T) {
6363
func TestConfigValidation(t *testing.T) {
6464
factory := NewFactory()
6565
c := factory.CreateDefaultConfig().(*Config)
66-
assert.Error(t, c.validateForTrace())
67-
assert.Error(t, c.validateForLog())
68-
assert.Error(t, c.validateForMetric())
6966
c.Subscription = "projects/000project/subscriptions/my-subscription"
7067
assert.Error(t, c.validate())
7168
c.Subscription = "projects/my-project/topics/my-topic"
7269
assert.Error(t, c.validate())
7370
c.Subscription = "projects/my-project/subscriptions/my-subscription"
7471
assert.NoError(t, c.validate())
7572
}
76-
77-
func TestTraceConfigValidation(t *testing.T) {
78-
factory := NewFactory()
79-
c := factory.CreateDefaultConfig().(*Config)
80-
c.Subscription = "projects/my-project/subscriptions/my-subscription"
81-
assert.NoError(t, c.validateForTrace())
82-
83-
c.Encoding = "otlp_proto_metric"
84-
assert.Error(t, c.validateForTrace())
85-
c.Encoding = "otlp_proto_log"
86-
assert.Error(t, c.validateForTrace())
87-
c.Encoding = "raw_text"
88-
assert.Error(t, c.validateForTrace())
89-
c.Encoding = "raw_json"
90-
assert.Error(t, c.validateForTrace())
91-
92-
c.Encoding = "otlp_proto_trace"
93-
assert.NoError(t, c.validateForTrace())
94-
}
95-
96-
func TestMetricConfigValidation(t *testing.T) {
97-
factory := NewFactory()
98-
c := factory.CreateDefaultConfig().(*Config)
99-
c.Subscription = "projects/my-project/subscriptions/my-subscription"
100-
assert.NoError(t, c.validateForMetric())
101-
102-
c.Encoding = "otlp_proto_trace"
103-
assert.Error(t, c.validateForMetric())
104-
c.Encoding = "otlp_proto_log"
105-
assert.Error(t, c.validateForMetric())
106-
c.Encoding = "raw_text"
107-
assert.Error(t, c.validateForMetric())
108-
c.Encoding = "raw_json"
109-
assert.Error(t, c.validateForMetric())
110-
111-
c.Encoding = "otlp_proto_metric"
112-
assert.NoError(t, c.validateForMetric())
113-
}
114-
115-
func TestLogConfigValidation(t *testing.T) {
116-
factory := NewFactory()
117-
c := factory.CreateDefaultConfig().(*Config)
118-
c.Subscription = "projects/my-project/subscriptions/my-subscription"
119-
assert.NoError(t, c.validateForLog())
120-
121-
c.Encoding = "otlp_proto_trace"
122-
assert.Error(t, c.validateForLog())
123-
c.Encoding = "otlp_proto_metric"
124-
assert.Error(t, c.validateForLog())
125-
126-
c.Encoding = "raw_text"
127-
assert.NoError(t, c.validateForLog())
128-
c.Encoding = "raw_json"
129-
assert.NoError(t, c.validateForLog())
130-
c.Encoding = "otlp_proto_log"
131-
assert.NoError(t, c.validateForLog())
132-
}

receiver/googlecloudpubsubreceiver/factory.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (factory *pubsubReceiverFactory) CreateTraces(
7171
cfg component.Config,
7272
consumer consumer.Traces,
7373
) (receiver.Traces, error) {
74-
err := cfg.(*Config).validateForTrace()
74+
err := cfg.(*Config).validate()
7575
if err != nil {
7676
return nil, err
7777
}
@@ -89,7 +89,7 @@ func (factory *pubsubReceiverFactory) CreateMetrics(
8989
cfg component.Config,
9090
consumer consumer.Metrics,
9191
) (receiver.Metrics, error) {
92-
err := cfg.(*Config).validateForMetric()
92+
err := cfg.(*Config).validate()
9393
if err != nil {
9494
return nil, err
9595
}
@@ -107,7 +107,7 @@ func (factory *pubsubReceiverFactory) CreateLogs(
107107
cfg component.Config,
108108
consumer consumer.Logs,
109109
) (receiver.Logs, error) {
110-
err := cfg.(*Config).validateForLog()
110+
err := cfg.(*Config).validate()
111111
if err != nil {
112112
return nil, err
113113
}

receiver/googlecloudpubsubreceiver/go.mod

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/google/go-cmp v0.6.0
99
github.com/iancoleman/strcase v0.3.0
1010
github.com/json-iterator/go v1.1.12
11+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0
1112
github.com/stretchr/testify v1.10.0
1213
go.opentelemetry.io/collector/component v0.117.0
1314
go.opentelemetry.io/collector/component/componenttest v0.117.0
@@ -36,7 +37,7 @@ require (
3637
cloud.google.com/go/iam v1.2.2 // indirect
3738
cloud.google.com/go/longrunning v0.6.2 // indirect
3839
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
39-
github.com/davecgh/go-spew v1.1.1 // indirect
40+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4041
github.com/felixge/httpsnoop v1.0.4 // indirect
4142
github.com/go-logr/logr v1.4.2 // indirect
4243
github.com/go-logr/stdr v1.2.2 // indirect
@@ -55,7 +56,7 @@ require (
5556
github.com/mitchellh/reflectwalk v1.0.2 // indirect
5657
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
5758
github.com/modern-go/reflect2 v1.0.2 // indirect
58-
github.com/pmezard/go-difflib v1.0.0 // indirect
59+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5960
github.com/rogpeppe/go-internal v1.12.0 // indirect
6061
go.einride.tech/aip v0.68.0 // indirect
6162
go.opencensus.io v0.24.0 // indirect
@@ -92,3 +93,5 @@ retract (
9293
v0.76.1
9394
v0.65.0
9495
)
96+
97+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding

receiver/googlecloudpubsubreceiver/go.sum

+4-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/googlecloudpubsubreceiver/internal/log_entry.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co
55

66
import (
77
"bytes"
8-
"context"
98
"encoding/hex"
109
stdjson "encoding/json"
1110
"errors"
@@ -20,7 +19,6 @@ import (
2019
jsoniter "github.com/json-iterator/go"
2120
"go.opentelemetry.io/collector/pdata/pcommon"
2221
"go.opentelemetry.io/collector/pdata/plog"
23-
"go.uber.org/zap"
2422
"google.golang.org/genproto/googleapis/api/monitoredres"
2523
"google.golang.org/protobuf/encoding/protojson"
2624
"google.golang.org/protobuf/reflect/protoreflect"
@@ -113,7 +111,7 @@ func getLogEntryDescriptor() protoreflect.MessageDescriptor {
113111
// schema; this ensures that a numeric value in the input is correctly
114112
// translated to either an integer or a double in the output. It falls back to
115113
// plain JSON decoding if payload type is not available in the proto registry.
116-
func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) {
114+
func TranslateLogEntry(data []byte) (pcommon.Resource, plog.LogRecord, error) {
117115
lr := plog.NewLogRecord()
118116
res := pcommon.NewResource()
119117

receiver/googlecloudpubsubreceiver/internal/log_entry_test.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package internal
55

66
import (
7-
"context"
87
"fmt"
98
"testing"
109
"time"
@@ -15,7 +14,6 @@ import (
1514
"go.opentelemetry.io/collector/pdata/pcommon"
1615
"go.opentelemetry.io/collector/pdata/plog"
1716
"go.uber.org/multierr"
18-
"go.uber.org/zap"
1917
)
2018

2119
type Log struct {
@@ -70,15 +68,12 @@ func TestTranslateLogEntry(t *testing.T) {
7068
}{
7169
// TODO: Add publicly shareable log test data.
7270
}
73-
74-
logger, _ := zap.NewDevelopment()
75-
7671
for _, tt := range tests {
7772
var errs error
7873
wantRes, wantLr, err := generateLog(t, tt.want)
7974
errs = multierr.Append(errs, err)
8075

81-
gotRes, gotLr, err := TranslateLogEntry(context.TODO(), logger, []byte(tt.input))
76+
gotRes, gotLr, err := TranslateLogEntry([]byte(tt.input))
8277
errs = multierr.Append(errs, err)
8378
errs = multierr.Combine(errs, compareResources(wantRes, gotRes), compareLogRecords(wantLr, gotLr))
8479

0 commit comments

Comments
 (0)