Skip to content

Commit

Permalink
Review cluster settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Silviu Caragea committed Jun 26, 2015
1 parent ab0495e commit 56ef450
Showing 1 changed file with 69 additions and 88 deletions.
157 changes: 69 additions & 88 deletions c_src/nif_cass_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
std::string value; \
if(!get_string(env, term_value, value)) \
return false; \
*error = Func(data->cluster, value.c_str()); \
return true; \
return cass_error_to_nif_term(env, Func(data->cluster, value.c_str())); \
}

#define INT_SETTING(Key, Func) \
Expand All @@ -30,16 +29,7 @@
int value; \
if(!enif_get_int(env, term_value, &value)) \
return false; \
*error = Func(data->cluster, value); \
return true; \
}

#define BOOL_SETTING(Key, Func) \
if(enif_is_identical(term_key, Key)) \
{ \
cass_bool_t value = enif_is_identical(term_value, ATOMS.atomTrue) ? cass_true : cass_false; \
*error = Func(data->cluster, value); \
return true; \
return cass_error_to_nif_term(env, Func(data->cluster, value)); \
}

#define UNSIGNED_INT_SETTING(Key, Func) \
Expand All @@ -48,15 +38,12 @@
unsigned int value; \
if(!enif_get_uint(env, term_value, &value)) \
return false; \
*error = Func(data->cluster, value); \
return true; \
return cass_error_to_nif_term(env, Func(data->cluster, value)); \
}

#define CUSTOM_SETTING(Key, Func) \
if(enif_is_identical(term_key, Key)) \
{ \
return Func(env, term_value, data, error); \
}
return Func(env, term_value, data);

class CassSslScope
{
Expand Down Expand Up @@ -90,96 +77,96 @@ CassError internal_cass_cluster_set_request_timeout(CassCluster* cluster, unsign
return CASS_OK;
}

CassError internal_cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled)
ERL_NIF_TERM internal_cass_cluster_set_token_aware_routing(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
cass_cluster_set_token_aware_routing(cluster, enabled);
return CASS_OK;
cass_bool_t value = enif_is_identical(term_value, ATOMS.atomTrue) ? cass_true : cass_false;
cass_cluster_set_token_aware_routing(data->cluster, value);
return ATOMS.atomOk;
}

CassError internal_cass_cluster_set_tcp_nodelay(CassCluster* cluster, cass_bool_t enabled)
ERL_NIF_TERM internal_cass_cluster_set_tcp_nodelay(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
cass_cluster_set_tcp_nodelay(cluster, enabled);
return CASS_OK;
cass_bool_t value = enif_is_identical(term_value, ATOMS.atomTrue) ? cass_true : cass_false;
cass_cluster_set_tcp_nodelay(data->cluster, value);
return ATOMS.atomOk;
}

bool internal_cass_cluster_set_load_balance_round_robin(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cass_cluster_set_load_balance_round_robin(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
cass_cluster_set_load_balance_round_robin(data->cluster);
*error = CASS_OK;
return true;
return ATOMS.atomOk;
}

bool internal_cass_cluster_set_credentials(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cass_cluster_set_credentials(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
const ERL_NIF_TERM *items;
int arity;

if(!enif_get_tuple(env, term_value, &arity, &items) || arity != 2)
return false;
return enif_make_badarg(env);

std::string username;
std::string pwd;

if(!get_string(env, items[0], username) || !get_string(env, items[1], pwd))
return false;
return enif_make_badarg(env);

cass_cluster_set_credentials(data->cluster, username.c_str(), pwd.c_str());
*error = CASS_OK;
return true;
return ATOMS.atomOk;
}

bool internal_cass_cluster_set_load_balance_dc_aware(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cass_cluster_set_load_balance_dc_aware(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
const ERL_NIF_TERM *items;
int arity;

if(!enif_get_tuple(env, term_value, &arity, &items) || arity != 3)
return false;
return enif_make_badarg(env);

std::string local_dc;
unsigned int used_hosts_per_remote_dc;

if(!get_string(env, items[0], local_dc) || !enif_get_uint(env, items[1], &used_hosts_per_remote_dc))
return false;
return enif_make_badarg(env);

cass_bool_t allow_remote_dcs_for_local_cl = enif_is_identical(items[2], ATOMS.atomTrue) ? cass_true : cass_false;

*error = cass_cluster_set_load_balance_dc_aware(data->cluster, local_dc.c_str(), used_hosts_per_remote_dc, allow_remote_dcs_for_local_cl);
return true;
return cass_error_to_nif_term(env,
cass_cluster_set_load_balance_dc_aware(data->cluster,
local_dc.c_str(),
used_hosts_per_remote_dc,
allow_remote_dcs_for_local_cl));
}

bool internal_cass_cluster_set_tcp_keepalive(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cass_cluster_set_tcp_keepalive(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
const ERL_NIF_TERM *items;
int arity;

if(!enif_get_tuple(env, term_value, &arity, &items) || arity != 2)
return false;
return enif_make_badarg(env);

unsigned delay_sec;
if(!enif_is_atom(env, items[0]) || !enif_get_uint(env, items[1], &delay_sec))
return false;
return enif_make_badarg(env);

cass_bool_t enabled = enif_is_identical(items[0], ATOMS.atomTrue) ? cass_true : cass_false;
cass_cluster_set_tcp_keepalive(data->cluster, enabled, delay_sec);

*error = CASS_OK;
return true;
return ATOMS.atomOk;
}

bool internal_cluster_set_default_consistency_level(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cluster_set_default_consistency_level(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
int cLevel;

if(!enif_get_int(env, term_value, &cLevel))
return false;
return enif_make_badarg(env);

data->defaultConsistencyLevel = static_cast<CassConsistency>(cLevel);
*error = CASS_OK;
return true;
return ATOMS.atomOk;
}

bool internal_cass_cluster_set_ssl(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cass_cluster_set_ssl(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
ERL_NIF_TERM head;

Expand All @@ -195,7 +182,7 @@ bool internal_cass_cluster_set_ssl(ErlNifEnv* env, ERL_NIF_TERM term_value, cass
int arity;

if(!enif_get_tuple(env, head, &arity, &items) || arity != 2)
return false;
return enif_make_badarg(env);

if(enif_is_identical(items[0], ATOMS.atomClusterSettingSslTrustedCerts))
{
Expand All @@ -207,102 +194,100 @@ bool internal_cass_cluster_set_ssl(ErlNifEnv* env, ERL_NIF_TERM term_value, cass
while(enif_get_list_cell(env, trust_list, &cert_head, &trust_list))
{
if(!get_string(env, cert_head, cert))
return false;
return enif_make_badarg(env);

trusted_certs.push_back(cert);
}
}
else if(enif_is_identical(items[0], ATOMS.atomClusterSettingSslCert))
{
if(!get_string(env, items[1], cert))
return false;
return enif_make_badarg(env);
}
else if(enif_is_identical(items[0], ATOMS.atomClusterSettingSslPrivateKey))
{
const ERL_NIF_TERM *pk_items;
int pk_arity;

if(!enif_get_tuple(env, items[1], &pk_arity, &pk_items) || pk_arity != 2)
return false;
return enif_make_badarg(env);

if(!get_string(env, pk_items[0], private_key))
return false;
return enif_make_badarg(env);

if(!get_string(env, pk_items[1], private_key_pwd))
return false;
return enif_make_badarg(env);
}
else if(enif_is_identical(items[0], ATOMS.atomClusterSettingSslVerifyFlags))
{
if(!enif_get_int(env, items[1], &verify_flags))
return false;
return enif_make_badarg(env);
}
else
{
//no valid option
return false;
return enif_make_badarg(env);
}
}

CassSslScope ssl(cass_ssl_new());

for (std::vector<std::string>::const_iterator it = trusted_certs.begin(); it != trusted_certs.end(); ++it)
{
*error = cass_ssl_add_trusted_cert(ssl.get(), (*it).c_str());
CassError error = cass_ssl_add_trusted_cert(ssl.get(), (*it).c_str());

if(*error != CASS_OK)
return true;
if(error != CASS_OK)
return cass_error_to_nif_term(env, error);
}

if(!cert.empty())
{
*error = cass_ssl_set_cert(ssl.get(), cert.c_str());
CassError error = cass_ssl_set_cert(ssl.get(), cert.c_str());

if(*error != CASS_OK)
return true;
if(error != CASS_OK)
return cass_error_to_nif_term(env, error);
}

if(!private_key.empty())
{
*error = cass_ssl_set_private_key(ssl.get(), private_key.c_str(), private_key_pwd.c_str());
CassError error = cass_ssl_set_private_key(ssl.get(), private_key.c_str(), private_key_pwd.c_str());

if(*error != CASS_OK)
return true;
if(error != CASS_OK)
return cass_error_to_nif_term(env, error);
}

if(verify_flags != CASS_SSL_VERIFY_NONE)
cass_ssl_set_verify_flags(ssl.get(), verify_flags);

cass_cluster_set_ssl(data->cluster, ssl.get());
*error = CASS_OK;

return true;
return ATOMS.atomOk;
}

bool internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_TERM term_value, cassandra_data* data)
{
if(enif_is_atom(env, term_value))
{
//only enable/disable

cass_bool_t enabled = enif_is_identical(term_value, ATOMS.atomTrue) ? cass_true : cass_false;
cass_cluster_set_latency_aware_routing(data->cluster, enabled);
*error = CASS_OK;
return true;
return ATOMS.atomOk;
}

const ERL_NIF_TERM *items;
int arity;

if(!enif_get_tuple(env, term_value, &arity, &items) || arity != 2)
return false;
return enif_make_badarg(env);

cass_bool_t enabled = enif_is_identical(items[0], ATOMS.atomTrue) ? cass_true : cass_false;
cass_cluster_set_latency_aware_routing(data->cluster, enabled);

//set also the settings

if(!enif_get_tuple(env, items[1], &arity, &items) || arity != 5)
return false;
return enif_make_badarg(env);

double exclusion_threshold;
unsigned long scale_ms;
Expand All @@ -311,26 +296,25 @@ bool internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_TERM ter
unsigned long min_measured;

if(!enif_get_double(env, items[0], &exclusion_threshold))
return false;
return enif_make_badarg(env);

if(!enif_get_uint64(env, items[1], &scale_ms))
return false;
return enif_make_badarg(env);

if(!enif_get_uint64(env, items[2], &retry_period_ms))
return false;
return enif_make_badarg(env);

if(!enif_get_uint64(env, items[3], &update_rate_ms))
return false;
return enif_make_badarg(env);

if(!enif_get_uint64(env, items[4], &min_measured))
return false;
return enif_make_badarg(env);

cass_cluster_set_latency_aware_routing_settings(data->cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured);
*error = CASS_OK;
return true;
return ATOMS.atomOk;
}

bool apply_cluster_settings(ErlNifEnv* env, ERL_NIF_TERM term_key, ERL_NIF_TERM term_value, cassandra_data* data, CassError* error)
ERL_NIF_TERM apply_cluster_settings(ErlNifEnv* env, ERL_NIF_TERM term_key, ERL_NIF_TERM term_value, cassandra_data* data)
{
CUSTOM_SETTING(ATOMS.atomClusterDefaultConsistencyLevel, internal_cluster_set_default_consistency_level);

Expand All @@ -357,12 +341,12 @@ bool apply_cluster_settings(ErlNifEnv* env, ERL_NIF_TERM term_key, ERL_NIF_TERM
CUSTOM_SETTING(ATOMS.atomClusterSettingCredentials, internal_cass_cluster_set_credentials);
CUSTOM_SETTING(ATOMS.atomClusterSettingLoadBalanceRoundRobin, internal_cass_cluster_set_load_balance_round_robin);
CUSTOM_SETTING(ATOMS.atomClusterSettingLoadBalanceDcAware, internal_cass_cluster_set_load_balance_dc_aware);
BOOL_SETTING(ATOMS.atomClusterSettingTokenAwareRouting, internal_cass_cluster_set_token_aware_routing);
CUSTOM_SETTING(ATOMS.atomClusterSettingTokenAwareRouting, internal_cass_cluster_set_token_aware_routing);
CUSTOM_SETTING(ATOMS.atomClusterSetringLatencyAwareRouting, internal_cluster_set_latency_aware_routing);
BOOL_SETTING(ATOMS.atomClusterSettingTcpNodelay, internal_cass_cluster_set_tcp_nodelay);
CUSTOM_SETTING(ATOMS.atomClusterSettingTcpNodelay, internal_cass_cluster_set_tcp_nodelay);
CUSTOM_SETTING(ATOMS.atomClusterSettingTcpKeepalive, internal_cass_cluster_set_tcp_keepalive);

return false;
return enif_make_badarg(env);
}

ERL_NIF_TERM nif_cass_cluster_set_options(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
Expand All @@ -378,8 +362,6 @@ ERL_NIF_TERM nif_cass_cluster_set_options(ErlNifEnv* env, int argc, const ERL_NI
const ERL_NIF_TERM *items;
int arity;

CassError error;

while(enif_get_list_cell(env, list, &head, &list))
{
if(!enif_get_tuple(env, head, &arity, &items) || arity != 2)
Expand All @@ -388,11 +370,10 @@ ERL_NIF_TERM nif_cass_cluster_set_options(ErlNifEnv* env, int argc, const ERL_NI
if(!enif_is_atom(env, items[0]))
return enif_make_badarg(env);

if(!apply_cluster_settings(env, items[0], items[1], data, &error))
return enif_make_badarg(env);
ERL_NIF_TERM result = apply_cluster_settings(env, items[0], items[1], data);

if(error != CASS_OK)
return make_error(env, cass_error_desc(error));
if(!enif_is_identical(ATOMS.atomOk, result))
return result;
}

return ATOMS.atomOk;
Expand Down

0 comments on commit 56ef450

Please sign in to comment.