Skip to content

Commit b4191a7

Browse files
committed
CLIENT-3365 Dynamic config changes
1 parent d0cf7fd commit b4191a7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+28712
-172
lines changed

batch_command_delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (cmd *batchCommandDelete) commandType() commandType {
175175
}
176176

177177
func (cmd *batchCommandDelete) executeSingle(client *Client) Error {
178-
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy)
178+
policy := cmd.batchDeletePolicy.toWritePolicyWithConfig(cmd.policy, client.dynConfig)
179179
for i, key := range cmd.keys {
180180
res, err := client.Operate(policy, key, DeleteOp())
181181
cmd.records[i].setRecord(res)

batch_command_operate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,16 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
246246
} else if len(ops) == 0 {
247247
ops = append(ops, GetOp())
248248
}
249-
res, err = client.Operate(cmd.client.getUsableBatchReadPolicy(br.Policy).toWritePolicy(cmd.policy), br.Key, ops...)
249+
res, err = client.Operate(cmd.client.getUsableBatchReadPolicyWithConfig(br.Policy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig), br.Key, ops...)
250250
case *BatchWrite:
251-
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
251+
policy := cmd.client.getUsableBatchWritePolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
252252
policy.RespondPerEachOp = true
253253
res, err = client.Operate(policy, br.Key, br.Ops...)
254254
case *BatchDelete:
255-
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
255+
policy := cmd.client.getUsableBatchDeletePolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
256256
res, err = client.Operate(policy, br.Key, DeleteOp())
257257
case *BatchUDF:
258-
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
258+
policy := cmd.client.getUsableBatchUDFPolicyWithConfig(br.Policy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
259259
policy.RespondPerEachOp = true
260260
res, err = client.execute(policy, br.Key, br.PackageName, br.FunctionName, br.FunctionArgs...)
261261
}

batch_command_udf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (cmd *batchCommandUDF) isRead() bool {
185185

186186
func (cmd *batchCommandUDF) executeSingle(client *Client) Error {
187187
for i, key := range cmd.keys {
188-
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy)
188+
policy := cmd.batchUDFPolicy.toWritePolicyWithConfig(cmd.policy, client.dynConfig)
189189
policy.RespondPerEachOp = true
190190
res, err := client.execute(policy, key, cmd.packageName, cmd.functionName, cmd.args...)
191191
cmd.records[i].setRecord(res)

batch_delete_policy.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package aerospike
1616

17+
import pc "github.com/aerospike/aerospike-client-go/v8/internal/cache"
18+
1719
// BatchDeletePolicy is used in batch delete commands.
1820
type BatchDeletePolicy struct {
1921
// FilterExpression is optional expression filter. If FilterExpression exists and evaluates to false, the specific batch key
@@ -58,6 +60,17 @@ func NewBatchDeletePolicy() *BatchDeletePolicy {
5860
}
5961
}
6062

63+
func NewBatchDeletePolicyOrDefaultFromCache(dynConfig *DynConfig) *BatchDeletePolicy {
64+
if dynConfig == nil {
65+
return NewBatchDeletePolicy()
66+
}
67+
68+
dynConfig.lock.RLock()
69+
defer dynConfig.lock.RUnlock()
70+
71+
return dynConfig.mappedPolicies.Get(pc.BATCH_DELETE_POLICY).(*BatchDeletePolicy)
72+
}
73+
6174
func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
6275
wp := bp.toWritePolicy()
6376

@@ -73,3 +86,103 @@ func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
7386
}
7487
return wp
7588
}
89+
90+
func (bdp *BatchDeletePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
91+
wp := bp.toWritePolicy()
92+
93+
if bdp != nil {
94+
if bdp.FilterExpression != nil {
95+
wp.FilterExpression = bdp.FilterExpression
96+
}
97+
wp.CommitLevel = bdp.CommitLevel
98+
wp.GenerationPolicy = bdp.GenerationPolicy
99+
wp.Generation = bdp.Generation
100+
wp.DurableDelete = bdp.DurableDelete
101+
wp.SendKey = bdp.SendKey
102+
}
103+
104+
// In Case dynConfig is not initialized or running return the policy before
105+
// merge
106+
if dynConfig == nil {
107+
return wp
108+
}
109+
110+
dynConfig.lock.RLock()
111+
defer dynConfig.lock.RUnlock()
112+
113+
config := dynConfig.config
114+
if config != nil && config.Dynamic.BatchDelete != nil {
115+
if config.Dynamic.BatchDelete.DurableDelete != nil {
116+
wp.DurableDelete = *config.Dynamic.BatchDelete.DurableDelete
117+
}
118+
if config.Dynamic.BatchDelete.SendKey != nil {
119+
wp.SendKey = *config.Dynamic.BatchDelete.SendKey
120+
}
121+
}
122+
123+
return wp
124+
}
125+
126+
// copyBatchDeletePolicy creates a new BasePolicy instance and copies the values from the source BatchDeletePolicy.
127+
func copyBatchDeletePolicy(src *BatchDeletePolicy) *BatchDeletePolicy {
128+
if src == nil {
129+
return nil
130+
}
131+
132+
response := NewBatchDeletePolicy()
133+
134+
response.FilterExpression = src.FilterExpression
135+
response.FilterExpression = src.FilterExpression
136+
response.CommitLevel = src.CommitLevel
137+
response.GenerationPolicy = src.GenerationPolicy
138+
response.Generation = src.Generation
139+
response.DurableDelete = src.DurableDelete
140+
response.SendKey = src.SendKey
141+
142+
return response
143+
}
144+
145+
// applyConfigToBatchDeletePolicy applies the dynamic configuration and generates a new policy
146+
func applyConfigToBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
147+
if dynConfig == nil {
148+
return policy
149+
}
150+
151+
config := dynConfig.getConfigIfNotInitialized()
152+
153+
dynConfig.lock.RLock()
154+
defer dynConfig.lock.RUnlock()
155+
156+
if policy == nil {
157+
// Passed in policy is nil, fetch mapped default policy from cache.
158+
return dynConfig.mappedPolicies.Get(pc.BATCH_DELETE_POLICY).(*BatchDeletePolicy)
159+
}
160+
if config != nil && config.Dynamic != nil && config.Dynamic.BatchDelete != nil {
161+
// Dynamic configuration is exists for policy in question.
162+
var responsePolicy *BatchDeletePolicy
163+
// User has provided a custom policy. We need to apply the dynamic configuration.
164+
responsePolicy = copyBatchDeletePolicy(policy)
165+
responsePolicy = mapDynamicBatchDeletePolicy(responsePolicy, dynConfig)
166+
167+
return responsePolicy
168+
} else {
169+
return policy
170+
}
171+
}
172+
173+
func mapDynamicBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
174+
if dynConfig.config == nil && dynConfig.config.Dynamic == nil {
175+
return policy
176+
}
177+
178+
if dynConfig.config.Dynamic.BatchDelete != nil {
179+
if dynConfig.config.Dynamic.BatchDelete.DurableDelete != nil {
180+
policy.DurableDelete = *dynConfig.config.Dynamic.BatchDelete.DurableDelete
181+
}
182+
if dynConfig.config.Dynamic.BatchDelete.SendKey != nil {
183+
policy.SendKey = *dynConfig.config.Dynamic.BatchDelete.SendKey
184+
}
185+
}
186+
187+
return policy
188+
}

batch_delete_policy_config_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package aerospike
2+
3+
import (
4+
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
)
8+
9+
var _ = Describe("ApplyConfigToBatchDeletePolicy", func() {
10+
11+
Context("when applying full configuration to batch delete policy", func() {
12+
It("should update the policy values based on the dynamic config", func() {
13+
// Create the full configuration.
14+
config := &DynConfig{
15+
config: &dynconfig.Config{
16+
Dynamic: &dynconfig.DynamicConfig{
17+
BatchDelete: &dynconfig.BatchDelete{
18+
DurableDelete: func() *bool {
19+
r := true
20+
return &r
21+
}(),
22+
SendKey: func() *bool {
23+
r := true
24+
return &r
25+
}(),
26+
},
27+
},
28+
},
29+
}
30+
31+
// Create an initial BatchReadPolicy.
32+
policy := NewBatchDeletePolicy()
33+
34+
// Verify defaults.
35+
Expect(policy).NotTo(BeNil())
36+
Expect(policy.DurableDelete).To(BeFalse())
37+
Expect(policy.SendKey).To(BeFalse())
38+
39+
// Apply configuration.
40+
updatedPolicy := applyConfigToBatchDeletePolicy(policy, config)
41+
42+
// Validate applied configuration.
43+
Expect(updatedPolicy).NotTo(BeNil())
44+
Expect(updatedPolicy.DurableDelete).To(BeTrue())
45+
Expect(updatedPolicy.DurableDelete).To(BeTrue())
46+
})
47+
})
48+
49+
Context("when applying batch read config to a write policy", func() {
50+
It("should update the write policy values based on the batch delete dynamic config", func() {
51+
// Create the full configuration.
52+
config := &DynConfig{
53+
config: &dynconfig.Config{
54+
Dynamic: &dynconfig.DynamicConfig{
55+
BatchDelete: &dynconfig.BatchDelete{
56+
DurableDelete: func() *bool {
57+
r := true
58+
return &r
59+
}(),
60+
SendKey: func() *bool {
61+
r := true
62+
return &r
63+
}(),
64+
},
65+
},
66+
},
67+
}
68+
69+
// Create an initial BatchPolicy (used for write operations).
70+
batchPolicy := NewBatchPolicy()
71+
72+
// Verify defaults.
73+
Expect(batchPolicy).NotTo(BeNil())
74+
Expect(batchPolicy.ReadModeAP).To(Equal(ReadModeAPOne))
75+
Expect(batchPolicy.ReadModeSC).To(Equal(ReadModeSCSession))
76+
Expect(batchPolicy.ReadTouchTTLPercent).To(Equal(int32(0)))
77+
78+
batchDeletePolicy := NewBatchDeletePolicy()
79+
updatedWritePolicy := batchDeletePolicy.toWritePolicyWithConfig(batchPolicy, config)
80+
81+
// Validate applied configuration.
82+
Expect(updatedWritePolicy).NotTo(BeNil())
83+
Expect(updatedWritePolicy.DurableDelete).To(BeTrue())
84+
Expect(updatedWritePolicy.SendKey).To(BeTrue())
85+
})
86+
})
87+
})

batch_policy.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
package aerospike
1616

17+
import (
18+
"time"
19+
20+
pc "github.com/aerospike/aerospike-client-go/v8/internal/cache"
21+
)
22+
1723
// BatchPolicy encapsulates parameters for policy attributes used in write operations.
1824
// This object is passed into methods where database writes can occur.
1925
type BatchPolicy struct {
@@ -101,6 +107,17 @@ func NewBatchPolicy() *BatchPolicy {
101107
}
102108
}
103109

110+
func NewBatchPolicyOrDefaultFromCache(dynConfig *DynConfig) *BatchPolicy {
111+
if dynConfig == nil {
112+
return NewBatchPolicy()
113+
}
114+
115+
dynConfig.lock.RLock()
116+
defer dynConfig.lock.RUnlock()
117+
118+
return dynConfig.mappedPolicies.Get(pc.BATCH_POLICY).(*BatchPolicy)
119+
}
120+
104121
// NewReadBatchPolicy initializes a new BatchPolicy instance for reads.
105122
func NewReadBatchPolicy() *BatchPolicy {
106123
return NewBatchPolicy()
@@ -120,3 +137,99 @@ func (p *BatchPolicy) toWritePolicy() *WritePolicy {
120137
}
121138
return wp
122139
}
140+
141+
// copyQueryPolicy creates a new BasePolicy instance and copies the values from the source BasePolicy.
142+
func copyBatchPolicy(src *BatchPolicy) *BatchPolicy {
143+
if src == nil {
144+
return nil
145+
}
146+
147+
response := NewBatchPolicy()
148+
149+
response.Txn = src.Txn
150+
response.FilterExpression = src.FilterExpression
151+
response.ReadModeAP = src.ReadModeAP
152+
response.ReadModeSC = src.ReadModeSC
153+
response.TotalTimeout = src.TotalTimeout
154+
response.SocketTimeout = src.SocketTimeout
155+
response.MaxRetries = src.MaxRetries
156+
response.ReadTouchTTLPercent = src.ReadTouchTTLPercent
157+
response.SleepBetweenRetries = src.SleepBetweenRetries
158+
response.SleepMultiplier = src.SleepMultiplier
159+
response.ExitFastOnExhaustedConnectionPool = src.ExitFastOnExhaustedConnectionPool
160+
response.SendKey = src.SendKey
161+
response.UseCompression = src.UseCompression
162+
response.ReplicaPolicy = src.ReplicaPolicy
163+
response.ConcurrentNodes = src.ConcurrentNodes
164+
response.AllowInline = src.AllowInline
165+
response.AllowInlineSSD = src.AllowInlineSSD
166+
response.RespondAllKeys = src.RespondAllKeys
167+
response.AllowPartialResults = src.AllowPartialResults
168+
169+
return response
170+
}
171+
172+
// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy
173+
func applyConfigToBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchPolicy {
174+
if dynConfig == nil {
175+
return policy
176+
}
177+
178+
config := dynConfig.getConfigIfNotInitialized()
179+
180+
dynConfig.lock.RLock()
181+
defer dynConfig.lock.RUnlock()
182+
183+
if policy == nil {
184+
// Passed in policy is nil, fetch mapped default policy from cache.
185+
return dynConfig.mappedPolicies.Get(pc.BATCH_POLICY).(*BatchPolicy)
186+
} else if config != nil && config.Dynamic != nil && config.Dynamic.BatchRead != nil {
187+
// Dynamic configuration exists for policy in question.
188+
var responsePolicy *BatchPolicy
189+
// User has provided a custom policy. We need to apply the dynamic configuration.
190+
responsePolicy = copyBatchPolicy(policy)
191+
responsePolicy = mapDynamicBatchPolicy(responsePolicy, dynConfig)
192+
193+
return responsePolicy
194+
} else {
195+
return policy
196+
}
197+
}
198+
199+
func mapDynamicBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchPolicy {
200+
if dynConfig.config == nil && dynConfig.config.Dynamic == nil {
201+
return policy
202+
}
203+
204+
if dynConfig.config.Dynamic.BatchRead != nil {
205+
if dynConfig.config.Dynamic.BatchRead.ReadModeAp != nil {
206+
policy.ReadModeAP = mapReadModeAPToReadModeAP(*dynConfig.config.Dynamic.BatchRead.ReadModeAp)
207+
}
208+
if dynConfig.config.Dynamic.BatchRead.ReadModeSc != nil {
209+
policy.ReadModeSC = mapReadModeSCToReadModeSC(*dynConfig.config.Dynamic.BatchRead.ReadModeSc)
210+
}
211+
if dynConfig.config.Dynamic.BatchRead.TotalTimeout != nil {
212+
policy.TotalTimeout = time.Duration(*dynConfig.config.Dynamic.BatchRead.TotalTimeout)
213+
}
214+
if dynConfig.config.Dynamic.BatchRead.SocketTimeout != nil {
215+
policy.SocketTimeout = time.Duration(*dynConfig.config.Dynamic.BatchRead.SocketTimeout)
216+
}
217+
if dynConfig.config.Dynamic.BatchRead.MaxRetries != nil {
218+
policy.MaxRetries = *dynConfig.config.Dynamic.BatchRead.MaxRetries
219+
}
220+
if dynConfig.config.Dynamic.BatchRead.SleepBetweenRetries != nil {
221+
policy.SleepBetweenRetries = time.Duration(*dynConfig.config.Dynamic.BatchRead.SleepBetweenRetries)
222+
}
223+
if dynConfig.config.Dynamic.BatchRead.AllowInline != nil {
224+
policy.AllowInline = *dynConfig.config.Dynamic.BatchRead.AllowInline
225+
}
226+
if dynConfig.config.Dynamic.BatchRead.AllowInlineSSD != nil {
227+
policy.AllowInlineSSD = *dynConfig.config.Dynamic.BatchRead.AllowInlineSSD
228+
}
229+
if dynConfig.config.Dynamic.BatchRead.RespondAllKeys != nil {
230+
policy.RespondAllKeys = *dynConfig.config.Dynamic.BatchRead.RespondAllKeys
231+
}
232+
}
233+
234+
return policy
235+
}

0 commit comments

Comments
 (0)