Skip to content

Commit 1fc1ff4

Browse files
committed
CLIENT-3365 Dynamic config changes
1 parent f6182b5 commit 1fc1ff4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+4424
-153
lines changed

batch_command_delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (cmd *batchCommandDelete) commandType() commandType {
175175
}
176176

177177
func (cmd *batchCommandDelete) executeSingle(client *Client) Error {
178-
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy)
178+
policy := cmd.batchDeletePolicy.toWritePolicyWithConfig(cmd.policy, client.dynConfig)
179179
for i, key := range cmd.keys {
180180
res, err := client.Operate(policy, key, DeleteOp())
181181
cmd.records[i].setRecord(res)

batch_command_operate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,16 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
246246
} else if len(ops) == 0 {
247247
ops = append(ops, GetOp())
248248
}
249-
res, err = client.Operate(cmd.client.getUsableBatchReadPolicy(br.Policy).toWritePolicy(cmd.policy), br.Key, ops...)
249+
res, err = client.Operate(cmd.client.getUsableBatchReadPolicyWithConfig(br.Policy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig), br.Key, ops...)
250250
case *BatchWrite:
251-
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
251+
policy := cmd.client.getUsableBatchWritePolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
252252
policy.RespondPerEachOp = true
253253
res, err = client.Operate(policy, br.Key, br.Ops...)
254254
case *BatchDelete:
255-
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
255+
policy := cmd.client.getUsableBatchDeletePolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
256256
res, err = client.Operate(policy, br.Key, DeleteOp())
257257
case *BatchUDF:
258-
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
258+
policy := cmd.client.getUsableBatchUDFPolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
259259
policy.RespondPerEachOp = true
260260
res, err = client.execute(policy, br.Key, br.PackageName, br.FunctionName, br.FunctionArgs...)
261261
}

batch_command_udf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (cmd *batchCommandUDF) isRead() bool {
185185

186186
func (cmd *batchCommandUDF) executeSingle(client *Client) Error {
187187
for i, key := range cmd.keys {
188-
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy)
188+
policy := cmd.batchUDFPolicy.toWritePolicyWithConfig(cmd.policy, client.dynConfig)
189189
policy.RespondPerEachOp = true
190190
res, err := client.execute(policy, key, cmd.packageName, cmd.functionName, cmd.args...)
191191
cmd.records[i].setRecord(res)

batch_delete_policy.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package aerospike
1616

17+
import pc "github.com/aerospike/aerospike-client-go/v8/internal/cache"
18+
1719
// BatchDeletePolicy is used in batch delete commands.
1820
type BatchDeletePolicy struct {
1921
// FilterExpression is optional expression filter. If FilterExpression exists and evaluates to false, the specific batch key
@@ -58,6 +60,17 @@ func NewBatchDeletePolicy() *BatchDeletePolicy {
5860
}
5961
}
6062

63+
func NewBatchDeletePolicyOrDefaultFromCache(dynConfig *DynConfig) *BatchDeletePolicy {
64+
if dynConfig == nil {
65+
return NewBatchDeletePolicy()
66+
}
67+
68+
dynConfig.lock.RLock()
69+
defer dynConfig.lock.RUnlock()
70+
71+
return dynConfig.mappedPolicies.Get(pc.BATCH_DELETE_POLICY).(*BatchDeletePolicy)
72+
}
73+
6174
func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
6275
wp := bp.toWritePolicy()
6376

@@ -73,3 +86,115 @@ func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
7386
}
7487
return wp
7588
}
89+
90+
func (bdp *BatchDeletePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
91+
wp := bp.toWritePolicy()
92+
93+
if bdp != nil {
94+
if bdp.FilterExpression != nil {
95+
wp.FilterExpression = bdp.FilterExpression
96+
}
97+
wp.CommitLevel = bdp.CommitLevel
98+
wp.GenerationPolicy = bdp.GenerationPolicy
99+
wp.Generation = bdp.Generation
100+
wp.DurableDelete = bdp.DurableDelete
101+
wp.SendKey = bdp.SendKey
102+
}
103+
104+
// In Case dynConfig is not initialized or running return the policy before
105+
// merge
106+
if dynConfig == nil {
107+
return wp
108+
}
109+
110+
dynConfig.lock.RLock()
111+
defer dynConfig.lock.RUnlock()
112+
113+
config := dynConfig.config
114+
if config != nil && config.Dynamic.BatchDelete != nil {
115+
if config.Dynamic.BatchDelete.DurableDelete != nil {
116+
wp.DurableDelete = *config.Dynamic.BatchDelete.DurableDelete
117+
}
118+
if config.Dynamic.BatchDelete.SendKey != nil {
119+
wp.SendKey = *config.Dynamic.BatchDelete.SendKey
120+
}
121+
}
122+
123+
return wp
124+
}
125+
126+
// copyBatchDeletePolicy creates a new BasePolicy instance and copies the values from the source BatchDeletePolicy.
127+
func copyBatchDeletePolicy(src *BatchDeletePolicy) *BatchDeletePolicy {
128+
if src == nil {
129+
return nil
130+
}
131+
132+
response := NewBatchDeletePolicy()
133+
134+
response.FilterExpression = src.FilterExpression
135+
response.FilterExpression = src.FilterExpression
136+
response.CommitLevel = src.CommitLevel
137+
response.GenerationPolicy = src.GenerationPolicy
138+
response.Generation = src.Generation
139+
response.DurableDelete = src.DurableDelete
140+
response.SendKey = src.SendKey
141+
142+
return response
143+
}
144+
145+
// applyConfigToBatchDeletePolicy applies the dynamic configuration and generates a new policy
146+
func applyConfigToBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
147+
if dynConfig == nil {
148+
return policy
149+
}
150+
151+
config := dynConfig.config
152+
153+
if config == nil && !dynConfig.configInitialized.Load() {
154+
// On initial load it is possible that the config is not yet loaded. This will kick things off to make sure
155+
// config is loaded.
156+
dynConfig.loadConfig()
157+
config = dynConfig.config
158+
}
159+
160+
dynConfig.lock.RLock()
161+
defer dynConfig.lock.RUnlock()
162+
163+
if config != nil && config.Dynamic != nil && config.Dynamic.BatchDelete != nil {
164+
// Dynamic configuration is exists for policy in question.
165+
var responsePolicy *BatchDeletePolicy
166+
// User has provided a custom policy. We need to apply the dynamic configuration.
167+
if policy != nil {
168+
// Copy the existing write policy to preserve any custom settings.
169+
responsePolicy = copyBatchDeletePolicy(policy)
170+
responsePolicy = mapDynamicBatchDeletePolicy(responsePolicy, dynConfig)
171+
172+
return responsePolicy
173+
} else {
174+
// Passed in policy is nil, fetch mapped default policy from cache.
175+
responsePolicy = dynConfig.mappedPolicies.Get(pc.BATCH_DELETE_POLICY).(*BatchDeletePolicy)
176+
177+
// If we have found entry in cache no need to map again return it.
178+
return responsePolicy
179+
}
180+
} else {
181+
return policy
182+
}
183+
}
184+
185+
func mapDynamicBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
186+
if dynConfig.config == nil && dynConfig.config.Dynamic == nil {
187+
return policy
188+
}
189+
190+
if dynConfig.config.Dynamic.BatchDelete != nil {
191+
if dynConfig.config.Dynamic.BatchDelete.DurableDelete != nil {
192+
policy.DurableDelete = *dynConfig.config.Dynamic.BatchDelete.DurableDelete
193+
}
194+
if dynConfig.config.Dynamic.BatchDelete.SendKey != nil {
195+
policy.SendKey = *dynConfig.config.Dynamic.BatchDelete.SendKey
196+
}
197+
}
198+
199+
return policy
200+
}

batch_delete_policy_config_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package aerospike
2+
3+
import (
4+
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
)
8+
9+
var _ = Describe("ApplyConfigToBatchDeletePolicy", func() {
10+
11+
Context("when applying full configuration to batch delete policy", func() {
12+
It("should update the policy values based on the dynamic config", func() {
13+
// Create the full configuration.
14+
config := &DynConfig{
15+
config: &dynconfig.Config{
16+
Dynamic: &dynconfig.DynamicConfig{
17+
BatchDelete: &dynconfig.BatchDelete{
18+
DurableDelete: func() *bool {
19+
r := true
20+
return &r
21+
}(),
22+
SendKey: func() *bool {
23+
r := true
24+
return &r
25+
}(),
26+
},
27+
},
28+
},
29+
}
30+
31+
// Create an initial BatchReadPolicy.
32+
policy := NewBatchDeletePolicy()
33+
34+
// Verify defaults.
35+
Expect(policy).NotTo(BeNil())
36+
Expect(policy.DurableDelete).To(BeFalse())
37+
Expect(policy.SendKey).To(BeFalse())
38+
39+
// Apply configuration.
40+
updatedPolicy := applyConfigToBatchDeletePolicy(policy, config)
41+
42+
// Validate applied configuration.
43+
Expect(updatedPolicy).NotTo(BeNil())
44+
Expect(updatedPolicy.DurableDelete).To(BeTrue())
45+
Expect(updatedPolicy.DurableDelete).To(BeTrue())
46+
})
47+
})
48+
49+
Context("when applying batch read config to a write policy", func() {
50+
It("should update the write policy values based on the batch delete dynamic config", func() {
51+
// Create the full configuration.
52+
config := &DynConfig{
53+
config: &dynconfig.Config{
54+
Dynamic: &dynconfig.DynamicConfig{
55+
BatchDelete: &dynconfig.BatchDelete{
56+
DurableDelete: func() *bool {
57+
r := true
58+
return &r
59+
}(),
60+
SendKey: func() *bool {
61+
r := true
62+
return &r
63+
}(),
64+
},
65+
},
66+
},
67+
}
68+
69+
// Create an initial BatchPolicy (used for write operations).
70+
batchPolicy := NewBatchPolicy()
71+
72+
// Verify defaults.
73+
Expect(batchPolicy).NotTo(BeNil())
74+
Expect(batchPolicy.ReadModeAP).To(Equal(ReadModeAPOne))
75+
Expect(batchPolicy.ReadModeSC).To(Equal(ReadModeSCSession))
76+
Expect(batchPolicy.ReadTouchTTLPercent).To(Equal(int32(0)))
77+
78+
batchDeletePolicy := NewBatchDeletePolicy()
79+
updatedWritePolicy := batchDeletePolicy.toWritePolicyWithConfig(batchPolicy, config)
80+
81+
// Validate applied configuration.
82+
Expect(updatedWritePolicy).NotTo(BeNil())
83+
Expect(updatedWritePolicy.DurableDelete).To(BeTrue())
84+
Expect(updatedWritePolicy.SendKey).To(BeTrue())
85+
})
86+
})
87+
})

0 commit comments

Comments
 (0)