Skip to content

Commit

Permalink
Initial pass at metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonklaus committed Jan 19, 2024
1 parent 662f8d1 commit cf31bad
Show file tree
Hide file tree
Showing 20 changed files with 1,205 additions and 30 deletions.
19 changes: 18 additions & 1 deletion src/include/aerospike/aerospike_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ typedef struct as_node_stats_s {
/**
* Node error count within current window.
*/
uint32_t error_count;
uint32_t error_rate_count;

} as_node_stats;

Expand Down Expand Up @@ -144,6 +144,11 @@ typedef struct as_cluster_stats_s {
*/
uint32_t thread_pool_queued_tasks;

/**
* Count of transaction retries since cluster was started.
*/
uint64_t retry_count;

} as_cluster_stats;

struct as_cluster_s;
Expand Down Expand Up @@ -242,6 +247,18 @@ aerospike_event_loop_stats(as_event_loop* event_loop, as_event_loop_stats* stats
AS_EXTERN char*
aerospike_stats_to_string(as_cluster_stats* stats);

/**
* Enable extended periodic cluster and node latency metrics.
*/
AS_EXTERN void
aerospike_enable_metrics(aerospike* as, const struct as_policy_metrics_s* policy);

/**
* Disable extended periodic cluster and node latency metrics.
*/
AS_EXTERN void
aerospike_disable_metrics(aerospike* as);

#ifdef __cplusplus
} // end extern "C"
#endif
46 changes: 36 additions & 10 deletions src/include/aerospike/as_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <aerospike/as_atomic.h>
#include <aerospike/as_config.h>
#include <aerospike/as_metrics.h>
#include <aerospike/as_node.h>
#include <aerospike/as_partition.h>
#include <aerospike/as_policy.h>
Expand Down Expand Up @@ -379,6 +380,19 @@ typedef struct as_cluster_s {
* Should continue to tend cluster.
*/
volatile bool valid;

bool metrics_enabled;

as_policy_metrics* metrics_policy;

as_metrics_callbacks* metrics_callbacks;

uint64_t retry_count;

uint64_t tran_count;

uint64_t delay_queue_timeout_count;

} as_cluster;

/******************************************************************************
Expand Down Expand Up @@ -518,6 +532,18 @@ as_partition_shm_get_node(
as_node* prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
);

void
as_cluster_enable_metrics(as_cluster* cluster, as_policy_metrics* policy);

void
as_cluster_disable_metrics(as_cluster* cluster);

void
as_cluster_add_tran(as_cluster* cluster);

uint64_t
as_cluster_get_tran_count(const as_cluster* cluster);

/**
* @private
* Get mapped node given partition and replica. This function does not reserve the node.
Expand All @@ -544,10 +570,10 @@ as_partition_get_node(
* Increment node's error count.
*/
static inline void
as_node_incr_error_count(as_node* node)
as_node_incr_error_rate(as_node* node)
{
if (node->cluster->max_error_rate > 0) {
as_incr_uint32(&node->error_count);
as_incr_uint32(&node->error_rate_count);
}
}

Expand All @@ -556,30 +582,30 @@ as_node_incr_error_count(as_node* node)
* Reset node's error count.
*/
static inline void
as_node_reset_error_count(as_node* node)
as_node_reset_error_rate_count(as_node* node)
{
as_store_uint32(&node->error_count, 0);
as_store_uint32(&node->error_rate_count, 0);
}

/**
* @private
* Get node's error count.
*/
static inline uint32_t
as_node_get_error_count(as_node* node)
as_node_get_error_rate(as_node* node)
{
return as_load_uint32(&node->error_count);
return as_load_uint32(&node->error_rate_count);
}

/**
* @private
* Validate node's error count.
*/
static inline bool
as_node_valid_error_count(as_node* node)
as_node_valid_error_rate(as_node* node)
{
uint32_t max = node->cluster->max_error_rate;
return max == 0 || max >= as_load_uint32(&node->error_count);
return max == 0 || max >= as_load_uint32(&node->error_rate_count);
}

/**
Expand All @@ -590,7 +616,7 @@ static inline void
as_node_close_conn_error(as_node* node, as_socket* sock, as_conn_pool* pool)
{
as_node_close_connection(node, sock, pool);
as_node_incr_error_count(node);
as_node_incr_error_rate(node);
}

/**
Expand All @@ -601,7 +627,7 @@ static inline void
as_node_put_conn_error(as_node* node, as_socket* sock)
{
as_node_put_connection(node, sock);
as_node_incr_error_count(node);
as_node_incr_error_rate(node);
}

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions src/include/aerospike/as_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ typedef struct as_command_s {
uint8_t replica_size;
uint8_t replica_index;
uint8_t replica_index_sc; // Used in batch only.
as_latency_type latency_type;
} as_command;

/**
Expand Down
4 changes: 2 additions & 2 deletions src/include/aerospike/as_event_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ as_event_release_async_connection(as_event_command* cmd)
{
as_async_conn_pool* pool = &cmd->node->async_conn_pools[cmd->event_loop->index];
as_event_release_connection(cmd->conn, pool);
as_node_incr_error_count(cmd->node);
as_node_incr_error_rate(cmd->node);
}

static inline void
Expand All @@ -783,7 +783,7 @@ as_event_connection_timeout(as_event_command* cmd, as_async_conn_pool* pool)
if (conn->watching > 0) {
as_event_stop_watcher(cmd, conn);
as_event_release_connection(conn, pool);
as_node_incr_error_count(cmd->node);
as_node_incr_error_rate(cmd->node);
}
else {
cf_free(conn);
Expand Down
189 changes: 189 additions & 0 deletions src/include/aerospike/as_metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright 2008-2023 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

#pragma once

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <aerospike/as_string_builder.h>

#if !defined(_MSC_VER)
#include <netinet/in.h>
#include <sys/uio.h>
#endif

#ifdef __cplusplus
extern "C" {
#endif

/******************************************************************************
* MACROS
*****************************************************************************/

#define NS_TO_MS 1000000
#define MIN_FILE_SIZE 1000000
#define UTC_STR_LEN 72

/******************************************************************************
* TYPES
*****************************************************************************/

typedef enum as_latency_type_e {
AS_LATENCY_TYPE_CONN, //as_queue or as_async_conn_pool?
AS_LATENCY_TYPE_WRITE,
AS_LATENCY_TYPE_READ,
AS_LATENCY_TYPE_BATCH,
AS_LATENCY_TYPE_QUERY,
AS_LATENCY_TYPE_NONE
} as_latency_type;

typedef struct as_latency_buckets_s {
int32_t latency_shift;

int32_t latency_columns;

uint64_t* buckets;
} as_latency_buckets;

struct as_metrics_callbacks_s;

/**
* Metrics Policy
*/
typedef struct as_policy_metrics_s {
const char* report_directory;

int64_t report_size_limit; // default 0

int32_t interval; // default 30

int32_t latency_columns; // default 7

int32_t latency_shift; // default 1

struct as_metrics_callbacks_s* metrics_callbacks;

FILE* file;
} as_policy_metrics;

struct as_cluster_s;
struct as_node_s;

typedef void (*as_metrics_enable_callback)(struct as_policy_metrics_s* policy);

typedef void (*as_metrics_snapshot_callback)(const struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster);

typedef void (*as_metrics_node_close_callback)(const struct as_policy_metrics_s* policy, const struct as_node_s* node);

typedef void (*as_metrics_disable_callback)(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster);

typedef struct as_metrics_callbacks_s {
as_metrics_enable_callback enable_callback;
as_metrics_snapshot_callback snapshot_callback;
as_metrics_node_close_callback node_close_callback;
as_metrics_disable_callback disable_callback;
} as_metrics_callbacks;

typedef struct as_node_metrics_s {
as_latency_buckets* latency;
} as_node_metrics;

const char*
utc_time_str(time_t t);

void
as_metrics_policy_init(as_policy_metrics* policy);

char*
as_latency_type_to_string(as_latency_type type);

void
as_metrics_latency_buckets_init(as_latency_buckets* latency_buckets, int32_t latencyColumns, int32_t latencyShift);

uint64_t
as_metrics_get_bucket(as_latency_buckets* buckets, uint32_t i);

void
as_metrics_latency_buckets_add(as_latency_buckets* latency_buckets, uint64_t elapsed);

uint32_t
as_metrics_get_index(as_latency_buckets* latency_buckets, uint64_t elapsed_nanos);

void
as_node_metrics_init(as_node_metrics* node_metrics, const as_policy_metrics* policy);

void
as_metrics_add_latency(as_node_metrics* node_metrics, as_latency_type latency_type, uint64_t elapsed);

void
as_metrics_callbacks_init(as_metrics_callbacks* callbacks);

void
as_metrics_process_cpu_load_mem_usage(double* cpu_usage, double* mem);

void
as_metrics_write_cluster(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster);

void
as_metrics_write_node(as_string_builder* sb, struct as_node_stats_s* node_stats);

void
as_metrics_write_conn(as_string_builder* sb, struct as_conn_stats_s* conn_stats);

#if defined(__linux__)
void
as_metrics_process_cpu_load_mem_usage(double* cpu_usage, double* mem);

void
as_metrics_proc_stat_mem_cpu(double* vm_usage, double* resident_set, double* cpu_usage);
#endif

#if defined(_MSC_VER)

void
as_metrics_process_cpu_load_mem_usage(double* cpu_usage, double* mem);

static double
as_metrics_calculate_cpu_load(uint64_t idleTicks, uint64_t totalTicks);

static uint64_t
as_metrics_file_time_to_uint_64(const FILETIME ft);

double
as_metrics_process_cpu_load();

double
as_metrics_process_mem_usage();

#endif

#if defined(__APPLE__)

void
as_metrics_process_cpu_load_mem_usage(double* cpu_usage, double* mem);

double
as_metrics_process_mem_usage();

double
as_metrics_process_cpu_load();
#endif

#ifdef __cplusplus
} // end extern "C"
#endif
Loading

0 comments on commit cf31bad

Please sign in to comment.