Skip to content

CLIENT-3365 Dynamic config changes #479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ docker-compose.yml
golangci.yml
cover*.out
.vscode/settings.json
ginkgo.report
2 changes: 1 addition & 1 deletion batch_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (cmd *batchCommandDelete) commandType() commandType {
}

func (cmd *batchCommandDelete) executeSingle(client *Client) Error {
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy)
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy, client.dynConfig)
for i, key := range cmd.keys {
res, err := client.Operate(policy, key, DeleteOp())
cmd.records[i].setRecord(res)
Expand Down
8 changes: 4 additions & 4 deletions batch_command_operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,16 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
} else if len(ops) == 0 {
ops = append(ops, GetOp())
}
res, err = client.Operate(cmd.client.getUsableBatchReadPolicy(br.Policy).toWritePolicy(cmd.policy), br.Key, ops...)
res, err = client.Operate(cmd.client.getUsableBatchReadPolicy(br.Policy).ToWritePolicy(cmd.policy, client.dynConfig), br.Key, ops...)
case *BatchWrite:
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy, client.dynConfig)
policy.RespondPerEachOp = true
res, err = client.Operate(policy, br.Key, br.Ops...)
case *BatchDelete:
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy, client.dynConfig)
res, err = client.Operate(policy, br.Key, DeleteOp())
case *BatchUDF:
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy, client.dynConfig)
policy.RespondPerEachOp = true
res, err = client.execute(policy, br.Key, br.PackageName, br.FunctionName, br.FunctionArgs...)
}
Expand Down
2 changes: 1 addition & 1 deletion batch_command_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (cmd *batchCommandUDF) isRead() bool {

func (cmd *batchCommandUDF) executeSingle(client *Client) Error {
for i, key := range cmd.keys {
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy)
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy, client.dynConfig)
policy.RespondPerEachOp = true
res, err := client.execute(policy, key, cmd.packageName, cmd.functionName, cmd.args...)
cmd.records[i].setRecord(res)
Expand Down
75 changes: 74 additions & 1 deletion batch_delete_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ func NewBatchDeletePolicy() *BatchDeletePolicy {
}
}

func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
func NewDynamicBatchDeletePolicy(dynConfig *DynConfig) *BatchDeletePolicy {
if dynConfig == nil {
return NewBatchDeletePolicy()
}

return dynConfig.client.dynDefaultBatchDeletePolicy.Load()
}

func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
wp := bp.toWritePolicy()

if bdp != nil {
Expand All @@ -71,5 +79,70 @@ func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
wp.DurableDelete = bdp.DurableDelete
wp.SendKey = bdp.SendKey
}

// In Case dynConfig is not initialized or running return the policy before
// merge
if dynConfig == nil {
return wp
}

config := dynConfig.config
if config != nil && config.Dynamic.BatchDelete != nil {
if config.Dynamic.BatchDelete.DurableDelete != nil {
wp.DurableDelete = *config.Dynamic.BatchDelete.DurableDelete
}
if config.Dynamic.BatchDelete.SendKey != nil {
wp.SendKey = *config.Dynamic.BatchDelete.SendKey
}
}

return wp
}

// copy creates a new BasePolicy instance and copies the values from the source BatchDeletePolicy.
func (bd *BatchDeletePolicy) copy() *BatchDeletePolicy {
if bd == nil {
return nil
}

response := *bd
return &response
}

// patchDynamic applies the dynamic configuration and generates a new policy
func (bdp *BatchDeletePolicy) patchDynamic(dynConfig *DynConfig) *BatchDeletePolicy {
if dynConfig == nil {
return bdp
}

config := dynConfig.getConfigIfNotLoadedOrInitialized()

if bdp == nil {
// Passed in policy is nil, fetch mapped default policy from cache.
return dynConfig.client.dynDefaultBatchDeletePolicy.Load()
}
if config != nil && config.Dynamic != nil && config.Dynamic.BatchDelete != nil {
// Dynamic configuration is exists for policy in question.
// User has provided a custom policy. We need to apply the dynamic configuration.
return bdp.copy().mapDynamic(dynConfig)
} else {
return bdp
}
}

func (bdp *BatchDeletePolicy) mapDynamic(dynConfig *DynConfig) *BatchDeletePolicy {
if dynConfig.config == nil || dynConfig.config.Dynamic == nil {
return bdp
}

if dynConfig.config.Dynamic.BatchDelete != nil {
if dynConfig.config.Dynamic.BatchDelete.DurableDelete != nil {
bdp.DurableDelete = *dynConfig.config.Dynamic.BatchDelete.DurableDelete
}
if dynConfig.config.Dynamic.BatchDelete.SendKey != nil {
bdp.SendKey = *dynConfig.config.Dynamic.BatchDelete.SendKey
}
}

return bdp
}
101 changes: 101 additions & 0 deletions batch_delete_policy_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2014-2022 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import (
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("ApplyConfigToBatchDeletePolicy", func() {

Context("when applying full configuration to batch delete policy", func() {
It("should update the policy values based on the dynamic config", func() {
// Create the full configuration.
config := &DynConfig{
config: &dynconfig.Config{
Dynamic: &dynconfig.DynamicConfig{
BatchDelete: &dynconfig.BatchDelete{
DurableDelete: func() *bool {
r := true
return &r
}(),
SendKey: func() *bool {
r := true
return &r
}(),
},
},
},
}

// Create an initial BatchReadPolicy.
policy := NewBatchDeletePolicy()

// Verify defaults.
Expect(policy).NotTo(BeNil())
Expect(policy.DurableDelete).To(BeFalse())
Expect(policy.SendKey).To(BeFalse())

// Apply configuration.
updatedPolicy := policy.patchDynamic(config)

// Validate applied configuration.
Expect(updatedPolicy).NotTo(BeNil())
Expect(updatedPolicy.DurableDelete).To(BeTrue())
Expect(updatedPolicy.DurableDelete).To(BeTrue())
})
})

Context("when applying batch read config to a write policy", func() {
It("should update the write policy values based on the batch delete dynamic config", func() {
// Create the full configuration.
config := &DynConfig{
config: &dynconfig.Config{
Dynamic: &dynconfig.DynamicConfig{
BatchDelete: &dynconfig.BatchDelete{
DurableDelete: func() *bool {
r := true
return &r
}(),
SendKey: func() *bool {
r := true
return &r
}(),
},
},
},
}

// Create an initial BatchPolicy (used for write operations).
batchPolicy := NewBatchPolicy()

// Verify defaults.
Expect(batchPolicy).NotTo(BeNil())
Expect(batchPolicy.ReadModeAP).To(Equal(ReadModeAPOne))
Expect(batchPolicy.ReadModeSC).To(Equal(ReadModeSCSession))
Expect(batchPolicy.ReadTouchTTLPercent).To(Equal(int32(0)))

batchDeletePolicy := NewBatchDeletePolicy()
updatedWritePolicy := batchDeletePolicy.toWritePolicy(batchPolicy, config)

// Validate applied configuration.
Expect(updatedWritePolicy).NotTo(BeNil())
Expect(updatedWritePolicy.DurableDelete).To(BeTrue())
Expect(updatedWritePolicy.SendKey).To(BeTrue())
})
})
})
84 changes: 84 additions & 0 deletions batch_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package aerospike

import (
"time"
)

// BatchPolicy encapsulates parameters for policy attributes used in write operations.
// This object is passed into methods where database writes can occur.
type BatchPolicy struct {
Expand Down Expand Up @@ -101,6 +105,14 @@ func NewBatchPolicy() *BatchPolicy {
}
}

func NewBatchPolicyOrDefaultFromCache(dynConfig *DynConfig) *BatchPolicy {
if dynConfig == nil {
return NewBatchPolicy()
}

return dynConfig.client.dynDefaultBatchPolicy.Load()
}

// NewReadBatchPolicy initializes a new BatchPolicy instance for reads.
func NewReadBatchPolicy() *BatchPolicy {
return NewBatchPolicy()
Expand All @@ -120,3 +132,75 @@ func (p *BatchPolicy) toWritePolicy() *WritePolicy {
}
return wp
}

// copyQueryPolicy creates a new BasePolicy instance and copies the values from the source BasePolicy.
func copyBatchPolicy(src *BatchPolicy) *BatchPolicy {
if src == nil {
return nil
}

response := *src
return &response
}

// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy
func applyConfigToBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchPolicy {
if dynConfig == nil {
return policy
}

config := dynConfig.getConfigIfNotLoadedOrInitialized()

if policy == nil {
// Passed in policy is nil, fetch mapped default policy from cache.
return dynConfig.client.dynDefaultBatchPolicy.Load()
} else if config != nil && config.Dynamic != nil && config.Dynamic.BatchRead != nil {
// Dynamic configuration exists for policy in question.
var responsePolicy *BatchPolicy
// User has provided a custom policy. We need to apply the dynamic configuration.
responsePolicy = copyBatchPolicy(policy)
responsePolicy = mapDynamicBatchPolicy(responsePolicy, dynConfig)

return responsePolicy
} else {
return policy
}
}

func mapDynamicBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchPolicy {
if dynConfig.config == nil || dynConfig.config.Dynamic == nil {
return policy
}

if dynConfig.config.Dynamic.BatchRead != nil {
if dynConfig.config.Dynamic.BatchRead.ReadModeAp != nil {
policy.ReadModeAP = mapReadModeAPToReadModeAP(*dynConfig.config.Dynamic.BatchRead.ReadModeAp)
}
if dynConfig.config.Dynamic.BatchRead.ReadModeSc != nil {
policy.ReadModeSC = mapReadModeSCToReadModeSC(*dynConfig.config.Dynamic.BatchRead.ReadModeSc)
}
if dynConfig.config.Dynamic.BatchRead.TotalTimeout != nil {
policy.TotalTimeout = time.Duration(*dynConfig.config.Dynamic.BatchRead.TotalTimeout) * time.Millisecond
}
if dynConfig.config.Dynamic.BatchRead.SocketTimeout != nil {
policy.SocketTimeout = time.Duration(*dynConfig.config.Dynamic.BatchRead.SocketTimeout) * time.Millisecond
}
if dynConfig.config.Dynamic.BatchRead.MaxRetries != nil {
policy.MaxRetries = *dynConfig.config.Dynamic.BatchRead.MaxRetries
}
if dynConfig.config.Dynamic.BatchRead.SleepBetweenRetries != nil {
policy.SleepBetweenRetries = time.Duration(*dynConfig.config.Dynamic.BatchRead.SleepBetweenRetries) * time.Millisecond
}
if dynConfig.config.Dynamic.BatchRead.AllowInline != nil {
policy.AllowInline = *dynConfig.config.Dynamic.BatchRead.AllowInline
}
if dynConfig.config.Dynamic.BatchRead.AllowInlineSSD != nil {
policy.AllowInlineSSD = *dynConfig.config.Dynamic.BatchRead.AllowInlineSSD
}
if dynConfig.config.Dynamic.BatchRead.RespondAllKeys != nil {
policy.RespondAllKeys = *dynConfig.config.Dynamic.BatchRead.RespondAllKeys
}
}

return policy
}
Loading