41
41
resource : "pods" ,
42
42
namespace : "default" ,
43
43
// Please keep the sum of weights equal 100.
44
+ readChoices : []random.ChoiceWeight [KubernetesRequestType ]{
45
+ {Choice : KubernetesListStale , Weight : 20 },
46
+ {Choice : KubernetesListAndWatch , Weight : 80 },
47
+ },
48
+ // Please keep the sum of weights equal 100.
44
49
writeChoices : []random.ChoiceWeight [KubernetesRequestType ]{
45
50
{Choice : KubernetesUpdate , Weight : 90 },
46
51
{Choice : KubernetesDelete , Weight : 5 },
53
58
resource : "pods" ,
54
59
namespace : "default" ,
55
60
// Please keep the sum of weights equal 100.
61
+ readChoices : []random.ChoiceWeight [KubernetesRequestType ]{
62
+ {Choice : KubernetesListStale , Weight : 20 },
63
+ {Choice : KubernetesListAndWatch , Weight : 80 },
64
+ },
65
+ // Please keep the sum of weights equal 100.
56
66
writeChoices : []random.ChoiceWeight [KubernetesRequestType ]{
57
67
{Choice : KubernetesDelete , Weight : 40 },
58
68
{Choice : KubernetesCreate , Weight : 60 },
@@ -64,6 +74,7 @@ type kubernetesTraffic struct {
64
74
averageKeyCount int
65
75
resource string
66
76
namespace string
77
+ readChoices []random.ChoiceWeight [KubernetesRequestType ]
67
78
writeChoices []random.ChoiceWeight [KubernetesRequestType ]
68
79
}
69
80
@@ -76,7 +87,6 @@ func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.Recordi
76
87
s := newStorage ()
77
88
keyPrefix := "/registry/" + t .resource + "/"
78
89
g := errgroup.Group {}
79
- readLimit := t .averageKeyCount
80
90
81
91
g .Go (func () error {
82
92
for {
@@ -87,11 +97,10 @@ func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.Recordi
87
97
return nil
88
98
default :
89
99
}
90
- rev , err := t .Read (ctx , kc , s , limiter , keyPrefix , readLimit )
100
+ err := t .Read (ctx , c , s , limiter , keyPrefix )
91
101
if err != nil {
92
102
continue
93
103
}
94
- t .Watch (ctx , c , s , limiter , keyPrefix , rev + 1 )
95
104
}
96
105
})
97
106
g .Go (func () error {
@@ -106,7 +115,7 @@ func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.Recordi
106
115
}
107
116
// Avoid multiple failed writes in a row
108
117
if lastWriteFailed {
109
- _ , err := t .Read (ctx , kc , s , limiter , keyPrefix , 0 )
118
+ _ , err := t .List (ctx , kc , s , limiter , keyPrefix , t . averageKeyCount , 0 )
110
119
if err != nil {
111
120
continue
112
121
}
@@ -121,10 +130,29 @@ func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.Recordi
121
130
g .Wait ()
122
131
}
123
132
124
- func (t kubernetesTraffic ) Read (ctx context.Context , kc kubernetes.Interface , s * storage , limiter * rate.Limiter , keyPrefix string , limit int ) (rev int64 , err error ) {
133
+ func (t kubernetesTraffic ) Read (ctx context.Context , c * client.RecordingClient , s * storage , limiter * rate.Limiter , keyPrefix string ) error {
134
+ kc := kubernetes.Client {Client : & clientv3.Client {KV : c }}
135
+ op := random .PickRandom (t .readChoices )
136
+ switch op {
137
+ case KubernetesListStale :
138
+ _ , rev := s .PickRandom ()
139
+ _ , err := t .List (ctx , kc , s , limiter , keyPrefix , t .averageKeyCount , rev )
140
+ return err
141
+ case KubernetesListAndWatch :
142
+ rev , err := t .List (ctx , kc , s , limiter , keyPrefix , t .averageKeyCount , 0 )
143
+ if err != nil {
144
+ return err
145
+ }
146
+ t .Watch (ctx , c , s , limiter , keyPrefix , rev + 1 )
147
+ return nil
148
+ default :
149
+ panic (fmt .Sprintf ("invalid choice: %q" , op ))
150
+ }
151
+ }
152
+
153
+ func (t kubernetesTraffic ) List (ctx context.Context , kc kubernetes.Interface , s * storage , limiter * rate.Limiter , keyPrefix string , limit int , revision int64 ) (rev int64 , err error ) {
125
154
hasMore := true
126
155
var kvs []* mvccpb.KeyValue
127
- var revision int64
128
156
var cont string
129
157
130
158
for hasMore {
@@ -277,9 +305,11 @@ func compact(ctx context.Context, client *client.RecordingClient, t, rev int64)
277
305
type KubernetesRequestType string
278
306
279
307
const (
280
- KubernetesDelete KubernetesRequestType = "delete"
281
- KubernetesUpdate KubernetesRequestType = "update"
282
- KubernetesCreate KubernetesRequestType = "create"
308
+ KubernetesDelete KubernetesRequestType = "delete"
309
+ KubernetesUpdate KubernetesRequestType = "update"
310
+ KubernetesCreate KubernetesRequestType = "create"
311
+ KubernetesListStale KubernetesRequestType = "list_stale"
312
+ KubernetesListAndWatch KubernetesRequestType = "list_watch"
283
313
)
284
314
285
315
type storage struct {
@@ -333,7 +363,11 @@ func (s *storage) Count() int {
333
363
func (s * storage ) PickRandom () (key string , rev int64 ) {
334
364
s .mux .RLock ()
335
365
defer s .mux .RUnlock ()
336
- n := rand .Intn (len (s .keyRevision ))
366
+ l := len (s .keyRevision )
367
+ if l == 0 {
368
+ return "" , 0
369
+ }
370
+ n := rand .Intn (l )
337
371
i := 0
338
372
for k , v := range s .keyRevision {
339
373
if i == n {
0 commit comments