Skip to content

Commit

Permalink
add stream list concurrency config
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Aug 18, 2023
1 parent b67c272 commit 04b5ad1
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/index/job/correction/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ corrector:
agent_dns: vald-agent-ngt.default.svc.cluster.local
agent_namespace: "default"
node_name: ""
stream_list_concurrency: 100
discoverer:
duration: 500ms
client:
Expand Down
12 changes: 12 additions & 0 deletions internal/config/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type Corrector struct {
// NodeName represents node name
NodeName string `json:"node_name" yaml:"node_name"`

// StreamConcurrency represent stream concurrency for StreamListObject rpc client
// this directly affects the memory usage of this job
StreamListConcurrency int `json:"stream_list_concurrency" yaml:"stream_list_concurrency"`

// Discoverer represent agent discoverer service configuration
Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"`
}
Expand All @@ -52,3 +56,11 @@ func (c *Corrector) Bind() *Corrector {
}
return c

Check warning on line 57 in internal/config/corrector.go

View check run for this annotation

Codecov / codecov/patch

internal/config/corrector.go#L48-L57

Added lines #L48 - L57 were not covered by tests
}

// GetStreamListConcurrency returns the StreamListConcurrency field value if set, -1 otherwise, which means no limit.
func (c *Corrector) GetStreamListConcurrency() int {
if c != nil {
return c.StreamListConcurrency
}
return -1

Check warning on line 65 in internal/config/corrector.go

View check run for this annotation

Codecov / codecov/patch

internal/config/corrector.go#L61-L65

Added lines #L61 - L65 were not covered by tests
}
7 changes: 5 additions & 2 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (c *correct) correct(ctx context.Context, addrs []string) (err error) {
}

seg, ctx := stdeg.WithContext(ctx)
seg.SetLimit(c.cfg.Server.GetGRPCStreamConcurrency()) // FIXME: server settingsをそのまま流用で良いのか?
concurrency := c.cfg.Corrector.GetStreamListConcurrency()
seg.SetLimit(concurrency) // FIXME: server settingsをそのまま流用で良いのか?

log.Infof("starting correction for agent %s, concurrency: %d", addr, concurrency)

finalize := func() error {
err = seg.Wait()
Expand Down Expand Up @@ -205,7 +208,7 @@ func (c *correct) correct2(ctx context.Context, addrs []string) (err error) {
if err := c.discoverer.GetClient().OrderedRange(ctx, addrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
eg, ctx := errgroup.New(ctx)
eg.Limitation(c.cfg.Server.GetGRPCStreamConcurrency())
eg.Limitation(c.cfg.Corrector.GetStreamListConcurrency())

vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
Expand Down

0 comments on commit 04b5ad1

Please sign in to comment.