Skip to content

Commit

Permalink
[CLIENT-2826] Support QueryDuration enum in QueryPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Oct 22, 2024
1 parent 9b1ccb4 commit 34edb01
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

- **New Features**
- [CLIENT-2833] Support `Policy#ReadTouchTtlPercent`.
- [CLIENT-2826] Support `QueryDuration` in `QueryPolicy#ExpectedDuration`.

- **Fixes**
- [CLIENT-3144] Various fixes. PR #132 and #133 Thanks to [Igor Pstyga](https://github.com/opti)
Expand Down
1 change: 1 addition & 0 deletions lib/aerospike.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
require "aerospike/geo_json"
require "aerospike/ttl"

require "aerospike/policy/query_duration"
require "aerospike/policy/client_policy"
require "aerospike/policy/priority"
require "aerospike/policy/record_exists_action"
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_index_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def write_buffer
end
end
size_buffer
write_header_read(policy, read_attr | INFO1_BATCH, 0, field_count, 0)
write_header_read(policy, read_attr | INFO1_BATCH, 0, 0, field_count, 0)

write_filter_exp(@policy.filter_exp, exp_size)

Expand Down
23 changes: 15 additions & 8 deletions lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ module Aerospike
INFO2_DURABLE_DELETE = Integer(1 << 4)
# Create only. Fail if record already exists.
INFO2_CREATE_ONLY = Integer(1 << 5)

# Treat as long query, but relax read consistency.
INFO2_RELAX_AP_LONG_QUERY = (1 << 6)
# Return a result for every operation.
INFO2_RESPOND_ALL_OPS = Integer(1 << 7)

Expand Down Expand Up @@ -195,7 +196,7 @@ def set_read_for_key_only(policy, key)
field_count += 1 if exp_size > 0

size_buffer
write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, field_count, 0)
write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, 0, field_count, 0)
write_key(key)
write_filter_exp(@policy.filter_exp, exp_size)
end_cmd
Expand All @@ -220,7 +221,7 @@ def set_read(policy, key, bin_names)
attr |= INFO1_GET_ALL
end

write_header_read(policy, attr, 0, field_count, bin_names.length)
write_header_read(policy, attr, 0, 0, field_count, bin_names.length)
write_key(key)
write_filter_exp(@policy.filter_exp, exp_size)

Expand Down Expand Up @@ -377,7 +378,7 @@ def set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions)
operation_count = bin_names.length
end

write_header_read(policy, read_attr, info_attr, field_count, operation_count)
write_header_read(policy, read_attr, 0, info_attr, field_count, operation_count)

if namespace
write_field_string(namespace, Aerospike::FieldType::NAMESPACE)
Expand Down Expand Up @@ -591,10 +592,16 @@ def set_query(cluster, policy, statement, background, node_partitions)
write_header_write(policy, INFO2_WRITE, field_count, operation_count)
else
read_attr = INFO1_READ
write_attr = 0

read_attr |= INFO1_NOBINDATA unless policy.include_bin_data
read_attr |= INFO1_SHORT_QUERY if policy.short_query
if policy.short_query || policy.expected_duration == QueryDuration::SHORT
read_attr |= INFO1_SHORT_QUERY
elsif policy.expected_duration == QueryDuration::LONG_RELAX_AP
write_attr |= INFO2_RELAX_AP_LONG_QUERY
end
info_attr = INFO3_PARTITION_DONE if is_new
write_header_read(policy, read_attr, info_attr, field_count, operation_count)
write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count)
end


Expand Down Expand Up @@ -960,13 +967,13 @@ def write_header_read_write(policy, args, field_count)
@data_offset = MSG_TOTAL_HEADER_SIZE
end

def write_header_read(policy, read_attr, info_attr, field_count, operation_count)
def write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count)
read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression
#TODO: Add SC Mode

@data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length.
@data_buffer.write_byte(read_attr, 9)
@data_buffer.write_byte(0, 10)
@data_buffer.write_byte(write_attr, 10)
@data_buffer.write_byte(info_attr, 11)

(12...18).each { |i| @data_buffer.write_byte(0, i) }
Expand Down
48 changes: 48 additions & 0 deletions lib/aerospike/policy/query_duration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# encoding: utf-8
# Copyright 2014-2024 Aerospike, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Aerospike

# Defines the expected query duration. The server treats the query in different ways depending on the expected duration.
# This enum is ignored for aggregation queries, background queries and server versions < 6.0.
module QueryDuration

# The query is expected to return more than 100 records per node. The server optimizes for a large record set in
# the following ways:
#
# Allow query to be run in multiple threads using the server's query threading configuration.
# Do not relax read consistency for AP namespaces.
# Add the query to the server's query monitor.
# Do not add the overall latency to the server's latency histogram.
# Do not allow server timeouts.
LONG = 0

# The query is expected to return less than 100 records per node. The server optimizes for a small record set in
# the following ways:
# Always run the query in one thread and ignore the server's query threading configuration.
# Allow query to be inlined directly on the server's service thread.
# Relax read consistency for AP namespaces.
# Do not add the query to the server's query monitor.
# Add the overall latency to the server's latency histogram.
# Allow server timeouts. The default server timeout for a short query is 1 second.
SHORT = 1

# Treat query as a LONG query, but relax read consistency for AP namespaces.
# This value is treated exactly like LONG for server versions < 7.1.
LONG_RELAX_AP = 2

end # module

end # module
21 changes: 13 additions & 8 deletions lib/aerospike/policy/query_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
# License for the specific language governing permissions and limitations under
# the License.

require 'aerospike/policy/query_duration'
require 'aerospike/policy/policy'

module Aerospike

# Container object for query policy command.
class QueryPolicy < Policy

attr_accessor :concurrent_nodes
attr_accessor :max_records
attr_accessor :include_bin_data
attr_accessor :record_queue_size
attr_accessor :records_per_second
attr_accessor :socket_timeout
attr_accessor :short_query
attr_accessor :concurrent_nodes, :max_records, :include_bin_data, :record_queue_size, :records_per_second, :socket_timeout, :short_query, :expected_duration

def initialize(opt={})
super(opt)
super

# Indicates if bin data is retrieved. If false, only record digests (and
# user keys if stored on the server) are retrieved.
Expand Down Expand Up @@ -74,11 +69,21 @@ def initialize(opt={})
# Default is 0
@records_per_second = opt[:records_per_second] || 0

# Expected query duration. The server treats the query in different ways depending on the expected duration.
# This field is ignored for aggregation queries, background queries and server versions < 6.0.
#
# Default: QueryDuration::LONG
@expected_duration = opt[:expected_duration] || QueryDuration::LONG

# DEPRECATED
# Detemine wether query expected to return less than 100 records.
# If true, the server will optimize the query for a small record set.
# This field is ignored for aggregation queries, background queries
# and server versions 6.0+.
#
# This field is deprecated and will eventually be removed. Use {expected_duration} instead.
# For backwards compatibility: If ShortQuery is true, the query is treated as a short query and
# {expected_duration} is ignored. If {short_query} is false, {expected_duration} is used as defaults to {Policy#QueryDuration#LONG}.
# Default: false
@short_query = opt[:short_query] ||false

Expand Down
54 changes: 33 additions & 21 deletions spec/aerospike/query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
bin_map = {
'bin1' => "value#{i}",
'bin2' => i,
'bin3' => [ i, i + 1_000, i + 1_000_000 ],
'bin4' => { "key#{i}" => i },
'bin3' => [i, i + 1_000, i + 1_000_000],
'bin4' => { "key#{i}" => i }
}
Support.client.put(key, bin_map)
end
Expand Down Expand Up @@ -92,8 +92,20 @@
expect(count).to be > 0
end

it "returns all record bins, expected_duration: QueryDuration::SHORT", skip: !Support.min_version?("7.0.0") do
stmt = Aerospike::Statement.new(@namespace, @set)
stmt.filters << Aerospike::Filter.Equal('bin2', 1)
rs = client.query(stmt, expected_duration: Aerospike::QueryDuration::SHORT)
count = 0
rs.each do |rec|
count += 1
expect(rec.bins.keys).to contain_exactly("bin1", "bin2", "bin3", "bin4")
end
expect(count).to be > 0
end

it "returns only the selected bins" do
bins = ["bin1", "bin2"]
bins = %w[bin1 bin2]
stmt = Aerospike::Statement.new(@namespace, @set, bins)
stmt.filters << Aerospike::Filter.Equal('bin2', 1)
rs = client.query(stmt)
Expand Down Expand Up @@ -152,7 +164,7 @@
rs = client.query(stmt)

records = 0
expect { rs.each { records += 1 } }.not_to raise_error()
expect { rs.each { records += 1 } }.not_to raise_error
expect(records).to eql(0)
end

Expand All @@ -164,13 +176,13 @@
bin_map = {
'bin1' => "value#{i}",
'bin2' => i,
'bin3' => [ i, i + 1_000, i + 1_000_000 ],
'bin4' => { "key#{i}" => i },
'bin3' => [i, i + 1_000, i + 1_000_000],
'bin4' => { "key#{i}" => i }
}
Support.client.put(key, bin_map)
end

stmt = Aerospike::Statement.new(@namespace, set, ['bin1', 'bin2'])
stmt = Aerospike::Statement.new(@namespace, set, %w[bin1 bin2])
rs = client.query(stmt, :records_per_second => (@record_count / 4).to_i)

i = 0
Expand All @@ -193,7 +205,7 @@
context "Numeric Bins" do

it "should return relevant records" do
stmt = Aerospike::Statement.new(@namespace, @set, ['bin1', 'bin2'])
stmt = Aerospike::Statement.new(@namespace, @set, %w[bin1 bin2])
stmt.filters = [Aerospike::Filter.Equal('bin2', 1)]
rs = client.query(stmt)

Expand Down Expand Up @@ -236,7 +248,7 @@
context "List Bins" do

it "should return relevant records" do
stmt = Aerospike::Statement.new(@namespace, @set, ['bin2', 'bin3'])
stmt = Aerospike::Statement.new(@namespace, @set, %w[bin2 bin3])
stmt.filters = [Aerospike::Filter.Contains('bin3', 42, :list)]
rs = client.query(stmt)

Expand Down Expand Up @@ -351,12 +363,12 @@

context "Geospatial Filter", skip: !Support.feature?(Aerospike::Features::GEO) do

let(:lon){ 103.9114 }
let(:lat){ 1.3083 }
let(:radius){ 1000 }
let(:point){ Aerospike::GeoJSON.new({type: "Point", coordinates: [lon, lat]}) }
let(:point2){ Aerospike::GeoJSON.new({type: "Point", coordinates: [lon + 1, lat + 1]}) }
let(:region){ Aerospike::GeoJSON.new({type: "Polygon", coordinates: [[[103.6055, 1.1587], [103.6055, 1.4707], [104.0884, 1.4707], [104.0884, 1.1587], [103.6055, 1.1587]]]}) }
let(:lon) { 103.9114 }
let(:lat) { 1.3083 }
let(:radius) { 1000 }
let(:point) { Aerospike::GeoJSON.new({ type: "Point", coordinates: [lon, lat] }) }
let(:point2) { Aerospike::GeoJSON.new({ type: "Point", coordinates: [lon + 1, lat + 1] }) }
let(:region) { Aerospike::GeoJSON.new({ type: "Polygon", coordinates: [[[103.6055, 1.1587], [103.6055, 1.4707], [104.0884, 1.4707], [104.0884, 1.1587], [103.6055, 1.1587]]] }) }

before(:all) do
tasks = []
Expand Down Expand Up @@ -386,7 +398,7 @@
rs = client.query(stmt)

results = []
rs.each{|record| results << record }
rs.each { |record| results << record }
expect(results.map(&:key)).to eq [key]
end # it

Expand All @@ -399,7 +411,7 @@
rs = client.query(stmt)

results = []
rs.each{|record| results << record }
rs.each { |record| results << record }
expect(results.map(&:key)).to eq [key]
end # it

Expand All @@ -412,7 +424,7 @@
rs = client.query(stmt)

results = []
rs.each{|record| results << record }
rs.each { |record| results << record }
expect(results.map(&:key)).to eq [key]
end # it

Expand All @@ -425,7 +437,7 @@
rs = client.query(stmt)

results = []
rs.each{|record| results << record }
rs.each { |record| results << record }
expect(results.map(&:key)).to eq [key]
end # it

Expand All @@ -438,7 +450,7 @@
rs = client.query(stmt)

results = []
rs.each{|record| results << record }
rs.each { |record| results << record }
expect(results.map(&:key)).to eq [key]
end # it

Expand All @@ -451,7 +463,7 @@
rs = client.query(stmt)

results = []
rs.each{|record| results << record }
rs.each { |record| results << record }
expect(results.map(&:key)).to eq [key]
end # it

Expand Down

0 comments on commit 34edb01

Please sign in to comment.