Skip to content

Commit f282446

Browse files
committed
CLIENT-3365 Dynamic config changes
1 parent 3445986 commit f282446

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+28978
-171
lines changed

batch_command_delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (cmd *batchCommandDelete) commandType() commandType {
175175
}
176176

177177
func (cmd *batchCommandDelete) executeSingle(client *Client) Error {
178-
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy)
178+
policy := cmd.batchDeletePolicy.toWritePolicyWithConfig(cmd.policy, client.dynConfig)
179179
for i, key := range cmd.keys {
180180
res, err := client.Operate(policy, key, DeleteOp())
181181
cmd.records[i].setRecord(res)

batch_command_operate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,16 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
246246
} else if len(ops) == 0 {
247247
ops = append(ops, GetOp())
248248
}
249-
res, err = client.Operate(cmd.client.getUsableBatchReadPolicy(br.Policy).toWritePolicy(cmd.policy), br.Key, ops...)
249+
res, err = client.Operate(cmd.client.getUsableBatchReadPolicyWithConfig(br.Policy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig), br.Key, ops...)
250250
case *BatchWrite:
251-
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
251+
policy := cmd.client.getUsableBatchWritePolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
252252
policy.RespondPerEachOp = true
253253
res, err = client.Operate(policy, br.Key, br.Ops...)
254254
case *BatchDelete:
255-
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
255+
policy := cmd.client.getUsableBatchDeletePolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
256256
res, err = client.Operate(policy, br.Key, DeleteOp())
257257
case *BatchUDF:
258-
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
258+
policy := cmd.client.getUsableBatchUDFPolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
259259
policy.RespondPerEachOp = true
260260
res, err = client.execute(policy, br.Key, br.PackageName, br.FunctionName, br.FunctionArgs...)
261261
}

batch_command_udf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (cmd *batchCommandUDF) isRead() bool {
185185

186186
func (cmd *batchCommandUDF) executeSingle(client *Client) Error {
187187
for i, key := range cmd.keys {
188-
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy)
188+
policy := cmd.batchUDFPolicy.toWritePolicyWithConfig(cmd.policy, client.dynConfig)
189189
policy.RespondPerEachOp = true
190190
res, err := client.execute(policy, key, cmd.packageName, cmd.functionName, cmd.args...)
191191
cmd.records[i].setRecord(res)

batch_delete_policy.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package aerospike
1616

17+
import pc "github.com/aerospike/aerospike-client-go/v8/internal/cache"
18+
1719
// BatchDeletePolicy is used in batch delete commands.
1820
type BatchDeletePolicy struct {
1921
// FilterExpression is optional expression filter. If FilterExpression exists and evaluates to false, the specific batch key
@@ -58,6 +60,17 @@ func NewBatchDeletePolicy() *BatchDeletePolicy {
5860
}
5961
}
6062

63+
func NewBatchDeletePolicyOrDefaultFromCache(dynConfig *DynConfig) *BatchDeletePolicy {
64+
if dynConfig == nil {
65+
return NewBatchDeletePolicy()
66+
}
67+
68+
dynConfig.lock.RLock()
69+
defer dynConfig.lock.RUnlock()
70+
71+
return dynConfig.mappedPolicies.Get(pc.BATCH_DELETE_POLICY).(*BatchDeletePolicy)
72+
}
73+
6174
func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
6275
wp := bp.toWritePolicy()
6376

@@ -73,3 +86,103 @@ func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
7386
}
7487
return wp
7588
}
89+
90+
func (bdp *BatchDeletePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
91+
wp := bp.toWritePolicy()
92+
93+
if bdp != nil {
94+
if bdp.FilterExpression != nil {
95+
wp.FilterExpression = bdp.FilterExpression
96+
}
97+
wp.CommitLevel = bdp.CommitLevel
98+
wp.GenerationPolicy = bdp.GenerationPolicy
99+
wp.Generation = bdp.Generation
100+
wp.DurableDelete = bdp.DurableDelete
101+
wp.SendKey = bdp.SendKey
102+
}
103+
104+
// In Case dynConfig is not initialized or running return the policy before
105+
// merge
106+
if dynConfig == nil {
107+
return wp
108+
}
109+
110+
dynConfig.lock.RLock()
111+
defer dynConfig.lock.RUnlock()
112+
113+
config := dynConfig.config
114+
if config != nil && config.Dynamic.BatchDelete != nil {
115+
if config.Dynamic.BatchDelete.DurableDelete != nil {
116+
wp.DurableDelete = *config.Dynamic.BatchDelete.DurableDelete
117+
}
118+
if config.Dynamic.BatchDelete.SendKey != nil {
119+
wp.SendKey = *config.Dynamic.BatchDelete.SendKey
120+
}
121+
}
122+
123+
return wp
124+
}
125+
126+
// copyBatchDeletePolicy creates a new BasePolicy instance and copies the values from the source BatchDeletePolicy.
127+
func copyBatchDeletePolicy(src *BatchDeletePolicy) *BatchDeletePolicy {
128+
if src == nil {
129+
return nil
130+
}
131+
132+
response := NewBatchDeletePolicy()
133+
134+
response.FilterExpression = src.FilterExpression
135+
response.FilterExpression = src.FilterExpression
136+
response.CommitLevel = src.CommitLevel
137+
response.GenerationPolicy = src.GenerationPolicy
138+
response.Generation = src.Generation
139+
response.DurableDelete = src.DurableDelete
140+
response.SendKey = src.SendKey
141+
142+
return response
143+
}
144+
145+
// applyConfigToBatchDeletePolicy applies the dynamic configuration and generates a new policy
146+
func applyConfigToBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
147+
if dynConfig == nil {
148+
return policy
149+
}
150+
151+
config := dynConfig.getConfigIfNotInitialized()
152+
153+
dynConfig.lock.RLock()
154+
defer dynConfig.lock.RUnlock()
155+
156+
if policy == nil {
157+
// Passed in policy is nil, fetch mapped default policy from cache.
158+
return dynConfig.mappedPolicies.Get(pc.BATCH_DELETE_POLICY).(*BatchDeletePolicy)
159+
}
160+
if config != nil && config.Dynamic != nil && config.Dynamic.BatchDelete != nil {
161+
// Dynamic configuration is exists for policy in question.
162+
var responsePolicy *BatchDeletePolicy
163+
// User has provided a custom policy. We need to apply the dynamic configuration.
164+
responsePolicy = copyBatchDeletePolicy(policy)
165+
responsePolicy = mapDynamicBatchDeletePolicy(responsePolicy, dynConfig)
166+
167+
return responsePolicy
168+
} else {
169+
return policy
170+
}
171+
}
172+
173+
func mapDynamicBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
174+
if dynConfig.config == nil && dynConfig.config.Dynamic == nil {
175+
return policy
176+
}
177+
178+
if dynConfig.config.Dynamic.BatchDelete != nil {
179+
if dynConfig.config.Dynamic.BatchDelete.DurableDelete != nil {
180+
policy.DurableDelete = *dynConfig.config.Dynamic.BatchDelete.DurableDelete
181+
}
182+
if dynConfig.config.Dynamic.BatchDelete.SendKey != nil {
183+
policy.SendKey = *dynConfig.config.Dynamic.BatchDelete.SendKey
184+
}
185+
}
186+
187+
return policy
188+
}

batch_delete_policy_config_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2014-2022 Aerospike, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package aerospike
16+
17+
import (
18+
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
19+
. "github.com/onsi/ginkgo/v2"
20+
. "github.com/onsi/gomega"
21+
)
22+
23+
var _ = Describe("ApplyConfigToBatchDeletePolicy", func() {
24+
25+
Context("when applying full configuration to batch delete policy", func() {
26+
It("should update the policy values based on the dynamic config", func() {
27+
// Create the full configuration.
28+
config := &DynConfig{
29+
config: &dynconfig.Config{
30+
Dynamic: &dynconfig.DynamicConfig{
31+
BatchDelete: &dynconfig.BatchDelete{
32+
DurableDelete: func() *bool {
33+
r := true
34+
return &r
35+
}(),
36+
SendKey: func() *bool {
37+
r := true
38+
return &r
39+
}(),
40+
},
41+
},
42+
},
43+
}
44+
45+
// Create an initial BatchReadPolicy.
46+
policy := NewBatchDeletePolicy()
47+
48+
// Verify defaults.
49+
Expect(policy).NotTo(BeNil())
50+
Expect(policy.DurableDelete).To(BeFalse())
51+
Expect(policy.SendKey).To(BeFalse())
52+
53+
// Apply configuration.
54+
updatedPolicy := applyConfigToBatchDeletePolicy(policy, config)
55+
56+
// Validate applied configuration.
57+
Expect(updatedPolicy).NotTo(BeNil())
58+
Expect(updatedPolicy.DurableDelete).To(BeTrue())
59+
Expect(updatedPolicy.DurableDelete).To(BeTrue())
60+
})
61+
})
62+
63+
Context("when applying batch read config to a write policy", func() {
64+
It("should update the write policy values based on the batch delete dynamic config", func() {
65+
// Create the full configuration.
66+
config := &DynConfig{
67+
config: &dynconfig.Config{
68+
Dynamic: &dynconfig.DynamicConfig{
69+
BatchDelete: &dynconfig.BatchDelete{
70+
DurableDelete: func() *bool {
71+
r := true
72+
return &r
73+
}(),
74+
SendKey: func() *bool {
75+
r := true
76+
return &r
77+
}(),
78+
},
79+
},
80+
},
81+
}
82+
83+
// Create an initial BatchPolicy (used for write operations).
84+
batchPolicy := NewBatchPolicy()
85+
86+
// Verify defaults.
87+
Expect(batchPolicy).NotTo(BeNil())
88+
Expect(batchPolicy.ReadModeAP).To(Equal(ReadModeAPOne))
89+
Expect(batchPolicy.ReadModeSC).To(Equal(ReadModeSCSession))
90+
Expect(batchPolicy.ReadTouchTTLPercent).To(Equal(int32(0)))
91+
92+
batchDeletePolicy := NewBatchDeletePolicy()
93+
updatedWritePolicy := batchDeletePolicy.toWritePolicyWithConfig(batchPolicy, config)
94+
95+
// Validate applied configuration.
96+
Expect(updatedWritePolicy).NotTo(BeNil())
97+
Expect(updatedWritePolicy.DurableDelete).To(BeTrue())
98+
Expect(updatedWritePolicy.SendKey).To(BeTrue())
99+
})
100+
})
101+
})

0 commit comments

Comments
 (0)