Skip to content

Commit

Permalink
Change enum to macros for latency_type, add comments to cluster, and …
Browse files Browse the repository at this point in the history
…rename error_rate_count to error_rate in node
  • Loading branch information
shannonklaus committed Jan 22, 2024
1 parent dc50d0f commit 65a6f7e
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 39 deletions.
28 changes: 22 additions & 6 deletions src/include/aerospike/as_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ typedef struct as_cluster_s {

as_policy_metrics* metrics_policy;

as_metrics_callbacks* metrics_callbacks;
as_metrics_listeners* metrics_listeners;

uint64_t retry_count;

Expand Down Expand Up @@ -532,15 +532,31 @@ as_partition_shm_get_node(
as_node* prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
);

/**
* @private
* Enable the collection of metrics
*/
void
as_cluster_enable_metrics(as_cluster* cluster, as_policy_metrics* policy);

/**
* @private
* Disable the collection of metrics
*/
void
as_cluster_disable_metrics(as_cluster* cluster);

/**
* @private
* Increment transaction count when metrics are enabled.
*/
void
as_cluster_add_tran(as_cluster* cluster);

/**
* @private
* Return transaction count. The value is cumulative and not reset per metrics interval.
*/
uint64_t
as_cluster_get_tran_count(const as_cluster* cluster);

Expand Down Expand Up @@ -573,7 +589,7 @@ static inline void
as_node_incr_error_rate(as_node* node)
{
if (node->cluster->max_error_rate > 0) {
as_incr_uint32(&node->error_rate_count);
as_incr_uint32(&node->error_rate);
}
}

Expand All @@ -582,9 +598,9 @@ as_node_incr_error_rate(as_node* node)
* Reset node's error count.
*/
static inline void
as_node_reset_error_rate_count(as_node* node)
as_node_reset_error_rate(as_node* node)
{
as_store_uint32(&node->error_rate_count, 0);
as_store_uint32(&node->error_rate, 0);
}

/**
Expand All @@ -594,7 +610,7 @@ as_node_reset_error_rate_count(as_node* node)
static inline uint32_t
as_node_get_error_rate(as_node* node)
{
return as_load_uint32(&node->error_rate_count);
return as_load_uint32(&node->error_rate);
}

/**
Expand All @@ -605,7 +621,7 @@ static inline bool
as_node_valid_error_rate(as_node* node)
{
uint32_t max = node->cluster->max_error_rate;
return max == 0 || max >= as_load_uint32(&node->error_rate_count);
return max == 0 || max >= as_load_uint32(&node->error_rate);
}

/**
Expand Down
28 changes: 14 additions & 14 deletions src/include/aerospike/as_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ extern "C" {
#define MIN_FILE_SIZE 1000000
#define UTC_STR_LEN 72

typedef uint8_t as_latency_type;

#define AS_LATENCY_TYPE_CONN 0
#define AS_LATENCY_TYPE_WRITE 1
#define AS_LATENCY_TYPE_READ 2
#define AS_LATENCY_TYPE_BATCH 3
#define AS_LATENCY_TYPE_QUERY 4
#define AS_LATENCY_TYPE_NONE 5

/******************************************************************************
* TYPES
*****************************************************************************/

typedef enum as_latency_type_e {
AS_LATENCY_TYPE_CONN, //as_queue or as_async_conn_pool?
AS_LATENCY_TYPE_WRITE,
AS_LATENCY_TYPE_READ,
AS_LATENCY_TYPE_BATCH,
AS_LATENCY_TYPE_QUERY,
AS_LATENCY_TYPE_NONE
} as_latency_type;

typedef struct as_latency_buckets_s {
int32_t latency_shift;

Expand All @@ -60,7 +60,7 @@ typedef struct as_latency_buckets_s {
uint64_t* buckets;
} as_latency_buckets;

struct as_metrics_callbacks_s;
struct as_metrics_listeners_s;

/**
* Metrics Policy
Expand All @@ -76,7 +76,7 @@ typedef struct as_policy_metrics_s {

int32_t latency_shift; // default 1

struct as_metrics_callbacks_s* metrics_callbacks;
struct as_metrics_listeners_s* metrics_listeners;

FILE* file;
} as_policy_metrics;
Expand All @@ -92,12 +92,12 @@ typedef void (*as_metrics_node_close_callback)(const struct as_policy_metrics_s*

typedef void (*as_metrics_disable_callback)(struct as_policy_metrics_s* policy, const struct as_cluster_s* cluster);

typedef struct as_metrics_callbacks_s {
typedef struct as_metrics_listeners_s {
as_metrics_enable_callback enable_callback;
as_metrics_snapshot_callback snapshot_callback;
as_metrics_node_close_callback node_close_callback;
as_metrics_disable_callback disable_callback;
} as_metrics_callbacks;
} as_metrics_listeners;

typedef struct as_node_metrics_s {
as_latency_buckets* latency;
Expand Down Expand Up @@ -131,7 +131,7 @@ void
as_metrics_add_latency(as_node_metrics* node_metrics, as_latency_type latency_type, uint64_t elapsed);

void
as_metrics_callbacks_init(as_metrics_callbacks* callbacks);
as_metrics_listeners_init(as_metrics_listeners* listeners);

void
as_metrics_process_cpu_load_mem_usage(double* cpu_usage, double* mem);
Expand Down
12 changes: 9 additions & 3 deletions src/include/aerospike/as_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,18 @@ typedef struct as_node_s {
/**
* Error count for this node's error_rate_window.
*/
uint32_t error_rate_count;
uint32_t error_rate;

/**
* Transaction error count since node was initialized. If the error is retryable, multiple errors per
* transaction may occur.
*/
uint64_t error_count;

/**
* Transaction timeout count since node was initialized. If the timeout is retryable (ie socketTimeout),
* multiple timeouts per transaction may occur.
*/
uint64_t timeout_count;

/**
Expand Down Expand Up @@ -644,8 +652,6 @@ as_node_signal_login(as_node* node);
bool
as_node_has_rack(as_node* node, const char* ns, int rack_id);

typedef enum as_latency_type_e as_latency_type;

void
as_node_add_latency(as_node* node, as_latency_type latency_type, uint64_t elapsed);

Expand Down
18 changes: 9 additions & 9 deletions src/main/aerospike/as_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,13 @@ as_cluster_enable_metrics(as_cluster* cluster, as_policy_metrics* policy)
{
if (cluster->metrics_enabled)
{
cluster->metrics_callbacks->disable_callback(policy, cluster);
cluster->metrics_listeners->disable_callback(policy, cluster);
}

cluster->metrics_callbacks = policy->metrics_callbacks;
if (cluster->metrics_callbacks == NULL)
cluster->metrics_listeners = policy->metrics_listeners;
if (cluster->metrics_listeners == NULL)
{
as_metrics_callbacks_init(cluster->metrics_callbacks);
as_metrics_listeners_init(cluster->metrics_listeners);
}

cluster->metrics_policy = policy;
Expand All @@ -576,7 +576,7 @@ as_cluster_enable_metrics(as_cluster* cluster, as_policy_metrics* policy)
as_node_enable_metrics(node, policy);
}

cluster->metrics_callbacks->enable_callback(policy);
cluster->metrics_listeners->enable_callback(policy);
}

void
Expand All @@ -585,7 +585,7 @@ as_cluster_disable_metrics(as_cluster* cluster)
if (cluster->metrics_enabled)
{
cluster->metrics_enabled = false;
cluster->metrics_callbacks->disable_callback(cluster->metrics_policy, cluster);
cluster->metrics_listeners->disable_callback(cluster->metrics_policy, cluster);
}
}

Expand Down Expand Up @@ -647,7 +647,7 @@ as_cluster_remove_nodes(as_cluster* cluster, as_vector* /* <as_node*> */ nodes_t
as_node_deactivate(node);

if (cluster->metrics_enabled) {
cluster->metrics_callbacks->node_close_callback(node->cluster->metrics_policy, node);
cluster->metrics_listeners->node_close_callback(node->cluster->metrics_policy, node);
}
}

Expand Down Expand Up @@ -724,7 +724,7 @@ as_cluster_reset_error_rate(as_cluster* cluster)
as_nodes* nodes = cluster->nodes;

for (uint32_t i = 0; i < nodes->size; i++) {
as_node_reset_error_rate_count(nodes->array[i]);
as_node_reset_error_rate(nodes->array[i]);
}
}

Expand Down Expand Up @@ -968,7 +968,7 @@ as_cluster_tend(as_cluster* cluster, as_error* err, bool is_init)

if (cluster->metrics_enabled && (cluster->tend_count % cluster->metrics_policy->interval))
{
cluster->metrics_callbacks->snapshot_callback(cluster->metrics_policy, cluster);
cluster->metrics_listeners->snapshot_callback(cluster->metrics_policy, cluster);
}

as_cluster_destroy_peers(&peers);
Expand Down
10 changes: 5 additions & 5 deletions src/main/aerospike/as_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ as_metrics_disable(struct as_policy_metrics_s* policy, const struct as_cluster_s
}

void
as_metrics_callbacks_init(as_metrics_callbacks* callbacks)
as_metrics_listeners_init(as_metrics_listeners* listeners)
{
callbacks->enable_callback = as_metrics_enable;
callbacks->disable_callback = as_metrics_disable;
callbacks->node_close_callback = as_metrics_node_close;
callbacks->snapshot_callback = as_metrics_snapshot;
listeners->enable_callback = as_metrics_enable;
listeners->disable_callback = as_metrics_disable;
listeners->node_close_callback = as_metrics_node_close;
listeners->snapshot_callback = as_metrics_snapshot;
}

void
Expand Down
4 changes: 2 additions & 2 deletions src/main/aerospike/as_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ as_node_create(as_cluster* cluster, as_node_info* node_info)
node->sync_conn_pools = cf_malloc(sizeof(as_conn_pool) * cluster->conn_pools_per_node);
node->sync_conns_opened = 1;
node->sync_conns_closed = 0;
node->error_rate_count = 0;
node->error_rate = 0;
node->error_count = 0;
node->timeout_count = 0;
node->conn_iter = 0;
Expand Down Expand Up @@ -909,7 +909,7 @@ static void
as_node_restart(as_cluster* cluster, as_node* node)
{
if (cluster->max_error_rate > 0) {
as_node_reset_error_rate_count(node);
as_node_reset_error_rate(node);
}

// Balance sync connections.
Expand Down

0 comments on commit 65a6f7e

Please sign in to comment.