Skip to content

Commit

Permalink
* MDF [ringbuffer] Lock/Unlock ops of ringbuffer
Browse files Browse the repository at this point in the history
Signed-off-by: Moi Ran <[email protected]>
  • Loading branch information
RanMaoyi committed Dec 7, 2023
1 parent 3ca7e56 commit aac41ca
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/supplemental/nanolib/ringbuffer/ringbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ int ringBuffer_init(ringBuffer_t **rb,
newRB->deqinRuleListLen = 0;
newRB->deqoutRuleListLen = 0;

nni_mtx_init(&newRB->lock);

*rb = newRB;

return 0;
Expand Down Expand Up @@ -110,8 +112,11 @@ int ringBuffer_enqueue(ringBuffer_t *rb,
unsigned long long expiredAt)
{
int ret;

nni_mtx_lock(&rb->lock);
ret = ringBuffer_rule_check(rb, data, ENQUEUE_IN_HOOK);
if (ret != 0) {
nni_mtx_unlock(&rb->lock);
return -1;
}

Expand All @@ -128,9 +133,11 @@ int ringBuffer_enqueue(ringBuffer_t *rb,
rb->msgs[rb->head].expiredAt = expiredAt;
rb->head = (rb->head + 1) % rb->cap;
rb->tail = (rb->tail + 1) % rb->cap;
nni_mtx_unlock(&rb->lock);
log_error("Ring buffer is full but overwrite the old data\n");
return 0;
} else {
nni_mtx_unlock(&rb->lock);
log_error("Ring buffer is full enqueue failed!!!\n");
return -1;
}
Expand All @@ -147,19 +154,23 @@ int ringBuffer_enqueue(ringBuffer_t *rb,

(void)ringBuffer_rule_check(rb, data, ENQUEUE_OUT_HOOK);

nni_mtx_unlock(&rb->lock);
return 0;
}

int ringBuffer_dequeue(ringBuffer_t *rb, void **data)
{
int ret;
nni_mtx_lock(&rb->lock);
ret = ringBuffer_rule_check(rb, NULL, DEQUEUE_IN_HOOK);
if (ret != 0) {
nni_mtx_unlock(&rb->lock);
return -1;
}

if (rb->size == 0) {
log_error("Ring buffer is NULL dequeue failed\n");
nni_mtx_unlock(&rb->lock);
return -1;
}

Expand All @@ -169,6 +180,7 @@ int ringBuffer_dequeue(ringBuffer_t *rb, void **data)

(void)ringBuffer_rule_check(rb, *data, DEQUEUE_OUT_HOOK);

nni_mtx_unlock(&rb->lock);
return 0;
}

Expand Down Expand Up @@ -199,6 +211,7 @@ int ringBuffer_release(ringBuffer_t *rb)
return -1;
}

nni_mtx_lock(&rb->lock);
if (rb->msgs != NULL) {
if (rb->size != 0) {
i = rb->head;
Expand All @@ -221,6 +234,8 @@ int ringBuffer_release(ringBuffer_t *rb)
ringBufferRuleList_release(rb->enqoutRuleList, rb->enqoutRuleListLen);
ringBufferRuleList_release(rb->deqoutRuleList, rb->deqoutRuleListLen);

nni_mtx_unlock(&rb->lock);
nni_mtx_fini(&rb->lock);
nng_free(rb, sizeof(*rb));

return 0;
Expand Down Expand Up @@ -260,49 +275,58 @@ int ringBuffer_add_rule(ringBuffer_t *rb,
return -1;
}

nni_mtx_lock(&rb->lock);
if (flag & ENQUEUE_IN_HOOK) {
ret = ringBufferRuleList_add(rb->enqinRuleList, &rb->enqinRuleListLen, match, target);
if (ret != 0) {
nni_mtx_unlock(&rb->lock);
return -1;
}
}

if (flag & ENQUEUE_OUT_HOOK) {
ret = ringBufferRuleList_add(rb->enqoutRuleList, &rb->enqoutRuleListLen, match, target);
if (ret != 0) {
nni_mtx_unlock(&rb->lock);
return -1;
}
}

if (flag & DEQUEUE_IN_HOOK) {
ret = ringBufferRuleList_add(rb->deqinRuleList, &rb->deqinRuleListLen, match, target);
if (ret != 0) {
nni_mtx_unlock(&rb->lock);
return -1;
}
}

if (flag & DEQUEUE_OUT_HOOK) {
ret = ringBufferRuleList_add(rb->deqoutRuleList, &rb->deqoutRuleListLen, match, target);
if (ret != 0) {
nni_mtx_unlock(&rb->lock);
return -1;
}
}

nni_mtx_unlock(&rb->lock);
return 0;
}

int ringBuffer_search_msg_by_key(ringBuffer_t *rb, uint32_t key, nni_msg **msg)
{
int i = 0;

nni_mtx_lock(&rb->lock);
for (i = rb->head; i < rb->size; i++) {
i = i % rb->cap;
if (rb->msgs[i].key == key) {
*msg = rb->msgs[i].data;
nni_mtx_unlock(&rb->lock);
return 0;
}
}

nni_mtx_unlock(&rb->lock);
return -1;
}

Expand All @@ -311,6 +335,10 @@ int ringBuffer_search_msgs_by_key(ringBuffer_t *rb, uint32_t key, int count, nni
int i = 0;
int j = 0;

if (rb == NULL || count <= 0 || list == NULL) {
return -1;
}

if (count > rb->size) {
return -1;
}
Expand All @@ -322,13 +350,15 @@ int ringBuffer_search_msgs_by_key(ringBuffer_t *rb, uint32_t key, int count, nni

NNI_LIST_INIT(newList, ringBuffer_msgs_t, node);

nni_mtx_lock(&rb->lock);
for (i = rb->head; i < rb->size; i++) {
i = i % rb->cap;
if (rb->msgs[i].key == key) {
for (j = 0; j < count; j++) {
ringBuffer_msgs_t *msg_node = nng_alloc(sizeof(ringBuffer_msgs_t));
if (msg_node == NULL) {
nng_free(newList, sizeof(nni_list));
nni_mtx_unlock(&rb->lock);
return -1;
}
msg_node->key = rb->msgs[i].key;
Expand All @@ -339,10 +369,12 @@ int ringBuffer_search_msgs_by_key(ringBuffer_t *rb, uint32_t key, int count, nni
i = (i + 1) % rb->cap;
}
*list = newList;
nni_mtx_unlock(&rb->lock);
return 0;
}
}

nni_mtx_unlock(&rb->lock);
nng_free(newList, sizeof(nni_list));
return -1;
}

0 comments on commit aac41ca

Please sign in to comment.