Skip to content

Commit

Permalink
Merge pull request #135 from aerospike/stage
Browse files Browse the repository at this point in the history
Ruby Client v4.1.0
  • Loading branch information
khaf authored Oct 22, 2024
2 parents 444508d + 2b5df7c commit fb54d1d
Show file tree
Hide file tree
Showing 18 changed files with 335 additions and 119 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

All notable changes to this project will be documented in this file.

## [4.1.0] 2024-10-22

- **New Features**
- [CLIENT-2833] Support `Policy#ReadTouchTtlPercent`.
- [CLIENT-2826] Support `QueryDuration` in `QueryPolicy#ExpectedDuration`.
- [CLIENT-3103] Support `XDR_KEY_BUSY`.

- **Fixes**
- [CLIENT-3144] Various fixes. PR #132 and #133 Thanks to [Igor Pstyga](https://github.com/opti)
- Fix `BatchRead` for multiple records with operations.
- Use correct namespace for the `MapReturnType`.
- `BatchRead` with operations would throw an exception.
- Fix a test with invalid map key in Server v7.1.

## [4.0.0] 2024-08-14

- **New Features**
Expand Down
1 change: 1 addition & 0 deletions lib/aerospike.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
require "aerospike/geo_json"
require "aerospike/ttl"

require "aerospike/policy/query_duration"
require "aerospike/policy/client_policy"
require "aerospike/policy/priority"
require "aerospike/policy/record_exists_action"
Expand Down
4 changes: 2 additions & 2 deletions lib/aerospike/batch_attr.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def set_read(rp)
@write_attr = 0
@info_attr = 0

@expiration = 0
@expiration = rp.read_touch_ttl_percent
@generation = 0
@has_write = false
@send_key = false
Expand All @@ -88,7 +88,7 @@ def set_batch_read(rp)
@write_attr = 0
@info_attr = 0

@expiration = 0
@expiration = rp.read_touch_ttl_percent
@generation = 0
@has_write = false
@send_key = false
Expand Down
4 changes: 2 additions & 2 deletions lib/aerospike/batch_read.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def self.ops(key, ops, opt = {})
# For internal use only.
def ==(other) # :nodoc:
other && other.instance_of?(self.class) &&
@bin_names.sort == other.bin_names.sort && @ops.sort == other.ops.sort &&
@bin_names&.sort == other.bin_names&.sort && @ops == other.ops &&
@policy == other.policy && @read_all_bins == other.read_all_bins
end

Expand All @@ -88,7 +88,7 @@ def size # :nodoc:
raise AerospikeException.new(ResultCode::PARAMETER_ERROR, "Write operations not allowed in batch read")
end
size += op.bin_name.bytesize + Aerospike::OPERATION_HEADER_SIZE
size += op.value.estimate_size
size += op.bin_value.estimate_size
end

size
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_index_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def write_buffer
end
end
size_buffer
write_header_read(policy, read_attr | INFO1_BATCH, 0, field_count, 0)
write_header_read(policy, read_attr | INFO1_BATCH, 0, 0, field_count, 0)

write_filter_exp(@policy.filter_exp, exp_size)

Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_operate_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def write_buffer
if record.bin_names&.length&.> 0
write_batch_bin_names(key, record.bin_names, attr, attr.filter_exp)
elsif record.ops&.length&.> 0
attr.adjust_read(br.ops)
attr.adjust_read(record.ops)
write_batch_operations(key, record.ops, attr, attr.filter_exp)
else
attr.adjust_read_all_bins(record.read_all_bins)
Expand Down
39 changes: 26 additions & 13 deletions lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ module Aerospike
INFO2_DURABLE_DELETE = Integer(1 << 4)
# Create only. Fail if record already exists.
INFO2_CREATE_ONLY = Integer(1 << 5)

# Treat as long query, but relax read consistency.
INFO2_RELAX_AP_LONG_QUERY = (1 << 6)
# Return a result for every operation.
INFO2_RESPOND_ALL_OPS = Integer(1 << 7)

Expand Down Expand Up @@ -195,7 +196,7 @@ def set_read_for_key_only(policy, key)
field_count += 1 if exp_size > 0

size_buffer
write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, field_count, 0)
write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, 0, field_count, 0)
write_key(key)
write_filter_exp(@policy.filter_exp, exp_size)
end_cmd
Expand All @@ -220,7 +221,7 @@ def set_read(policy, key, bin_names)
attr |= INFO1_GET_ALL
end

write_header_read(policy, attr, 0, field_count, bin_names.length)
write_header_read(policy, attr, 0, 0, field_count, bin_names.length)
write_key(key)
write_filter_exp(@policy.filter_exp, exp_size)

Expand Down Expand Up @@ -269,7 +270,7 @@ def set_operate(policy, key, args)

size_buffer

write_header_read_write(policy, args.read_attr, args.write_attr, field_count, args.operations.length)
write_header_read_write(policy, args, field_count)
write_key(key, policy)
write_filter_exp(policy.filter_exp, exp_size)

Expand Down Expand Up @@ -377,7 +378,7 @@ def set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions)
operation_count = bin_names.length
end

write_header_read(policy, read_attr, info_attr, field_count, operation_count)
write_header_read(policy, read_attr, 0, info_attr, field_count, operation_count)

if namespace
write_field_string(namespace, Aerospike::FieldType::NAMESPACE)
Expand Down Expand Up @@ -591,10 +592,16 @@ def set_query(cluster, policy, statement, background, node_partitions)
write_header_write(policy, INFO2_WRITE, field_count, operation_count)
else
read_attr = INFO1_READ
write_attr = 0

read_attr |= INFO1_NOBINDATA unless policy.include_bin_data
read_attr |= INFO1_SHORT_QUERY if policy.short_query
if policy.short_query || policy.expected_duration == QueryDuration::SHORT
read_attr |= INFO1_SHORT_QUERY
elsif policy.expected_duration == QueryDuration::LONG_RELAX_AP
write_attr |= INFO2_RELAX_AP_LONG_QUERY
end
info_attr = INFO3_PARTITION_DONE if is_new
write_header_read(policy, read_attr, info_attr, field_count, operation_count)
write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count)
end


Expand Down Expand Up @@ -903,10 +910,14 @@ def write_header_write(policy, write_attr, field_count, operation_count)
end

# Header write for write operations.
def write_header_read_write(policy, read_attr, write_attr, field_count, operation_count)
def write_header_read_write(policy, args, field_count)
# Set flags.
generation = Integer(0)
ttl = args.has_write ? policy.expiration : policy.read_touch_ttl_percent
read_attr = args.read_attr
write_attr = args.write_attr
info_attr = Integer(0)
operation_count = args.operations.length

case policy.record_exists_action
when Aerospike::RecordExistsAction::UPDATE
Expand Down Expand Up @@ -942,7 +953,7 @@ def write_header_read_write(policy, read_attr, write_attr, field_count, operatio
@data_buffer.write_byte(0, 12) # unused
@data_buffer.write_byte(0, 13) # clear the result code
@data_buffer.write_uint32(generation, 14)
@data_buffer.write_uint32(policy.ttl, 18)
@data_buffer.write_uint32(ttl, 18)

# Initialize timeout. It will be written later.
@data_buffer.write_byte(0, 22)
Expand All @@ -956,18 +967,19 @@ def write_header_read_write(policy, read_attr, write_attr, field_count, operatio
@data_offset = MSG_TOTAL_HEADER_SIZE
end

def write_header_read(policy, read_attr, info_attr, field_count, operation_count)
def write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count)
read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression
#TODO: Add SC Mode

@data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length.
@data_buffer.write_byte(read_attr, 9)
@data_buffer.write_byte(0, 10)
@data_buffer.write_byte(write_attr, 10)
@data_buffer.write_byte(info_attr, 11)

(12...22).each { |i| @data_buffer.write_byte(0, i) }
(12...18).each { |i| @data_buffer.write_byte(0, i) }

# Initialize timeout. It will be written later.
@data_buffer.write_int32(policy.read_touch_ttl_percent, 18)
@data_buffer.write_byte(0, 22)
@data_buffer.write_byte(0, 23)
@data_buffer.write_byte(0, 24)
Expand All @@ -988,9 +1000,10 @@ def write_header_read_header(policy, read_attr, field_count, operation_count)
@data_buffer.write_byte(0, 10)
@data_buffer.write_byte(info_attr, 11)

(12...22).each { |i| @data_buffer.write_byte(0, i) }
(12...18).each { |i| @data_buffer.write_byte(0, i) }

# Initialize timeout. It will be written later.
@data_buffer.write_int32(policy.read_touch_ttl_percent, 18)
@data_buffer.write_byte(0, 22)
@data_buffer.write_byte(0, 23)
@data_buffer.write_byte(0, 24)
Expand Down
10 changes: 5 additions & 5 deletions lib/aerospike/exp/exp_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -476,21 +476,21 @@ def self.add_read(bin, bytes, ret_type)
def self.get_value_type(return_type)
t = return_type & ~CDT::MapReturnType::INVERTED
case t
when MapReturnType::INDEX, MapReturnType::REVERSE_INDEX, MapReturnType::RANK, MapReturnType::REVERSE_RANK
when CDT::MapReturnType::INDEX, CDT::MapReturnType::REVERSE_INDEX, CDT::MapReturnType::RANK, CDT::MapReturnType::REVERSE_RANK
# This method only called from expressions that can return multiple integers (ie list).
Exp::Type::LIST

when MapReturnType::COUNT
when CDT::MapReturnType::COUNT
Exp::Type::INT

when MapReturnType::KEY, MapReturnType::VALUE
when CDT::MapReturnType::KEY, CDT::MapReturnType::VALUE
# This method only called from expressions that can return multiple objects (ie list).
Exp::Type::LIST

when MapReturnType::KEY_VALUE, MapReturnType::ORDERED_MAP, MapReturnType::UNORDERED_MAP
when CDT::MapReturnType::KEY_VALUE, CDT::MapReturnType::ORDERED_MAP, CDT::MapReturnType::UNORDERED_MAP
Exp::Type::MAP

when MapReturnType::EXISTS
when CDT::MapReturnType::EXISTS
Exp::Type::BOOL

else
Expand Down
19 changes: 18 additions & 1 deletion lib/aerospike/policy/batch_read_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Aerospike
# Policy attributes used in batch read commands.
class BatchReadPolicy

attr_accessor :filter_exp
attr_accessor :filter_exp, :read_touch_ttl_percent

def initialize(opt={})
# Optional expression filter. If filter_exp exists and evaluates to false, the specific batch key
Expand All @@ -33,6 +33,23 @@ def initialize(opt={})
#
# Default: nil
@filter_exp = opt[:filter_exp]

# Determines how record TTL (time to live) is affected on reads. When enabled, the server can
# efficiently operate as a read-based LRU cache where the least recently used records are expired.
# The value is expressed as a percentage of the TTL sent on the most recent write such that a read
# within this interval of the record’s end of life will generate a touch.
#
# For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
# 80, the next read within 8 hours of the record's end of life (equivalent to 2 hours after the most
# recent write) will result in a touch, resetting the TTL to another 10 hours.
#
# Values:
#
# 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.
# -1 : Do not reset record TTL on reads.
# 1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.
# Default: 0
@read_touch_ttl_percent = opt[:read_touch_ttl_percent] || 0
end
end
end
19 changes: 18 additions & 1 deletion lib/aerospike/policy/policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Aerospike
# Container object for client policy command.
class Policy
attr_accessor :filter_exp, :priority, :timeout, :max_retries, :sleep_between_retries, :consistency_level,
:fail_on_filtered_out, :replica, :use_compression, :socket_timeout
:fail_on_filtered_out, :replica, :use_compression, :socket_timeout, :read_touch_ttl_percent

alias total_timeout timeout
alias total_timeout= timeout=
Expand Down Expand Up @@ -95,6 +95,23 @@ def initialize(opt = {})
# has not yet been exceeded.
@max_retries = opt[:max_retries] || 2

# Determines how record TTL (time to live) is affected on reads. When enabled, the server can
# efficiently operate as a read-based LRU cache where the least recently used records are expired.
# The value is expressed as a percentage of the TTL sent on the most recent write such that a read
# within this interval of the record’s end of life will generate a touch.
#
# For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
# 80, the next read within 8 hours of the record's end of life (equivalent to 2 hours after the most
# recent write) will result in a touch, resetting the TTL to another 10 hours.
#
# Values:
#
# 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.
# -1 : Do not reset record TTL on reads.
# 1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.
# Default: 0
@read_touch_ttl_percent = opt[:read_touch_ttl_percent] || 0

# Duration to sleep between retries if a transaction fails and the
# timeout was not exceeded. Enter zero to skip sleep.
@sleep_between_retries = opt[:sleep_between_retries] || 0.5
Expand Down
48 changes: 48 additions & 0 deletions lib/aerospike/policy/query_duration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# encoding: utf-8
# Copyright 2014-2024 Aerospike, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Aerospike

# Defines the expected query duration. The server treats the query in different ways depending on the expected duration.
# This enum is ignored for aggregation queries, background queries and server versions < 6.0.
module QueryDuration

# The query is expected to return more than 100 records per node. The server optimizes for a large record set in
# the following ways:
#
# Allow query to be run in multiple threads using the server's query threading configuration.
# Do not relax read consistency for AP namespaces.
# Add the query to the server's query monitor.
# Do not add the overall latency to the server's latency histogram.
# Do not allow server timeouts.
LONG = 0

# The query is expected to return less than 100 records per node. The server optimizes for a small record set in
# the following ways:
# Always run the query in one thread and ignore the server's query threading configuration.
# Allow query to be inlined directly on the server's service thread.
# Relax read consistency for AP namespaces.
# Do not add the query to the server's query monitor.
# Add the overall latency to the server's latency histogram.
# Allow server timeouts. The default server timeout for a short query is 1 second.
SHORT = 1

# Treat query as a LONG query, but relax read consistency for AP namespaces.
# This value is treated exactly like LONG for server versions < 7.1.
LONG_RELAX_AP = 2

end # module

end # module
21 changes: 13 additions & 8 deletions lib/aerospike/policy/query_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
# License for the specific language governing permissions and limitations under
# the License.

require 'aerospike/policy/query_duration'
require 'aerospike/policy/policy'

module Aerospike

# Container object for query policy command.
class QueryPolicy < Policy

attr_accessor :concurrent_nodes
attr_accessor :max_records
attr_accessor :include_bin_data
attr_accessor :record_queue_size
attr_accessor :records_per_second
attr_accessor :socket_timeout
attr_accessor :short_query
attr_accessor :concurrent_nodes, :max_records, :include_bin_data, :record_queue_size, :records_per_second, :socket_timeout, :short_query, :expected_duration

def initialize(opt={})
super(opt)
super

# Indicates if bin data is retrieved. If false, only record digests (and
# user keys if stored on the server) are retrieved.
Expand Down Expand Up @@ -74,11 +69,21 @@ def initialize(opt={})
# Default is 0
@records_per_second = opt[:records_per_second] || 0

# Expected query duration. The server treats the query in different ways depending on the expected duration.
# This field is ignored for aggregation queries, background queries and server versions < 6.0.
#
# Default: QueryDuration::LONG
@expected_duration = opt[:expected_duration] || QueryDuration::LONG

# DEPRECATED
# Detemine wether query expected to return less than 100 records.
# If true, the server will optimize the query for a small record set.
# This field is ignored for aggregation queries, background queries
# and server versions 6.0+.
#
# This field is deprecated and will eventually be removed. Use {expected_duration} instead.
# For backwards compatibility: If ShortQuery is true, the query is treated as a short query and
# {expected_duration} is ignored. If {short_query} is false, {expected_duration} is used as defaults to {Policy#QueryDuration#LONG}.
# Default: false
@short_query = opt[:short_query] ||false

Expand Down
Loading

0 comments on commit fb54d1d

Please sign in to comment.