Skip to content

Commit

Permalink
refactor: flatten subscriber primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Jan 17, 2024
1 parent 10b48c6 commit ee6be52
Showing 1 changed file with 41 additions and 46 deletions.
87 changes: 41 additions & 46 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,60 +188,55 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr
s._dropper = dropper;
s._arg = arg;

// Allocate subscriber
_z_subscriber_t *ret = (_z_subscriber_t *)zp_malloc(sizeof(_z_subscriber_t));
if (ret != NULL) {
ret->_zn = _z_session_rc_clone(zn);
ret->_entity_id = s._id;

_z_subscription_rc_t *sp_s = _z_register_subscription(
zn->ptr, _Z_RESOURCE_IS_LOCAL, &s); // This a pointer to the entry stored at session-level.
// Do not drop it by the end of this function.
if (sp_s != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_decl_subscriber(
&keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn->ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_subscription(zn->ptr, _Z_RESOURCE_IS_LOCAL, sp_s);
_z_subscriber_free(&ret);
}
_z_n_msg_clear(&n_msg);
} else {
_z_subscriber_free(&ret);
}
} else {
if (ret == NULL) {
_z_subscription_clear(&s);
return NULL;
}

// Register subscription, stored at session-level, do not drop it by the end of this function.
_z_subscription_rc_t *sp_s = _z_register_subscription(zn->ptr, _Z_RESOURCE_IS_LOCAL, &s);
if (sp_s == NULL) {
_z_subscriber_free(&ret);
return NULL;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_decl_subscriber(
&keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn->ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_subscription(zn->ptr, _Z_RESOURCE_IS_LOCAL, sp_s);
_z_subscriber_free(&ret);
return NULL;
}
_z_n_msg_clear(&n_msg);
// Fill subscriber
ret->_entity_id = s._id;
ret->_zn = _z_session_rc_clone(zn);
return ret;
}

int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
int8_t ret = _Z_ERR_GENERIC;

if (sub != NULL) {
_z_subscription_rc_t *s = _z_get_subscription_by_id(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(sub->_zn.ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
// Only if message is successfully send, local subscription state can be removed
_z_undeclare_resource(sub->_zn.ptr, s->ptr->_key_id);
_z_unregister_subscription(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, s);
_z_session_rc_drop(&sub->_zn);
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
}
_z_n_msg_clear(&n_msg);
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
}
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
if (sub == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}

return ret;
// Find subscription entry
_z_subscription_rc_t *s = _z_get_subscription_by_id(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(sub->_zn.ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_n_msg_clear(&n_msg);
// Only if message is successfully send, local subscription state can be removed
_z_undeclare_resource(sub->_zn.ptr, s->ptr->_key_id);
_z_unregister_subscription(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, s);
_z_session_rc_drop(&sub->_zn);
return _Z_RES_OK;
}

/*------------------ Pull ------------------*/
Expand Down

0 comments on commit ee6be52

Please sign in to comment.