Skip to content

Commit e0a88c6

Browse files
author
Luiz Pegoraro
committed
fix(agent): add more scrape info on opentelemetry to fix reset action on opentelemetry.
1 parent ea2b9ff commit e0a88c6

File tree

2 files changed

+211
-40
lines changed

2 files changed

+211
-40
lines changed

agent/backend/otel/otel.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/orb-community/orb/agent/otel/otlpmqttexporter"
1313
"github.com/orb-community/orb/agent/policies"
1414
"go.opentelemetry.io/collector/exporter"
15+
"go.opentelemetry.io/collector/receiver"
1516
"go.uber.org/zap"
1617
"os"
1718
"strconv"
@@ -51,6 +52,13 @@ type openTelemetryBackend struct {
5152
otelReceiverHost string
5253
otelReceiverPort int
5354
otelExecutablePath string
55+
56+
metricsReceiver receiver.Metrics
57+
metricsExporter exporter.Metrics
58+
tracesReceiver receiver.Traces
59+
tracesExporter exporter.Traces
60+
logsReceiver receiver.Logs
61+
logsExporter exporter.Logs
5462
}
5563

5664
// Configure initializes the backend with the given configuration
@@ -211,7 +219,7 @@ func (o *openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string
211219
return backend.Waiting, "opentelemetry backend is waiting for policy to come to start running", nil
212220
}
213221

214-
func (o *openTelemetryBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) {
222+
func (o *openTelemetryBackend) createOtlpMetricMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) {
215223
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags)
216224
if o.mqttClient != nil {
217225
cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpMetricsTopic, "", bridgeService)
@@ -235,3 +243,54 @@ func (o *openTelemetryBackend) createOtlpMqttExporter(ctx context.Context, cance
235243
}
236244

237245
}
246+
247+
// TODO Add Traces on otlpmqttexporter which today only support metrics and logs
248+
func (o *openTelemetryBackend) createOtlpTraceMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) {
249+
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags)
250+
if o.mqttClient != nil {
251+
cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpTracesTopic, "", bridgeService)
252+
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
253+
// Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced.
254+
metricsExporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
255+
if err != nil {
256+
return nil, err
257+
}
258+
return metricsExporter, nil
259+
} else {
260+
cfg := otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key,
261+
o.mqttConfig.ChannelID, "", o.otlpTracesTopic, bridgeService)
262+
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
263+
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
264+
metricsExporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
265+
if err != nil {
266+
return nil, err
267+
}
268+
return metricsExporter, nil
269+
}
270+
271+
}
272+
273+
func (o *openTelemetryBackend) createOtlpLogsMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) {
274+
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags)
275+
if o.mqttClient != nil {
276+
cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpLogsTopic, "", bridgeService)
277+
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
278+
// Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced.
279+
exporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
280+
if err != nil {
281+
return nil, err
282+
}
283+
return exporter, nil
284+
} else {
285+
cfg := otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key,
286+
o.mqttConfig.ChannelID, "", o.otlpLogsTopic, bridgeService)
287+
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
288+
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
289+
exporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
290+
if err != nil {
291+
return nil, err
292+
}
293+
return exporter, nil
294+
}
295+
296+
}

agent/backend/otel/scrape.go

Lines changed: 151 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,47 +22,14 @@ func (o *openTelemetryBackend) receiveOtlp() {
2222
max := 20
2323
for {
2424
if o.mqttClient != nil {
25-
exporter, err := o.createOtlpMqttExporter(exeCtx, execCancelF)
26-
if err != nil {
27-
o.logger.Error("failed to create a exporter", zap.Error(err))
25+
if o.startOtelMetric(exeCtx, execCancelF) {
2826
return
2927
}
30-
pFactory := otlpreceiver.NewFactory()
31-
cfg := pFactory.CreateDefaultConfig()
32-
cfg.(*otlpreceiver.Config).Protocols = otlpreceiver.Protocols{
33-
GRPC: &configgrpc.GRPCServerSettings{
34-
NetAddr: confignet.NetAddr{
35-
Endpoint: o.otelReceiverHost + ":" + strconv.Itoa(o.otelReceiverPort),
36-
Transport: "tcp",
37-
},
38-
},
39-
}
40-
set := receiver.CreateSettings{
41-
TelemetrySettings: component.TelemetrySettings{
42-
Logger: o.logger,
43-
TracerProvider: trace.NewNoopTracerProvider(),
44-
MeterProvider: metric.NewMeterProvider(),
45-
ReportComponentStatus: func(*component.StatusEvent) error {
46-
return nil
47-
},
48-
},
49-
BuildInfo: component.NewDefaultBuildInfo(),
50-
}
51-
receiver, err := pFactory.CreateMetricsReceiver(exeCtx, set, cfg, exporter)
52-
if err != nil {
53-
o.logger.Error("failed to create a receiver", zap.Error(err))
54-
return
55-
}
56-
err = exporter.Start(exeCtx, nil)
57-
if err != nil {
58-
o.logger.Error("otel mqtt exporter startup error", zap.Error(err))
59-
return
60-
}
61-
o.logger.Info("Started receiver for OTLP in orb-agent",
62-
zap.String("host", o.otelReceiverHost), zap.Int("port", o.otelReceiverPort))
63-
err = receiver.Start(exeCtx, nil)
64-
if err != nil {
65-
o.logger.Error("otel receiver startup error", zap.Error(err))
28+
// TODO add this when add otlpmqttexporter to implement createTraceExporter
29+
//if o.startOtelTraces(exeCtx, execCancelF) {
30+
// return
31+
//}
32+
if o.startOtelLogs(exeCtx, execCancelF) {
6633
return
6734
}
6835
break
@@ -90,3 +57,148 @@ func (o *openTelemetryBackend) receiveOtlp() {
9057
}
9158
}()
9259
}
60+
61+
func (o *openTelemetryBackend) startOtelMetric(exeCtx context.Context, execCancelF context.CancelFunc) bool {
62+
var err error
63+
o.metricsExporter, err = o.createOtlpMetricMqttExporter(exeCtx, execCancelF)
64+
if err != nil {
65+
o.logger.Error("failed to create a exporter", zap.Error(err))
66+
return true
67+
}
68+
pFactory := otlpreceiver.NewFactory()
69+
cfg := pFactory.CreateDefaultConfig()
70+
cfg.(*otlpreceiver.Config).Protocols = otlpreceiver.Protocols{
71+
GRPC: &configgrpc.GRPCServerSettings{
72+
NetAddr: confignet.NetAddr{
73+
Endpoint: o.otelReceiverHost + ":" + strconv.Itoa(o.otelReceiverPort),
74+
Transport: "tcp",
75+
},
76+
},
77+
}
78+
set := receiver.CreateSettings{
79+
TelemetrySettings: component.TelemetrySettings{
80+
Logger: o.logger,
81+
TracerProvider: trace.NewNoopTracerProvider(),
82+
MeterProvider: metric.NewMeterProvider(),
83+
ReportComponentStatus: func(*component.StatusEvent) error {
84+
return nil
85+
},
86+
},
87+
BuildInfo: component.NewDefaultBuildInfo(),
88+
}
89+
o.metricsReceiver, err = pFactory.CreateMetricsReceiver(exeCtx, set, cfg, o.metricsExporter)
90+
if err != nil {
91+
o.logger.Error("failed to create a receiver", zap.Error(err))
92+
return true
93+
}
94+
err = o.metricsExporter.Start(exeCtx, nil)
95+
if err != nil {
96+
o.logger.Error("otel mqtt exporter startup error", zap.Error(err))
97+
return true
98+
}
99+
o.logger.Info("Started receiver for OTLP in orb-agent",
100+
zap.String("host", o.otelReceiverHost), zap.Int("port", o.otelReceiverPort))
101+
err = o.metricsReceiver.Start(exeCtx, nil)
102+
if err != nil {
103+
o.logger.Error("otel receiver startup error", zap.Error(err))
104+
return true
105+
}
106+
return false
107+
}
108+
109+
// TODO fix when create otlpmqtt trace
110+
//func (o *openTelemetryBackend) startOtelTraces(exeCtx context.Context, execCancelF context.CancelFunc) bool {
111+
// var err error
112+
// o.tracesExporter, err = o.createOtlpTraceMqttExporter(exeCtx, execCancelF)
113+
// if err != nil {
114+
// o.logger.Error("failed to create a exporter", zap.Error(err))
115+
// return true
116+
// }
117+
// pFactory := otlpreceiver.NewFactory()
118+
// cfg := pFactory.CreateDefaultConfig()
119+
// cfg.(*otlpreceiver.Config).Protocols = otlpreceiver.Protocols{
120+
// GRPC: &configgrpc.GRPCServerSettings{
121+
// NetAddr: confignet.NetAddr{
122+
// Endpoint: o.otelReceiverHost + ":" + strconv.Itoa(o.otelReceiverPort),
123+
// Transport: "tcp",
124+
// },
125+
// },
126+
// }
127+
// set := receiver.CreateSettings{
128+
// TelemetrySettings: component.TelemetrySettings{
129+
// Logger: o.logger,
130+
// TracerProvider: trace.NewNoopTracerProvider(),
131+
// MeterProvider: metric.NewMeterProvider(),
132+
// ReportComponentStatus: func(*component.StatusEvent) error {
133+
// return nil
134+
// },
135+
// },
136+
// BuildInfo: component.NewDefaultBuildInfo(),
137+
// }
138+
// o.tracesReceiver, err = pFactory.CreateTracesReceiver(exeCtx, set, cfg, o.tracesExporter)
139+
// if err != nil {
140+
// o.logger.Error("failed to create a receiver", zap.Error(err))
141+
// return true
142+
// }
143+
// err = o.metricsExporter.Start(exeCtx, nil)
144+
// if err != nil {
145+
// o.logger.Error("otel mqtt exporter startup error", zap.Error(err))
146+
// return true
147+
// }
148+
// o.logger.Info("Started receiver for OTLP in orb-agent",
149+
// zap.String("host", o.otelReceiverHost), zap.Int("port", o.otelReceiverPort))
150+
// err = o.metricsReceiver.Start(exeCtx, nil)
151+
// if err != nil {
152+
// o.logger.Error("otel receiver startup error", zap.Error(err))
153+
// return true
154+
// }
155+
// return false
156+
//}
157+
158+
func (o *openTelemetryBackend) startOtelLogs(exeCtx context.Context, execCancelF context.CancelFunc) bool {
159+
var err error
160+
o.logsExporter, err = o.createOtlpLogsMqttExporter(exeCtx, execCancelF)
161+
if err != nil {
162+
o.logger.Error("failed to create a exporter", zap.Error(err))
163+
return true
164+
}
165+
pFactory := otlpreceiver.NewFactory()
166+
cfg := pFactory.CreateDefaultConfig()
167+
cfg.(*otlpreceiver.Config).Protocols = otlpreceiver.Protocols{
168+
GRPC: &configgrpc.GRPCServerSettings{
169+
NetAddr: confignet.NetAddr{
170+
Endpoint: o.otelReceiverHost + ":" + strconv.Itoa(o.otelReceiverPort),
171+
Transport: "tcp",
172+
},
173+
},
174+
}
175+
set := receiver.CreateSettings{
176+
TelemetrySettings: component.TelemetrySettings{
177+
Logger: o.logger,
178+
TracerProvider: trace.NewNoopTracerProvider(),
179+
MeterProvider: metric.NewMeterProvider(),
180+
ReportComponentStatus: func(*component.StatusEvent) error {
181+
return nil
182+
},
183+
},
184+
BuildInfo: component.NewDefaultBuildInfo(),
185+
}
186+
o.metricsReceiver, err = pFactory.CreateLogsReceiver(exeCtx, set, cfg, o.logsExporter)
187+
if err != nil {
188+
o.logger.Error("failed to create a receiver", zap.Error(err))
189+
return true
190+
}
191+
err = o.metricsExporter.Start(exeCtx, nil)
192+
if err != nil {
193+
o.logger.Error("otel mqtt exporter startup error", zap.Error(err))
194+
return true
195+
}
196+
o.logger.Info("Started receiver for OTLP in orb-agent",
197+
zap.String("host", o.otelReceiverHost), zap.Int("port", o.otelReceiverPort))
198+
err = o.metricsReceiver.Start(exeCtx, nil)
199+
if err != nil {
200+
o.logger.Error("otel receiver startup error", zap.Error(err))
201+
return true
202+
}
203+
return false
204+
}

0 commit comments

Comments
 (0)