Skip to content

Commit

Permalink
[CLIENT-1730] Support partition queries
Browse files Browse the repository at this point in the history
[CLIENT-1469] Support query pagination through client#query_partitions with PartitionFilter

[CLIENT-1975] Add support for #max_records and #short_query to QueryPolicy

[CLIENT-1976] Add support for #concurrent_nodes to QueryPolicy
  • Loading branch information
khaf committed Nov 15, 2022
1 parent cad1244 commit 316fdf4
Show file tree
Hide file tree
Showing 15 changed files with 599 additions and 67 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ group :test do
end

group :development do
gem "ruby-lsp", require: false
gem 'rubocop', require: false
gem 'rubocop-performance', require: false
gem 'rubocop-rake', require: false
Expand Down
2 changes: 2 additions & 0 deletions lib/aerospike.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@
require 'aerospike/query/node_partitions'
require 'aerospike/query/scan_executor'
require 'aerospike/query/scan_partition_command'
require 'aerospike/query/query_executor'
require 'aerospike/query/query_partition_command'

require 'aerospike/query/pred_exp/and_or'
require 'aerospike/query/pred_exp/geo_json_value'
Expand Down
50 changes: 25 additions & 25 deletions lib/aerospike/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -693,44 +693,44 @@ def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
# Query functions (Supported by Aerospike 3 servers only)
#--------------------------------------------------------

# Query executes a query and returns a recordset.
# The query executor puts records on a channel from separate goroutines.
# The caller can concurrently pops records off the channel through the
# record channel.
# Executes a query for specified partitions and returns a recordset.
# The query executor puts records on the queue from separate threads.
# The caller can concurrently pop records off the queue through the
# recordset.records API.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, a default policy will be generated.
def query(statement, options = nil)
# This method is only supported by Aerospike 4.9+ servers.
# If the policy is nil, the default relevant policy will be used.
def query_partitions(partition_filter, statement, options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
new_policy = policy.clone

nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.")
end

recordset = Recordset.new(policy.record_queue_size, nodes.length, :query)

# Use a thread per node
nodes.each do |node|
partitions = node.cluster.node_partitions(node, statement.namespace)
Thread.new do
Thread.current.abort_on_exception = true
command = QueryCommand.new(node, new_policy, statement, recordset, partitions)
begin
execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
end
end
# result recordset
recordset = Recordset.new(policy.record_queue_size, 1, :query)
tracker = PartitionTracker.new(policy, nodes, partition_filter)
Thread.new do
Thread.current.abort_on_exception = true
QueryExecutor.query_partitions(@cluster, policy, tracker, statement, recordset)
end

recordset
end

# Query executes a query and returns a recordset.
# The query executor puts records on a channel from separate threads.
# The caller can concurrently pops records off the channel through the
# record channel.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, a default policy will be generated.
def query(statement, options = nil)
query_partitions(Aerospike::PartitionFilter.all, statement, options)
end

#-------------------------------------------------------
# User administration
#-------------------------------------------------------
Expand Down
4 changes: 3 additions & 1 deletion lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ module Aerospike
INFO1_READ = Integer(1 << 0)
# Get all bins.
INFO1_GET_ALL = Integer(1 << 1)
# Short query
INFO1_SHORT_QUERY = Integer(1 << 2)


INFO1_BATCH = Integer(1 << 3)
Expand Down Expand Up @@ -454,7 +456,7 @@ def set_scan(policy, namespace, set_name, bin_names, node_partitions)
# @data_buffer.write_byte(policy.scan_percent.to_i.ord, @data_offset)
# @data_offset += 1

write_field_header(4, Aerospike::FieldType::SCAN_TIMEOUT)
write_field_header(4, Aerospike::FieldType::SOCKET_TIMEOUT)
@data_buffer.write_uint32(policy.socket_timeout.to_i, @data_offset)
@data_offset += 4

Expand Down
53 changes: 25 additions & 28 deletions lib/aerospike/command/field_type.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,31 @@ module Aerospike

module FieldType

NAMESPACE = 0
TABLE = 1
KEY = 2
#BIN = 3
DIGEST_RIPE = 4
#GU_TID = 5
DIGEST_RIPE_ARRAY = 6
TRAN_ID = 7 # user supplied transaction id, which is simply passed back
SCAN_OPTIONS = 8
SCAN_TIMEOUT = 9
RECORDS_PER_SECOND = 10
PID_ARRAY = 11
DIGEST_ARRAY = 12
MAX_RECORDS = 13
BVAL_ARRAY = 15
INDEX_NAME = 21
INDEX_RANGE = 22
INDEX_FILTER = 23
INDEX_LIMIT = 24
INDEX_ORDER_BY = 25
INDEX_TYPE = 26
UDF_PACKAGE_NAME = 30
UDF_FUNCTION = 31
UDF_ARGLIST = 32
UDF_OP = 33
QUERY_BINLIST = 40
BATCH_INDEX = 41
PREDEXP = 43
NAMESPACE = 0
TABLE = 1
KEY = 2
DIGEST_RIPE = 4
DIGEST_RIPE_ARRAY = 6
TRAN_ID = 7 # user supplied transaction id, which is simply passed back
SCAN_OPTIONS = 8
SOCKET_TIMEOUT = 9
RECORDS_PER_SECOND = 10
PID_ARRAY = 11
DIGEST_ARRAY = 12
MAX_RECORDS = 13
BVAL_ARRAY = 15
INDEX_NAME = 21
INDEX_RANGE = 22
INDEX_CONTEXT = 23
INDEX_TYPE = 26
UDF_PACKAGE_NAME = 30
UDF_FUNCTION = 31
UDF_ARGLIST = 32
UDF_OP = 33
QUERY_BINLIST = 40
BATCH_INDEX = 41
BATCH_INDEX_WITH_SET = 42
FILTER_EXP = 43

end # module

Expand Down
37 changes: 35 additions & 2 deletions lib/aerospike/policy/query_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,45 @@ module Aerospike
# Container object for query policy command.
class QueryPolicy < Policy

attr_accessor :concurrent_nodes
attr_accessor :max_records
attr_accessor :include_bin_data
attr_accessor :record_queue_size
attr_accessor :records_per_second
attr_accessor :socket_timeout
attr_accessor :short_query

def initialize(opt={})
super(opt)

@max_retries = 0

# Indicates if bin data is retrieved. If false, only record digests (and
# user keys if stored on the server) are retrieved.
# Default is true.
@include_bin_data = opt.fetch(:include_bin_data, true)

# Approximates the number of records to return to the client. This number is divided by the
# number of nodes involved in the query. The actual number of records returned
# may be less than MaxRecords if node record counts are small and unbalanced across
# nodes.
#
# This field is supported on server versions >= 4.9.
#
# Default: 0 (do not limit record count)
@max_records = opt.fetch(:max_records) { 0 }

# Issue scan requests in parallel or serially.
@concurrent_nodes = opt.fetch(:concurrent_nodes) { true }

# Determines network timeout for each attempt.
#
# If socket_timeout is not zero and socket_timeout is reached before an attempt completes,
# the Timeout above is checked. If Timeout is not exceeded, the transaction
# is retried. If both socket_timeout and Timeout are non-zero, socket_timeout must be less
# than or equal to Timeout, otherwise Timeout will also be used for socket_timeout.
#
# Default: 30s
@socket_timeout = opt[:socket_timeout] || 30000

# Number of records to place in queue before blocking. Records received
# from multiple server nodes will be placed in a queue. A separate thread
# consumes these records in parallel. If the queue is full, the producer
Expand All @@ -49,6 +74,14 @@ def initialize(opt={})
# Default is 0
@records_per_second = opt[:records_per_second] || 0

# Detemine wether query expected to return less than 100 records.
# If true, the server will optimize the query for a small record set.
# This field is ignored for aggregation queries, background queries
# and server versions 6.0+.
#
# Default: false
@short_query = opt[:short_query] ||false

self
end

Expand Down
2 changes: 0 additions & 2 deletions lib/aerospike/policy/scan_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class ScanPolicy < Policy
def initialize(opt={})
super(opt)

@max_retries = 0

# Approximates the number of records to return to the client. This number is divided by the
# number of nodes involved in the query. The actual number of records returned
# may be less than MaxRecords if node record counts are small and unbalanced across
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/query/query_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def write_buffer
@data_offset += binNameSize
fieldCount+=1
end
else
else
@data_offset += @partitions.length * 2 + FIELD_HEADER_SIZE
fieldCount += 1

Expand Down
73 changes: 73 additions & 0 deletions lib/aerospike/query/query_executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

# Copyright 2014-2020 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

module Aerospike
class QueryExecutor # :nodoc:

def self.query_partitions(cluster, policy, tracker, statement, recordset)
interval = policy.sleep_between_retries

should_retry = false

loop do
list = tracker.assign_partitions_to_nodes(cluster, statement.namespace)

if policy.concurrent_nodes
threads = []
# Use a thread per node
list.each do |node_partition|

threads << Thread.new do
Thread.current.abort_on_exception = true
command = QueryPartitionCommand.new(node_partition.node, tracker, policy, statement, recordset, node_partition)
begin
command.execute
rescue => e
should_retry ||= command.should_retry(e)
# puts "should retry: #{should_retry}"
Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
end
end
end
threads.each(&:join)
else
# Use a single thread for all nodes for all node
list.each do |node_partition|
command = QueryPartitionCommand.new(node_partition.node, tracker, policy, statement, recordset, node_partition)
begin
command.execute
rescue => e
should_retry ||= command.should_retry(e)
Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
end
end
end

complete = tracker.complete?(@cluster, policy)

if complete || !should_retry
recordset.thread_finished
return
end
sleep(interval) if policy.sleep_between_retries > 0
statement.reset_task_id
end
end

end

end
Loading

0 comments on commit 316fdf4

Please sign in to comment.