Skip to content

Commit d3bc722

Browse files
authored
Metrics and updated logging for importer / exporter semaphores (#444)
fix: add errors.go for classification of errors fix: add logging to support semaphore debugging and producer/consumer metrics for secrets fix: incorporate use of ClassifyVaultError fix: update exporter to support debugging logging and metrics for semaphore operations test: new importer_test --------- Signed-off-by: Ben Stickel <[email protected]>
1 parent 680ea48 commit d3bc722

File tree

5 files changed

+763
-4
lines changed

5 files changed

+763
-4
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package operation
19+
20+
import "strings"
21+
22+
// ClassifyVaultError categorizes Vault operation errors by type for debugging.
23+
func ClassifyVaultError(err error) string {
24+
if err == nil {
25+
return "unknown"
26+
}
27+
28+
errMsg := strings.ToLower(err.Error())
29+
switch {
30+
case strings.Contains(errMsg, "connection"):
31+
return "connection"
32+
case strings.Contains(errMsg, "permission"), strings.Contains(errMsg, "denied"):
33+
return "permission"
34+
case strings.Contains(errMsg, "timeout"):
35+
return "timeout"
36+
default:
37+
return "unknown"
38+
}
39+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package operation
19+
20+
import (
21+
"errors"
22+
"testing"
23+
)
24+
25+
func Test_ClassifyVaultError(t *testing.T) {
26+
tests := []struct {
27+
name string
28+
err error
29+
want string
30+
}{
31+
{
32+
name: "nil error",
33+
err: nil,
34+
want: "unknown",
35+
},
36+
{
37+
name: "connection error",
38+
err: errors.New("connection refused"),
39+
want: "connection",
40+
},
41+
{
42+
name: "connection error with context",
43+
err: errors.New("vault: connection timeout after 30s"),
44+
want: "connection",
45+
},
46+
{
47+
name: "permission error",
48+
err: errors.New("permission denied"),
49+
want: "permission",
50+
},
51+
{
52+
name: "permission error with vault prefix",
53+
err: errors.New("vault: permission denied for path secret/data/foo"),
54+
want: "permission",
55+
},
56+
{
57+
name: "access denied error",
58+
err: errors.New("access denied"),
59+
want: "permission",
60+
},
61+
{
62+
name: "denied error",
63+
err: errors.New("request denied by policy"),
64+
want: "permission",
65+
},
66+
{
67+
name: "timeout error",
68+
err: errors.New("operation timeout"),
69+
want: "timeout",
70+
},
71+
{
72+
name: "timeout error with context",
73+
err: errors.New("vault: timeout waiting for response"),
74+
want: "timeout",
75+
},
76+
{
77+
name: "unknown error",
78+
err: errors.New("some random error"),
79+
want: "unknown",
80+
},
81+
{
82+
name: "not found error",
83+
err: errors.New("path not found"),
84+
want: "unknown",
85+
},
86+
{
87+
name: "empty error message",
88+
err: errors.New(""),
89+
want: "unknown",
90+
},
91+
{
92+
name: "mixed case connection error",
93+
err: errors.New("Connection Failed"),
94+
want: "connection",
95+
},
96+
{
97+
name: "mixed case permission error",
98+
err: errors.New("Permission Denied"),
99+
want: "permission",
100+
},
101+
{
102+
name: "mixed case timeout error",
103+
err: errors.New("Timeout Exceeded"),
104+
want: "timeout",
105+
},
106+
}
107+
for _, tt := range tests {
108+
t.Run(tt.name, func(t *testing.T) {
109+
got := ClassifyVaultError(tt.err)
110+
if got != tt.want {
111+
t.Errorf("ClassifyVaultError() = %v, want %v", got, tt.want)
112+
}
113+
})
114+
}
115+
}

pkg/bundle/vault/internal/operation/exporter.go

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import (
2626
"path"
2727
"strconv"
2828
"strings"
29+
"sync"
30+
"sync/atomic"
31+
"time"
2932

3033
"github.com/imdario/mergo"
3134
"go.uber.org/zap"
@@ -41,6 +44,10 @@ import (
4144
"golang.org/x/sync/semaphore"
4245
)
4346

47+
const (
48+
contextErrorNone = "none"
49+
)
50+
4451
// Exporter initialize a secret exporter operation
4552
func Exporter(service kv.Service, backendPath string, output chan *bundlev1.Package, withMetadata bool, maxWorkerCount int64) Operation {
4653
return &exporter{
@@ -60,12 +67,22 @@ type exporter struct {
6067
withMetadata bool
6168
output chan *bundlev1.Package
6269
maxWorkerCount int64
70+
pathCount int
71+
pathCountMutex sync.RWMutex
6372
}
6473

6574
// Run the implemented operation
6675
//
6776
//nolint:funlen,gocognit,gocyclo // refactor
6877
func (op *exporter) Run(ctx context.Context) error {
78+
startTime := time.Now()
79+
80+
log.For(ctx).Info("Starting vault export operation",
81+
zap.String("path", op.path),
82+
zap.Int64("max_workers", op.maxWorkerCount),
83+
zap.Bool("with_metadata", op.withMetadata),
84+
)
85+
6986
// Initialize sub context
7087
g, gctx := errgroup.WithContext(ctx)
7188

@@ -87,17 +104,48 @@ func (op *exporter) Run(ctx context.Context) error {
87104
// Reader errGroup
88105
gReader, gReaderCtx := errgroup.WithContext(gctx)
89106

107+
var processedCount atomic.Int64
90108
// Listen for message
91109
for secretPath := range pathChan {
92110
secPath := secretPath
93111

94112
if err := gReaderCtx.Err(); err != nil {
95-
// Stop processing
113+
log.For(gReaderCtx).Error("Context error detected, stopping processing",
114+
zap.Error(err),
115+
zap.Int64("processed_count", processedCount.Load()),
116+
)
96117
break
97118
}
98119

120+
log.For(gReaderCtx).Debug("Attempting semaphore acquisition",
121+
zap.String("secret_path", secretPath),
122+
zap.Int64("max_workers", op.maxWorkerCount),
123+
zap.Bool("context_done", gReaderCtx.Done() != nil),
124+
zap.String("context_error", func() string {
125+
if err := gReaderCtx.Err(); err != nil {
126+
return err.Error()
127+
}
128+
return contextErrorNone
129+
}()),
130+
)
131+
99132
// Acquire a token
100133
if err := sem.Acquire(gReaderCtx, 1); err != nil {
134+
log.For(gReaderCtx).Error("Semaphore acquisition failed",
135+
zap.Error(err),
136+
zap.String("secret_path", secretPath),
137+
zap.String("path", op.path),
138+
zap.Int64("max_workers", op.maxWorkerCount),
139+
zap.String("context_error", func() string {
140+
if ctxErr := gReaderCtx.Err(); ctxErr != nil {
141+
return ctxErr.Error()
142+
}
143+
return contextErrorNone
144+
}()),
145+
zap.Bool("is_context_canceled", errors.Is(gReaderCtx.Err(), context.Canceled)),
146+
zap.Bool("is_deadline_exceeded", errors.Is(gReaderCtx.Err(), context.DeadlineExceeded)),
147+
)
148+
101149
return fmt.Errorf("unable to acquire a semaphore token: %w", err)
102150
}
103151

@@ -107,6 +155,7 @@ func (op *exporter) Run(ctx context.Context) error {
107155
gReader.Go(func() error {
108156
// Release token on finish
109157
defer sem.Release(1)
158+
defer processedCount.Add(1)
110159

111160
if err := gReaderCtx.Err(); err != nil {
112161
// Context has already an error
@@ -127,6 +176,17 @@ func (op *exporter) Run(ctx context.Context) error {
127176
log.For(gReaderCtx).Debug("No data / path found for given path", zap.String("path", secPath))
128177
return nil
129178
}
179+
180+
// Classify error types for better debugging
181+
errorType := ClassifyVaultError(errRead)
182+
183+
log.For(gReaderCtx).Error("Failed to read secret from Vault",
184+
zap.String("secret_path", vaultPackagePath),
185+
zap.Uint32("version", vaultVersion),
186+
zap.String("error_type", errorType),
187+
zap.Error(errRead),
188+
)
189+
130190
return fmt.Errorf("unexpected vault error: %w", errRead)
131191
}
132192

@@ -266,22 +326,58 @@ func (op *exporter) Run(ctx context.Context) error {
266326
})
267327
}
268328

269-
return gReader.Wait()
329+
// Wait for all readers to complete
330+
if err := gReader.Wait(); err != nil {
331+
return err
332+
}
333+
334+
log.For(gctx).Info("Secret reader consumer completed",
335+
zap.Int64("total_processed", processedCount.Load()),
336+
)
337+
338+
return nil
270339
})
271340

272341
// Producers ---------------------------------------------------------------
273342

274343
// Vault crawler
275344
g.Go(func() error {
276345
defer close(pathChan)
277-
return op.walk(gctx, op.path, op.path, pathChan)
346+
347+
log.For(gctx).Info("Starting path producer (vault crawler)",
348+
zap.String("base_path", op.path),
349+
)
350+
351+
producerStartTime := time.Now()
352+
353+
// Walk the vault path tree
354+
if err := op.walk(gctx, op.path, op.path, pathChan); err != nil {
355+
return err
356+
}
357+
358+
op.pathCountMutex.RLock()
359+
totalPaths := op.pathCount
360+
op.pathCountMutex.RUnlock()
361+
362+
log.For(gctx).Info("Path producer completed",
363+
zap.Int("total_paths_published", totalPaths),
364+
zap.Duration("total_time", time.Since(producerStartTime)),
365+
zap.Float64("avg_rate_per_sec", float64(totalPaths)/time.Since(producerStartTime).Seconds()),
366+
)
367+
368+
return nil
278369
})
279370

280371
// Wait for all goroutime to complete
281372
if err := g.Wait(); err != nil {
282373
return fmt.Errorf("vault operation error: %w", err)
283374
}
284375

376+
log.For(ctx).Info("Vault export operation completed",
377+
zap.Duration("total_duration", time.Since(startTime)),
378+
zap.String("path", op.path),
379+
)
380+
285381
// No error
286382
return nil
287383
}
@@ -299,8 +395,24 @@ func (op *exporter) walk(ctx context.Context, basePath, currPath string, keys ch
299395
if res == nil {
300396
select {
301397
case <-ctx.Done():
398+
log.For(ctx).Warn("Path producer context canceled",
399+
zap.Error(ctx.Err()),
400+
zap.Int("published_count", op.pathCount),
401+
)
302402
return ctx.Err()
303403
case keys <- currPath:
404+
op.pathCountMutex.Lock()
405+
op.pathCount++
406+
currentCount := op.pathCount
407+
op.pathCountMutex.Unlock()
408+
409+
// Log progress periodically
410+
if currentCount%50 == 0 {
411+
log.For(ctx).Debug("Path producer progress",
412+
zap.Int("paths_published", currentCount),
413+
zap.String("latest_path", currPath),
414+
)
415+
}
304416
}
305417
return nil
306418
}

0 commit comments

Comments
 (0)