Skip to content

Commit

Permalink
CLIENT-2817 Support read_touch_ttl_percent in as_policy_read, as_poli…
Browse files Browse the repository at this point in the history
…cy_operate, as_policy_batch and as_policy_batch_read.

Requires server version 7.1+.
  • Loading branch information
BrianNichols committed Mar 20, 2024
1 parent 65db7fc commit a7d1d02
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 53 deletions.
7 changes: 4 additions & 3 deletions src/include/aerospike/as_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ as_command_write_header_write(
uint8_t*
as_command_write_header_read(
uint8_t* cmd, const as_policy_base* policy, as_policy_read_mode_ap read_mode_ap,
as_policy_read_mode_sc read_mode_sc, uint32_t timeout, uint16_t n_fields, uint16_t n_bins,
uint8_t read_attr, uint8_t write_attr, uint8_t info_attr
as_policy_read_mode_sc read_mode_sc, int read_ttl, uint32_t timeout, uint16_t n_fields,
uint16_t n_bins, uint8_t read_attr, uint8_t write_attr, uint8_t info_attr
);

/**
Expand All @@ -377,7 +377,8 @@ as_command_write_header_read(
uint8_t*
as_command_write_header_read_header(
uint8_t* cmd, const as_policy_base* policy, as_policy_read_mode_ap read_mode_ap,
as_policy_read_mode_sc read_mode_sc, uint16_t n_fields, uint16_t n_bins, uint8_t read_attr
as_policy_read_mode_sc read_mode_sc, int read_ttl, uint16_t n_fields, uint16_t n_bins,
uint8_t read_attr
);

/**
Expand Down
93 changes: 89 additions & 4 deletions src/include/aerospike/as_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,26 @@ typedef struct as_policy_read_s {
*/
as_policy_read_mode_sc read_mode_sc;

/**
* Determine how record TTL (time to live) is affected on reads. When enabled, the server can
* efficiently operate as a read-based LRU cache where the least recently used records are expired.
* The value is expressed as a percentage of the TTL sent on the most recent write.
*
* For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
* 80, the next read after 8 hours will result in a touch, resetting the TTL to another 10 hours.
*
* Values:
* <ul>
* <li> 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.</li>
* <li>-1 : Do not reset record TTL on reads.</li>
* <li>1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.</li>
* </ul>
* <li>
*
* Default: 0
*/
int read_touch_ttl_percent;

/**
* Should raw bytes representing a list or map be deserialized to as_list or as_map.
* Set to false for backup programs that just need access to raw bytes.
Expand Down Expand Up @@ -768,8 +788,9 @@ typedef struct as_policy_operate_s {

/**
* The default time-to-live (expiration) of the record in seconds. This field will
* only be used if "as_operations.ttl" is set to AS_RECORD_CLIENT_DEFAULT_TTL. The
* as_operations instance is passed in to operate functions along with as_policy_operate.
* only be used if one or more of the operations is a write operation and if "as_operations.ttl"
* is set to AS_RECORD_CLIENT_DEFAULT_TTL. The as_operations instance is passed in to
* operate functions along with as_policy_operate.
*
* There are also special values that can be set in the record ttl:
* <ul>
Expand All @@ -780,6 +801,26 @@ typedef struct as_policy_operate_s {
*/
uint32_t ttl;

/**
* Determine how record TTL (time to live) is affected on reads. When enabled, the server can
* efficiently operate as a read-based LRU cache where the least recently used records are expired.
* The value is expressed as a percentage of the TTL sent on the most recent write.
*
* For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
* 80, the next read after 8 hours will result in a touch, resetting the TTL to another 10 hours.
*
* Values:
* <ul>
* <li> 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.</li>
* <li>-1 : Do not reset record TTL on reads.</li>
* <li>1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.</li>
* </ul>
* <li>
*
* Default: 0
*/
int read_touch_ttl_percent;

/**
* Should raw bytes representing a list or map be deserialized to as_list or as_map.
* Set to false for backup programs that just need access to raw bytes.
Expand Down Expand Up @@ -887,6 +928,26 @@ typedef struct as_policy_batch_s {
*/
as_policy_read_mode_sc read_mode_sc;

/**
* Determine how record TTL (time to live) is affected on reads. When enabled, the server can
* efficiently operate as a read-based LRU cache where the least recently used records are expired.
* The value is expressed as a percentage of the TTL sent on the most recent write.
*
* For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
* 80, the next read after 8 hours will result in a touch, resetting the TTL to another 10 hours.
*
* Values:
* <ul>
* <li> 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.</li>
* <li>-1 : Do not reset record TTL on reads.</li>
* <li>1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.</li>
* </ul>
* <li>
*
* Default: 0
*/
int read_touch_ttl_percent;

/**
* Determine if batch commands to each server are run in parallel threads.
*
Expand Down Expand Up @@ -987,7 +1048,7 @@ typedef struct as_policy_batch_read_s {
* transaction is ignored. This can be used to eliminate a client/server roundtrip
* in some cases.
*
* aerospike_destroy() automatically calls as_exp_destroy() on all global default
* aerospike_destroy() automatically calls as_exp_destroy() on all global default
* policy filter expression instances. The user is responsible for calling as_exp_destroy()
* on filter expressions when setting temporary transaction policies.
*
Expand All @@ -1007,6 +1068,26 @@ typedef struct as_policy_batch_read_s {
*/
as_policy_read_mode_sc read_mode_sc;

/**
* Determine how record TTL (time to live) is affected on reads. When enabled, the server can
* efficiently operate as a read-based LRU cache where the least recently used records are expired.
* The value is expressed as a percentage of the TTL sent on the most recent write.
*
* For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
* 80, the next read after 8 hours will result in a touch, resetting the TTL to another 10 hours.
*
* Values:
* <ul>
* <li> 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.</li>
* <li>-1 : Do not reset record TTL on reads.</li>
* <li>1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.</li>
* </ul>
* <li>
*
* Default: 0
*/
int read_touch_ttl_percent;

} as_policy_batch_read;

/**
Expand All @@ -1019,7 +1100,7 @@ typedef struct as_policy_batch_write_s {
* transaction is ignored. This can be used to eliminate a client/server roundtrip
* in some cases.
*
* aerospike_destroy() automatically calls as_exp_destroy() on all global default
* aerospike_destroy() automatically calls as_exp_destroy() on all global default
* policy filter expression instances. The user is responsible for calling as_exp_destroy()
* on filter expressions when setting temporary transaction policies.
*
Expand Down Expand Up @@ -1498,6 +1579,7 @@ as_policy_read_init(as_policy_read* p)
p->replica = AS_POLICY_REPLICA_DEFAULT;
p->read_mode_ap = AS_POLICY_READ_MODE_AP_DEFAULT;
p->read_mode_sc = AS_POLICY_READ_MODE_SC_DEFAULT;
p->read_touch_ttl_percent = 0;
p->deserialize = true;
p->async_heap_rec = false;
return p;
Expand Down Expand Up @@ -1574,6 +1656,7 @@ as_policy_operate_init(as_policy_operate* p)
p->gen = AS_POLICY_GEN_DEFAULT;
p->exists = AS_POLICY_EXISTS_DEFAULT;
p->ttl = 0; // AS_RECORD_DEFAULT_TTL
p->read_touch_ttl_percent = 0;
p->deserialize = true;
p->durable_delete = false;
p->async_heap_rec = false;
Expand Down Expand Up @@ -1678,6 +1761,7 @@ as_policy_batch_init(as_policy_batch* p)
p->replica = AS_POLICY_REPLICA_SEQUENCE;
p->read_mode_ap = AS_POLICY_READ_MODE_AP_DEFAULT;
p->read_mode_sc = AS_POLICY_READ_MODE_SC_DEFAULT;
p->read_touch_ttl_percent = 0;
p->concurrent = false;
p->allow_inline = true;
p->allow_inline_ssd = false;
Expand Down Expand Up @@ -1727,6 +1811,7 @@ as_policy_batch_read_init(as_policy_batch_read* p)
p->filter_exp = NULL;
p->read_mode_ap = AS_POLICY_READ_MODE_AP_DEFAULT;
p->read_mode_sc = AS_POLICY_READ_MODE_SC_DEFAULT;
p->read_touch_ttl_percent = 0;
return p;
}

Expand Down
69 changes: 40 additions & 29 deletions src/main/aerospike/aerospike_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@
// Constants
//---------------------------------

#define BATCH_MSG_READ 0x0
#define BATCH_MSG_REPEAT 0x1
#define BATCH_MSG_INFO 0x2
#define BATCH_MSG_WRITE 0xe
#define BATCH_MSG_READ 0x0
#define BATCH_MSG_REPEAT 0x1
#define BATCH_MSG_INFO 0x2
#define BATCH_MSG_GEN 0x4
#define BATCH_MSG_TTL 0x8

#define BATCH_TYPE_RECORDS 0
#define BATCH_TYPE_KEYS 1
#define BATCH_TYPE_KEYS_NO_CALLBACK 2
#define BATCH_TYPE_RECORDS 0
#define BATCH_TYPE_KEYS 1
#define BATCH_TYPE_KEYS_NO_CALLBACK 2

//---------------------------------
// Types
Expand Down Expand Up @@ -537,7 +538,7 @@ as_batch_read_record_size_old(
as_key* key, as_batch_read_record* rec, as_batch_builder* bb, as_error* err
)
{
bb->size += 6;
bb->size += 6; // repeat(1) + info1(1) + n_fields(2) + n_ops(2) = 6
bb->size += as_command_string_field_size(key->ns);
bb->size += as_command_string_field_size(key->set);

Expand Down Expand Up @@ -640,8 +641,8 @@ as_batch_header_write_old(
}

p = as_command_write_header_read(p, &policy->base, policy->read_mode_ap,
policy->read_mode_sc, policy->base.total_timeout, bb->field_count_header, 0,
bb->read_attr | AS_MSG_INFO1_BATCH_INDEX, 0, 0);
policy->read_mode_sc, policy->read_touch_ttl_percent, policy->base.total_timeout,
bb->field_count_header, 0, bb->read_attr | AS_MSG_INFO1_BATCH_INDEX, 0, 0);

if (bb->filter_exp) {
p = as_exp_write(bb->filter_exp, p);
Expand Down Expand Up @@ -889,7 +890,7 @@ as_batch_write_record_size(
as_key* key, as_batch_write_record* rec, as_batch_builder* bb, as_error* err
)
{
bb->size += 6; // gen(2) + ttl(4) = 6
bb->size += 2; // gen(2) = 2

if (rec->policy) {
if (rec->policy->filter_exp) {
Expand Down Expand Up @@ -930,7 +931,7 @@ as_batch_write_record_size(
static void
as_batch_apply_record_size(as_key* key, as_batch_apply_record* rec, as_batch_builder* bb)
{
bb->size += 6; // gen(2) + ttl(4) = 6
bb->size += 2; // gen(2) = 2

if (rec->policy) {
if (rec->policy->filter_exp) {
Expand All @@ -957,7 +958,7 @@ as_batch_apply_record_size(as_key* key, as_batch_apply_record* rec, as_batch_bui
static void
as_batch_remove_record_size(as_key* key, as_batch_remove_record* rec, as_batch_builder* bb)
{
bb->size += 6; // gen(2) + ttl(4) = 6
bb->size += 2; // gen(2) = 2

if (rec->policy) {
if (rec->policy->filter_exp) {
Expand All @@ -975,7 +976,7 @@ as_batch_record_size(
as_key* key, as_batch_base_record* rec, as_batch_builder* bb, as_error* err
)
{
bb->size += 8;
bb->size += 12; // repeat(1) + info(3) + ttl(4) + n_fields(2) + n_ops(2) = 12
bb->size += as_command_string_field_size(key->ns);
bb->size += as_command_string_field_size(key->set);

Expand Down Expand Up @@ -1081,7 +1082,7 @@ as_batch_attr_read_header(as_batch_attr* attr, const as_policy_batch* p)
attr->info_attr = AS_MSG_INFO3_SC_READ_TYPE | AS_MSG_INFO3_SC_READ_RELAX;
break;
}
attr->ttl = 0;
attr->ttl = p->read_touch_ttl_percent;
attr->gen = 0;
attr->has_write = false;
attr->send_key = false;
Expand Down Expand Up @@ -1114,7 +1115,7 @@ as_batch_attr_read_row(as_batch_attr* attr, const as_policy_batch_read* p)
attr->info_attr = AS_MSG_INFO3_SC_READ_TYPE | AS_MSG_INFO3_SC_READ_RELAX;
break;
}
attr->ttl = 0;
attr->ttl = p->read_touch_ttl_percent;
attr->gen = 0;
attr->has_write = false;
attr->send_key = false;
Expand Down Expand Up @@ -1322,10 +1323,12 @@ as_batch_write_read(
uint8_t* p, as_key* key, as_batch_attr* attr, as_exp* filter, uint16_t n_ops
)
{
*p++ = BATCH_MSG_INFO;
*p++ = (BATCH_MSG_INFO | BATCH_MSG_TTL);
*p++ = attr->read_attr;
*p++ = attr->write_attr;
*p++ = attr->info_attr;
*(uint32_t*)p = cf_swap_to_be32(attr->ttl);
p += sizeof(uint32_t);
p = as_batch_write_fields_filter(p, key, filter, 0, n_ops);
return p;
}
Expand All @@ -1335,7 +1338,7 @@ as_batch_write_write(
uint8_t* p, as_key* key, as_batch_attr* attr, as_exp* filter, uint16_t n_fields, uint16_t n_ops
)
{
*p++ = BATCH_MSG_WRITE;
*p++ = (BATCH_MSG_INFO | BATCH_MSG_GEN | BATCH_MSG_TTL);
*p++ = attr->read_attr;
*p++ = attr->write_attr;
*p++ = attr->info_attr;
Expand Down Expand Up @@ -3065,20 +3068,28 @@ as_batch_retry_parse_row(uint8_t* p, uint8_t* type)
{
p += sizeof(uint32_t) + AS_DIGEST_VALUE_SIZE;

*type = *p++;
uint8_t t = *p++;
*type = t;

switch (*type) {
case BATCH_MSG_REPEAT:
if (t == BATCH_MSG_REPEAT) {
return p;
case BATCH_MSG_READ:
}

if (t == BATCH_MSG_READ) {
p++;
break;
case BATCH_MSG_INFO:
p += 3;
break;
case BATCH_MSG_WRITE:
p += 9;
break;
}
else {
if (t & BATCH_MSG_INFO) {
p += 3;
}

if (t & BATCH_MSG_GEN) {
p += 2;
}

if (t & BATCH_MSG_TTL) {
p += 4;
}
}

uint16_t n_fields = cf_swap_from_be16(*(uint16_t*)p);
Expand Down
Loading

0 comments on commit a7d1d02

Please sign in to comment.