Skip to content

Commit

Permalink
refactor: flatten queryable primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Jan 17, 2024
1 parent d1e22da commit 10b48c6
Showing 1 changed file with 39 additions and 46 deletions.
85 changes: 39 additions & 46 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,60 +275,53 @@ _z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr,
q._dropper = dropper;
q._arg = arg;

// Allocate queryable
_z_queryable_t *ret = (_z_queryable_t *)zp_malloc(sizeof(_z_queryable_t));
if (ret != NULL) {
ret->_zn = _z_session_rc_clone(zn);
ret->_entity_id = q._id;

_z_questionable_rc_t *sp_q =
_z_register_questionable(zn->ptr, &q); // This a pointer to the entry stored at session-level.
// Do not drop it by the end of this function.
if (sp_q != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration =
_z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn->ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
// ret = _Z_ERR_TRANSPORT_TX_FAILED;
_z_unregister_questionable(zn->ptr, sp_q);
_z_queryable_free(&ret);
}
_z_n_msg_clear(&n_msg);
} else {
_z_queryable_free(&ret);
}
} else {
if (ret == NULL) {
_z_questionable_clear(&q);
return NULL;
}

// Create questionable entry, stored at session-level, do not drop it by the end of this function.
_z_questionable_rc_t *sp_q = _z_register_questionable(zn->ptr, &q);
if (sp_q == NULL) {
_z_queryable_free(&ret);
return NULL;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn->ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_questionable(zn->ptr, sp_q);
_z_queryable_free(&ret);
return NULL;
}
_z_n_msg_clear(&n_msg);
// Fill queryable
ret->_entity_id = q._id;
ret->_zn = _z_session_rc_clone(zn);
return ret;
}

int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
int8_t ret = _Z_RES_OK;

if (qle != NULL) {
_z_questionable_rc_t *q = _z_get_questionable_by_id(qle->_zn.ptr, qle->_entity_id);
if (q != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(qle->_zn.ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
// Only if message is successfully send, local queryable state can be removed
_z_unregister_questionable(qle->_zn.ptr, q);
} else {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_n_msg_clear(&n_msg);
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
}
_z_session_rc_drop(&qle->_zn);
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
if (qle == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}

return ret;
// Find questionable entry
_z_questionable_rc_t *q = _z_get_questionable_by_id(qle->_zn.ptr, qle->_entity_id);
if (q == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(qle->_zn.ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_n_msg_clear(&n_msg);
// Only if message is successfully send, local queryable state can be removed
_z_unregister_questionable(qle->_zn.ptr, q);
_z_session_rc_drop(&qle->_zn);
return _Z_RES_OK;
}

int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) {
Expand Down

0 comments on commit 10b48c6

Please sign in to comment.