Skip to content

Commit

Permalink
Add context and info to logging interface (#237)
Browse files Browse the repository at this point in the history
Allow the logging interface to propagate context where possible. This will let logging sinks contain richer information about potential opamp IO errors.
  • Loading branch information
jaronoff97 authored Jan 22, 2024
1 parent d0205c5 commit c8cc40b
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 150 deletions.
20 changes: 10 additions & 10 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (h *HTTPSender) SetRequestHeader(header http.Header) {
func (h *HTTPSender) makeOneRequestRoundtrip(ctx context.Context) {
resp, err := h.sendRequestWithRetries(ctx)
if err != nil {
h.logger.Errorf("%v", err)
h.logger.Errorf(ctx, "%v", err)
return
}
if resp == nil {
Expand All @@ -148,9 +148,9 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response
req, err := h.prepareRequest(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
h.logger.Debugf("Client is stopped, will not try anymore.")
h.logger.Debugf(ctx, "Client is stopped, will not try anymore.")
} else {
h.logger.Errorf("Failed prepare request (%v), will not try anymore.", err)
h.logger.Errorf(ctx, "Failed prepare request (%v), will not try anymore.", err)
}
return nil, err
}
Expand Down Expand Up @@ -190,16 +190,16 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response
return nil, fmt.Errorf("invalid response from server: %d", resp.StatusCode)
}
} else if errors.Is(err, context.Canceled) {
h.logger.Debugf("Client is stopped, will not try anymore.")
h.logger.Debugf(ctx, "Client is stopped, will not try anymore.")
return nil, err
}

h.logger.Errorf("Failed to do HTTP request (%v), will retry", err)
h.logger.Errorf(ctx, "Failed to do HTTP request (%v), will retry", err)
h.callbacks.OnConnectFailed(err)
}

case <-ctx.Done():
h.logger.Debugf("Client is stopped, will not try anymore.")
h.logger.Debugf(ctx, "Client is stopped, will not try anymore.")
return nil, ctx.Err()
}
}
Expand Down Expand Up @@ -239,11 +239,11 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
if _, err = g.Write(data); err != nil {
h.logger.Errorf("Failed to compress message: %v", err)
h.logger.Errorf(ctx, "Failed to compress message: %v", err)
return nil, err
}
if err = g.Close(); err != nil {
h.logger.Errorf("Failed to close the writer: %v", err)
h.logger.Errorf(ctx, "Failed to close the writer: %v", err)
return nil, err
}
req.bodyReader = bodyReader(buf.Bytes())
Expand All @@ -262,14 +262,14 @@ func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) {
msgBytes, err := io.ReadAll(resp.Body)
if err != nil {
_ = resp.Body.Close()
h.logger.Errorf("cannot read response body: %v", err)
h.logger.Errorf(ctx, "cannot read response body: %v", err)
return
}
_ = resp.Body.Close()

var response protobufs.ServerToAgent
if err := proto.Unmarshal(msgBytes, &response); err != nil {
h.logger.Errorf("cannot unmarshal response: %v", err)
h.logger.Errorf(ctx, "cannot unmarshal response: %v", err)
return
}

Expand Down
47 changes: 22 additions & 25 deletions client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,25 @@ func (s *packagesSyncer) initStatuses() error {
func (s *packagesSyncer) doSync(ctx context.Context) {
hash, err := s.localState.AllPackagesHash()
if err != nil {
s.logger.Errorf("Package syncing failed: %V", err)
s.logger.Errorf(ctx, "Package syncing failed: %V", err)
return
}
if bytes.Compare(hash, s.available.AllPackagesHash) == 0 {
s.logger.Debugf("All packages are already up to date.")
s.logger.Debugf(ctx, "All packages are already up to date.")
return
}

failed := false
if err := s.deleteUnneededLocalPackages(); err != nil {
s.logger.Errorf("Cannot delete unneeded packages: %v", err)
if err := s.deleteUnneededLocalPackages(ctx); err != nil {
s.logger.Errorf(ctx, "Cannot delete unneeded packages: %v", err)
failed = true
}

// Iterate through offered packages and sync them all from server.
for name, pkg := range s.available.Packages {
err := s.syncPackage(ctx, name, pkg)
if err != nil {
s.logger.Errorf("Cannot sync package %s: %v", name, err)
s.logger.Errorf(ctx, "Cannot sync package %s: %v", name, err)
failed = true
}
}
Expand All @@ -128,15 +128,15 @@ func (s *packagesSyncer) doSync(ctx context.Context) {
// Update the "all" hash on success, so that next time Sync() does not thing,
// unless a new hash is received from the Server.
if err := s.localState.SetAllPackagesHash(s.available.AllPackagesHash); err != nil {
s.logger.Errorf("SetAllPackagesHash failed: %v", err)
s.logger.Errorf(ctx, "SetAllPackagesHash failed: %v", err)
} else {
s.logger.Debugf("All packages are synced and up to date.")
s.logger.Debugf(ctx, "All packages are synced and up to date.")
}
} else {
s.logger.Errorf("Package syncing was not successful.")
s.logger.Errorf(ctx, "Package syncing was not successful.")
}

_ = s.reportStatuses(true)
_ = s.reportStatuses(ctx, true)
}

// syncPackage downloads the package from the server and installs it.
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *packagesSyncer) syncPackage(
mustCreate := !pkgLocal.Exists
if pkgLocal.Exists {
if bytes.Equal(pkgLocal.Hash, pkgAvail.Hash) {
s.logger.Debugf("Package %s hash is unchanged, skipping", pkgName)
s.logger.Debugf(ctx, "Package %s hash is unchanged, skipping", pkgName)
return nil
}
if pkgLocal.Type != pkgAvail.Type {
Expand All @@ -183,7 +183,7 @@ func (s *packagesSyncer) syncPackage(

// Report that we are beginning to install it.
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_Installing
_ = s.reportStatuses(true)
_ = s.reportStatuses(ctx, true)

if mustCreate {
// Make sure the package exists.
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *packagesSyncer) syncPackage(
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_InstallFailed
status.ErrorMessage = err.Error()
}
_ = s.reportStatuses(true)
_ = s.reportStatuses(ctx, true)

return err
}
Expand All @@ -224,7 +224,7 @@ func (s *packagesSyncer) syncPackage(
func (s *packagesSyncer) syncPackageFile(
ctx context.Context, pkgName string, file *protobufs.DownloadableFile,
) error {
shouldDownload, err := s.shouldDownloadFile(pkgName, file)
shouldDownload, err := s.shouldDownloadFile(ctx, pkgName, file)
if err == nil && shouldDownload {
err = s.downloadFile(ctx, pkgName, file)
}
Expand All @@ -233,21 +233,18 @@ func (s *packagesSyncer) syncPackageFile(
}

// shouldDownloadFile returns true if the file should be downloaded.
func (s *packagesSyncer) shouldDownloadFile(
packageName string,
file *protobufs.DownloadableFile,
) (bool, error) {
func (s *packagesSyncer) shouldDownloadFile(ctx context.Context, packageName string, file *protobufs.DownloadableFile) (bool, error) {
fileContentHash, err := s.localState.FileContentHash(packageName)

if err != nil {
err := fmt.Errorf("cannot calculate checksum of %s: %v", packageName, err)
s.logger.Errorf(err.Error())
s.logger.Errorf(ctx, err.Error())
return true, nil
} else {
// Compare the checksum of the file we have with what
// we are offered by the server.
if bytes.Compare(fileContentHash, file.ContentHash) != 0 {
s.logger.Debugf("Package %s: file hash mismatch, will download.", packageName)
s.logger.Debugf(ctx, "Package %s: file hash mismatch, will download.", packageName)
return true, nil
}
}
Expand All @@ -256,7 +253,7 @@ func (s *packagesSyncer) shouldDownloadFile(

// downloadFile downloads the file from the server.
func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error {
s.logger.Debugf("Downloading package %s file from %s", pkgName, file.DownloadUrl)
s.logger.Debugf(ctx, "Downloading package %s file from %s", pkgName, file.DownloadUrl)

req, err := http.NewRequestWithContext(ctx, "GET", file.DownloadUrl, nil)
if err != nil {
Expand Down Expand Up @@ -286,7 +283,7 @@ func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file
// deleteUnneededLocalPackages deletes local packages that are not
// needed anymore. This is done by comparing the local package state
// with the server's package state.
func (s *packagesSyncer) deleteUnneededLocalPackages() error {
func (s *packagesSyncer) deleteUnneededLocalPackages(ctx context.Context) error {
// Read the list of packages we have locally.
localPackages, err := s.localState.Packages()
if err != nil {
Expand All @@ -297,7 +294,7 @@ func (s *packagesSyncer) deleteUnneededLocalPackages() error {
for _, localPkg := range localPackages {
// Do we have a package that is not offered?
if _, offered := s.available.Packages[localPkg]; !offered {
s.logger.Debugf("Package %s is no longer needed, deleting.", localPkg)
s.logger.Debugf(ctx, "Package %s is no longer needed, deleting.", localPkg)
err := s.localState.DeletePackage(localPkg)
if err != nil {
lastErr = err
Expand All @@ -318,16 +315,16 @@ func (s *packagesSyncer) deleteUnneededLocalPackages() error {
// reportStatuses saves the last reported statuses to provider and client state.
// If sendImmediately is true, the statuses are scheduled to be
// sent to the server.
func (s *packagesSyncer) reportStatuses(sendImmediately bool) error {
func (s *packagesSyncer) reportStatuses(ctx context.Context, sendImmediately bool) error {
// Save it in the user-supplied state provider.
if err := s.localState.SetLastReportedStatuses(s.statuses); err != nil {
s.logger.Errorf("Cannot save last reported statuses: %v", err)
s.logger.Errorf(ctx, "Cannot save last reported statuses: %v", err)
return err
}

// Also save it in our internal state (will be needed if the Server asks for it).
if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil {
s.logger.Errorf("Cannot save client state: %v", err)
s.logger.Errorf(ctx, "Cannot save client state: %v", err)
return err
}
s.sender.NextMessage().Update(
Expand Down
26 changes: 13 additions & 13 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
// If a command message exists, other messages will be ignored
return
} else {
r.logger.Debugf("Ignoring Command, agent does not have AcceptsCommands capability")
r.logger.Debugf(ctx, "Ignoring Command, agent does not have AcceptsCommands capability")
}
}

scheduled, err := r.rcvFlags(ctx, protobufs.ServerToAgentFlags(msg.Flags))
if err != nil {
r.logger.Errorf("cannot processed received flags:%v", err)
r.logger.Errorf(ctx, "cannot processed received flags:%v", err)
}

msgData := &types.MessageData{}
Expand All @@ -75,7 +75,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) {
msgData.RemoteConfig = msg.RemoteConfig
} else {
r.logger.Debugf("Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability")
r.logger.Debugf(ctx, "Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability")
}
}

Expand All @@ -84,31 +84,31 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics) {
msgData.OwnMetricsConnSettings = msg.ConnectionSettings.OwnMetrics
} else {
r.logger.Debugf("Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability")
r.logger.Debugf(ctx, "Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability")
}
}

if msg.ConnectionSettings.OwnTraces != nil {
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnTraces) {
msgData.OwnTracesConnSettings = msg.ConnectionSettings.OwnTraces
} else {
r.logger.Debugf("Ignoring OwnTraces, agent does not have ReportsOwnTraces capability")
r.logger.Debugf(ctx, "Ignoring OwnTraces, agent does not have ReportsOwnTraces capability")
}
}

if msg.ConnectionSettings.OwnLogs != nil {
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnLogs) {
msgData.OwnLogsConnSettings = msg.ConnectionSettings.OwnLogs
} else {
r.logger.Debugf("Ignoring OwnLogs, agent does not have ReportsOwnLogs capability")
r.logger.Debugf(ctx, "Ignoring OwnLogs, agent does not have ReportsOwnLogs capability")
}
}

if msg.ConnectionSettings.OtherConnections != nil {
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOtherConnectionSettings) {
msgData.OtherConnSettings = msg.ConnectionSettings.OtherConnections
} else {
r.logger.Debugf("Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability")
r.logger.Debugf(ctx, "Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability")
}
}
}
Expand All @@ -124,7 +124,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
r.packagesStateProvider,
)
} else {
r.logger.Debugf("Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")
r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func (r *receivedProcessor) rcvFlags(
if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState != 0 {
cfg, err := r.callbacks.GetEffectiveConfig(ctx)
if err != nil {
r.logger.Errorf("Cannot GetEffectiveConfig: %v", err)
r.logger.Errorf(ctx, "Cannot GetEffectiveConfig: %v", err)
cfg = nil
}

Expand Down Expand Up @@ -199,25 +199,25 @@ func (r *receivedProcessor) rcvOpampConnectionSettings(ctx context.Context, sett
r.callbacks.OnOpampConnectionSettingsAccepted(settings.Opamp)
}
} else {
r.logger.Debugf("Ignoring Opamp, agent does not have AcceptsOpAMPConnectionSettings capability")
r.logger.Debugf(ctx, "Ignoring Opamp, agent does not have AcceptsOpAMPConnectionSettings capability")
}
}

func (r *receivedProcessor) processErrorResponse(body *protobufs.ServerErrorResponse) {
// TODO: implement this.
r.logger.Errorf("received an error from server: %s", body.ErrorMessage)
r.logger.Errorf(context.Background(), "received an error from server: %s", body.ErrorMessage)
}

func (r *receivedProcessor) rcvAgentIdentification(agentId *protobufs.AgentIdentification) error {
if agentId.NewInstanceUid == "" {
err := errors.New("empty instance uid is not allowed")
r.logger.Debugf(err.Error())
r.logger.Debugf(context.Background(), err.Error())
return err
}

err := r.sender.SetInstanceUid(agentId.NewInstanceUid)
if err != nil {
r.logger.Errorf("Error while setting instance uid: %v", err)
r.logger.Errorf(context.Background(), "Error while setting instance uid: %v", err)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ out:
var message protobufs.ServerToAgent
if err := r.receiveMessage(&message); err != nil {
if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
r.logger.Errorf("Unexpected error while receiving: %v", err)
r.logger.Errorf(ctx, "Unexpected error while receiving: %v", err)
}
break out
} else {
Expand Down
8 changes: 7 additions & 1 deletion client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

var _ types.Logger = &TestLogger{}

type TestLogger struct {
*testing.T
}

func (logger TestLogger) Debugf(format string, v ...interface{}) {
func (logger TestLogger) Debugf(ctx context.Context, format string, v ...interface{}) {
logger.Logf(format, v...)
}

func (logger TestLogger) Errorf(ctx context.Context, format string, v ...interface{}) {
logger.Fatalf(format, v...)
}

type commandAction int

const (
Expand Down
Loading

0 comments on commit c8cc40b

Please sign in to comment.