Skip to content

Commit 85b6b27

Browse files
committed
refactor: move state.State to public package
Export the state struct from the internal package into a public package. It is a dependency of the `ClusterServer` GRPC service - so to be able to import and use it, the state also needs to be exported. Signed-off-by: Utku Ozdemir <[email protected]>
1 parent a2217bd commit 85b6b27

File tree

13 files changed

+273
-240
lines changed

13 files changed

+273
-240
lines changed

cmd/discovery-service/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ import (
3737
"github.com/siderolabs/discovery-service/internal/landing"
3838
"github.com/siderolabs/discovery-service/internal/limiter"
3939
_ "github.com/siderolabs/discovery-service/internal/proto"
40-
"github.com/siderolabs/discovery-service/internal/state"
4140
"github.com/siderolabs/discovery-service/internal/state/storage"
4241
"github.com/siderolabs/discovery-service/pkg/limits"
4342
"github.com/siderolabs/discovery-service/pkg/server"
43+
"github.com/siderolabs/discovery-service/pkg/state"
4444
)
4545

4646
var (

internal/landing/landing.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import (
1616

1717
"go.uber.org/zap"
1818

19-
"github.com/siderolabs/discovery-service/internal/state"
19+
internalstate "github.com/siderolabs/discovery-service/internal/state"
20+
"github.com/siderolabs/discovery-service/pkg/state"
2021
)
2122

2223
//go:embed "html/index.html"
@@ -28,7 +29,7 @@ var inspectTemplate []byte
2829
// ClusterInspectData represents all affiliate data asssociated with a cluster.
2930
type ClusterInspectData struct {
3031
ClusterID string
31-
Affiliates []*state.AffiliateExport
32+
Affiliates []*internalstate.AffiliateExport
3233
}
3334

3435
var inspectPage = template.Must(template.New("inspect").Parse(string(inspectTemplate)))

internal/state/cluster.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,19 @@ type Cluster struct {
2727
subscriptionsMu sync.Mutex
2828
}
2929

30-
// NewCluster creates new cluster with specified ID.
30+
// NewCluster creates new cluster with specified id.
3131
func NewCluster(id string) *Cluster {
3232
return &Cluster{
3333
id: id,
3434
affiliates: map[string]*Affiliate{},
3535
}
3636
}
3737

38+
// ID returns the cluster id.
39+
func (cluster *Cluster) ID() string {
40+
return cluster.id
41+
}
42+
3843
// WithAffiliate runs a function against the affiliate.
3944
//
4045
// Cluster state is locked while the function is running.
@@ -174,7 +179,8 @@ func (cluster *Cluster) notify(notifications ...*Notification) {
174179
}
175180
}
176181

177-
func (cluster *Cluster) stats() (affiliates, endpoints, subscriptions int) {
182+
// Stats returns the number of affiliates, endpoints and subscriptions.
183+
func (cluster *Cluster) Stats() (affiliates, endpoints, subscriptions int) {
178184
cluster.affiliatesMu.Lock()
179185

180186
affiliates = len(cluster.affiliates)

internal/state/snapshot.go

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package state
77

88
import (
9-
"fmt"
109
"slices"
1110

1211
"github.com/siderolabs/gen/xslices"
@@ -15,52 +14,8 @@ import (
1514
storagepb "github.com/siderolabs/discovery-service/api/storage"
1615
)
1716

18-
// ExportClusterSnapshots exports all cluster snapshots and calls the provided function for each one.
19-
//
20-
// Implements storage.Snapshotter interface.
21-
func (state *State) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error {
22-
var err error
23-
24-
// reuse the same snapshotin each iteration
25-
clusterSnapshot := &storagepb.ClusterSnapshot{}
26-
27-
state.clusters.Enumerate(func(_ string, cluster *Cluster) bool {
28-
snapshotCluster(cluster, clusterSnapshot)
29-
30-
err = f(clusterSnapshot)
31-
32-
return err == nil
33-
})
34-
35-
return err
36-
}
37-
38-
// ImportClusterSnapshots imports cluster snapshots by calling the provided function until it returns false.
39-
//
40-
// Implements storage.Snapshotter interface.
41-
func (state *State) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error {
42-
for {
43-
clusterSnapshot, ok, err := f()
44-
if err != nil {
45-
return err
46-
}
47-
48-
if !ok {
49-
break
50-
}
51-
52-
cluster := clusterFromSnapshot(clusterSnapshot)
53-
54-
_, loaded := state.clusters.LoadOrStore(cluster.id, cluster)
55-
if loaded {
56-
return fmt.Errorf("cluster %q already exists", cluster.id)
57-
}
58-
}
59-
60-
return nil
61-
}
62-
63-
func snapshotCluster(cluster *Cluster, snapshot *storagepb.ClusterSnapshot) {
17+
// Snapshot takes a snapshot of the cluster into the given snapshot reference.
18+
func (cluster *Cluster) Snapshot(snapshot *storagepb.ClusterSnapshot) {
6419
cluster.affiliatesMu.Lock()
6520
defer cluster.affiliatesMu.Unlock()
6621

@@ -110,7 +65,8 @@ func snapshotCluster(cluster *Cluster, snapshot *storagepb.ClusterSnapshot) {
11065
}
11166
}
11267

113-
func clusterFromSnapshot(snapshot *storagepb.ClusterSnapshot) *Cluster {
68+
// ClusterFromSnapshot creates a new cluster from the provided snapshot.
69+
func ClusterFromSnapshot(snapshot *storagepb.ClusterSnapshot) *Cluster {
11470
return &Cluster{
11571
id: snapshot.Id,
11672
affiliates: xslices.ToMap(snapshot.Affiliates, affiliateFromSnapshot),

internal/state/state.go

Lines changed: 1 addition & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -3,175 +3,5 @@
33
// Use of this software is governed by the Business Source License
44
// included in the LICENSE file.
55

6-
// Package state implements server state with clusters, affiliates, subscriptions, etc.
6+
// Package state contains internal state-related components such as affiliates, subscriptions, etc.
77
package state
8-
9-
import (
10-
"context"
11-
"time"
12-
13-
prom "github.com/prometheus/client_golang/prometheus"
14-
"github.com/siderolabs/gen/concurrent"
15-
"go.uber.org/zap"
16-
)
17-
18-
// State keeps the discovery service state.
19-
type State struct {
20-
clusters *concurrent.HashTrieMap[string, *Cluster]
21-
logger *zap.Logger
22-
23-
mClustersDesc *prom.Desc
24-
mAffiliatesDesc *prom.Desc
25-
mEndpointsDesc *prom.Desc
26-
mSubscriptionsDesc *prom.Desc
27-
mGCRuns prom.Counter
28-
mGCClusters prom.Counter
29-
mGCAffiliates prom.Counter
30-
}
31-
32-
// NewState create new instance of State.
33-
func NewState(logger *zap.Logger) *State {
34-
return &State{
35-
clusters: concurrent.NewHashTrieMap[string, *Cluster](),
36-
logger: logger,
37-
mClustersDesc: prom.NewDesc(
38-
"discovery_state_clusters",
39-
"The current number of clusters in the state.",
40-
nil, nil,
41-
),
42-
mAffiliatesDesc: prom.NewDesc(
43-
"discovery_state_affiliates",
44-
"The current number of affiliates in the state.",
45-
nil, nil,
46-
),
47-
mEndpointsDesc: prom.NewDesc(
48-
"discovery_state_endpoints",
49-
"The current number of endpoints in the state.",
50-
nil, nil,
51-
),
52-
mSubscriptionsDesc: prom.NewDesc(
53-
"discovery_state_subscriptions",
54-
"The current number of subscriptions in the state.",
55-
nil, nil,
56-
),
57-
mGCRuns: prom.NewCounter(prom.CounterOpts{
58-
Name: "discovery_state_gc_runs_total",
59-
Help: "The number of GC runs.",
60-
}),
61-
mGCClusters: prom.NewCounter(prom.CounterOpts{
62-
Name: "discovery_state_gc_clusters_total",
63-
Help: "The total number of GC'ed clusters.",
64-
}),
65-
mGCAffiliates: prom.NewCounter(prom.CounterOpts{
66-
Name: "discovery_state_gc_affiliates_total",
67-
Help: "The total number of GC'ed affiliates.",
68-
}),
69-
}
70-
}
71-
72-
// GetCluster returns cluster by ID, creating it if needed.
73-
func (state *State) GetCluster(id string) *Cluster {
74-
if cluster, ok := state.clusters.Load(id); ok {
75-
return cluster
76-
}
77-
78-
cluster, loaded := state.clusters.LoadOrStore(id, NewCluster(id))
79-
if !loaded {
80-
state.logger.Debug("cluster created", zap.String("cluster_id", id))
81-
}
82-
83-
return cluster
84-
}
85-
86-
// GarbageCollect recursively each cluster, and remove empty clusters.
87-
func (state *State) GarbageCollect(now time.Time) (removedClusters, removedAffiliates int) {
88-
state.clusters.Enumerate(func(key string, cluster *Cluster) bool {
89-
ra, empty := cluster.GarbageCollect(now)
90-
removedAffiliates += ra
91-
92-
if empty {
93-
state.clusters.CompareAndDelete(key, cluster)
94-
state.logger.Debug("cluster removed", zap.String("cluster_id", key))
95-
96-
removedClusters++
97-
}
98-
99-
return true
100-
})
101-
102-
state.mGCRuns.Inc()
103-
state.mGCClusters.Add(float64(removedClusters))
104-
state.mGCAffiliates.Add(float64(removedAffiliates))
105-
106-
return
107-
}
108-
109-
// RunGC runs the garbage collection on interval.
110-
func (state *State) RunGC(ctx context.Context, logger *zap.Logger, interval time.Duration) {
111-
ticker := time.NewTicker(interval)
112-
defer ticker.Stop()
113-
114-
for ctx.Err() == nil {
115-
removedClusters, removedAffiliates := state.GarbageCollect(time.Now())
116-
clusters, affiliates, endpoints, subscriptions := state.stats()
117-
118-
logFunc := logger.Debug
119-
if removedClusters > 0 || removedAffiliates > 0 {
120-
logFunc = logger.Info
121-
}
122-
123-
logFunc(
124-
"garbage collection run",
125-
zap.Int("removed_clusters", removedClusters),
126-
zap.Int("removed_affiliates", removedAffiliates),
127-
zap.Int("current_clusters", clusters),
128-
zap.Int("current_affiliates", affiliates),
129-
zap.Int("current_endpoints", endpoints),
130-
zap.Int("current_subscriptions", subscriptions),
131-
)
132-
133-
select {
134-
case <-ctx.Done():
135-
case <-ticker.C:
136-
}
137-
}
138-
}
139-
140-
func (state *State) stats() (clusters, affiliates, endpoints, subscriptions int) {
141-
state.clusters.Enumerate(func(_ string, cluster *Cluster) bool {
142-
clusters++
143-
144-
a, e, s := cluster.stats()
145-
affiliates += a
146-
endpoints += e
147-
subscriptions += s
148-
149-
return true
150-
})
151-
152-
return
153-
}
154-
155-
// Describe implements prom.Collector interface.
156-
func (state *State) Describe(ch chan<- *prom.Desc) {
157-
prom.DescribeByCollect(state, ch)
158-
}
159-
160-
// Collect implements prom.Collector interface.
161-
func (state *State) Collect(ch chan<- prom.Metric) {
162-
clusters, affiliates, endpoints, subscriptions := state.stats()
163-
164-
ch <- prom.MustNewConstMetric(state.mClustersDesc, prom.GaugeValue, float64(clusters))
165-
ch <- prom.MustNewConstMetric(state.mAffiliatesDesc, prom.GaugeValue, float64(affiliates))
166-
ch <- prom.MustNewConstMetric(state.mEndpointsDesc, prom.GaugeValue, float64(endpoints))
167-
ch <- prom.MustNewConstMetric(state.mSubscriptionsDesc, prom.GaugeValue, float64(subscriptions))
168-
169-
ch <- state.mGCRuns
170-
ch <- state.mGCClusters
171-
ch <- state.mGCAffiliates
172-
}
173-
174-
// Check interfaces.
175-
var (
176-
_ prom.Collector = (*State)(nil)
177-
)

internal/state/storage/storage_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"go.uber.org/zap"
1414

1515
storagepb "github.com/siderolabs/discovery-service/api/storage"
16-
"github.com/siderolabs/discovery-service/internal/state"
1716
"github.com/siderolabs/discovery-service/internal/state/storage"
17+
"github.com/siderolabs/discovery-service/pkg/state"
1818
)
1919

2020
func BenchmarkExport(b *testing.B) {

pkg/server/landing_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import (
1818
"go.uber.org/zap/zaptest"
1919

2020
"github.com/siderolabs/discovery-service/internal/landing"
21-
"github.com/siderolabs/discovery-service/internal/state"
21+
internalstate "github.com/siderolabs/discovery-service/internal/state"
22+
"github.com/siderolabs/discovery-service/pkg/state"
2223
)
2324

2425
// TestInspectHandler tests the /inspect endpoint.
@@ -33,14 +34,14 @@ func TestInspectHanlder(t *testing.T) {
3334
t.Cleanup(cancel)
3435

3536
// add affiliates to the cluster "fake1"
36-
err := testCluster.WithAffiliate("af1", func(affiliate *state.Affiliate) error {
37+
err := testCluster.WithAffiliate("af1", func(affiliate *internalstate.Affiliate) error {
3738
affiliate.Update([]byte("data1"), now.Add(time.Minute))
3839

3940
return nil
4041
})
4142
require.NoError(t, err)
4243

43-
err = testCluster.WithAffiliate("af2", func(affiliate *state.Affiliate) error {
44+
err = testCluster.WithAffiliate("af2", func(affiliate *internalstate.Affiliate) error {
4445
affiliate.Update([]byte("data2"), now.Add(time.Minute))
4546

4647
return nil

pkg/server/server.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import (
1717
"google.golang.org/grpc/codes"
1818
"google.golang.org/grpc/status"
1919

20-
"github.com/siderolabs/discovery-service/internal/state"
20+
internalstate "github.com/siderolabs/discovery-service/internal/state"
21+
"github.com/siderolabs/discovery-service/pkg/state"
2122
)
2223

2324
const updateBuffer = 32
@@ -104,7 +105,7 @@ func (srv *ClusterServer) AffiliateUpdate(_ context.Context, req *pb.AffiliateUp
104105
return nil, err
105106
}
106107

107-
if err := srv.state.GetCluster(req.ClusterId).WithAffiliate(req.AffiliateId, func(affiliate *state.Affiliate) error {
108+
if err := srv.state.GetCluster(req.ClusterId).WithAffiliate(req.AffiliateId, func(affiliate *internalstate.Affiliate) error {
108109
expiration := time.Now().Add(req.Ttl.AsDuration())
109110

110111
if len(req.AffiliateData) > 0 {
@@ -114,9 +115,9 @@ func (srv *ClusterServer) AffiliateUpdate(_ context.Context, req *pb.AffiliateUp
114115
return affiliate.MergeEndpoints(req.AffiliateEndpoints, expiration)
115116
}); err != nil {
116117
switch {
117-
case errors.Is(err, state.ErrTooManyEndpoints):
118+
case errors.Is(err, internalstate.ErrTooManyEndpoints):
118119
return nil, status.Error(codes.ResourceExhausted, err.Error())
119-
case errors.Is(err, state.ErrTooManyAffiliates):
120+
case errors.Is(err, internalstate.ErrTooManyAffiliates):
120121
return nil, status.Error(codes.ResourceExhausted, err.Error())
121122
default:
122123
return nil, err
@@ -170,7 +171,7 @@ func (srv *ClusterServer) Watch(req *pb.WatchRequest, server pb.Cluster_WatchSer
170171
}
171172

172173
// make enough room to handle connection issues
173-
updates := make(chan *state.Notification, updateBuffer)
174+
updates := make(chan *internalstate.Notification, updateBuffer)
174175

175176
snapshot, subscription := srv.state.GetCluster(req.ClusterId).Subscribe(updates)
176177
defer subscription.Close()

pkg/server/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ import (
3232

3333
"github.com/siderolabs/discovery-service/internal/limiter"
3434
_ "github.com/siderolabs/discovery-service/internal/proto"
35-
"github.com/siderolabs/discovery-service/internal/state"
3635
"github.com/siderolabs/discovery-service/pkg/limits"
3736
"github.com/siderolabs/discovery-service/pkg/server"
37+
"github.com/siderolabs/discovery-service/pkg/state"
3838
)
3939

4040
func checkMetrics(t *testing.T, c prom.Collector) {

0 commit comments

Comments
 (0)