Skip to content

Commit

Permalink
char-ipc: add blocking support
Browse files Browse the repository at this point in the history
  • Loading branch information
fcangialosi committed May 8, 2018
1 parent ba36c88 commit 249d05f
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 47 deletions.
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ obj-m := $(TARGET).o

all:
$(MAKE) -C /lib/modules/$(shell uname -r)/build M=$(PWD) modules
$(MAKE) -C ./ccpkp
./ccpkp/lfq/multi-writer-test

clean:
$(MAKE) -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean
3 changes: 2 additions & 1 deletion ccpkp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ endif

test: lfq/lfq.c lfq/lfq.h lfq/multi-writer-test.c
gcc lfq/lfq.c lfq/multi-writer-test.c $(DEBFLAGS) -lpthread -o ./lfq/multi-writer-test
./lfq/multi-writer-test

clean:
rm -rf *.o *~ core .depend .*.cmd *.ko *.mod.c .tmp_versions
rm -rf *.o *~ ./lfq/multi-writer-test

15 changes: 9 additions & 6 deletions ccpkp/ccpkp.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,25 @@ int ccpkp_user_open(struct inode *inp, struct file *fp) {
// Create new pipe for this CCP
struct kpipe *pipe = kmalloc(sizeof(struct kpipe), GFP_KERNEL);
int i, ccp_id;
#ifndef ONE_PIPE
bool user_read_nonblock = fp->f_flags & O_NONBLOCK;
#endif

memset(pipe, 0, sizeof(struct kpipe));
if (!pipe) {
return -ENOMEM;
}

PDEBUG("init lfq");
if (init_lfq(&pipe->ccp_write_queue) < 0) {
if (init_lfq(&pipe->ccp_write_queue, false) < 0) {
return -ENOMEM;
}
#ifndef ONE_PIPE
#ifndef ONE_PIPE
PDEBUG("init lfq");
if (init_lfq(&pipe->dp_write_queue) < 0) {
if (init_lfq(&pipe->dp_write_queue, !user_read_nonblock) < 0) {
return -ENOMEM;
}
#endif
#endif

// Store pointer to pipe in struct file
fp->private_data = pipe;
Expand Down Expand Up @@ -197,7 +200,7 @@ ssize_t ccpkp_user_read(struct file *fp, char *buf, size_t bytes_to_read, loff_t
// module stores pointer to corresponding ccp kpipe for each socket
ssize_t ccpkp_kernel_read(struct kpipe *pipe, char *buf, size_t bytes_to_read) {
#ifdef ONE_PIPE
printk("error: compiled with a single pipe for test purposes. recompile with ONE_PIPE=n\n")
printk("error: compiled with a single pipe for test purposes. recompile with ONE_PIPE=n\n");
return 0;
#endif
struct lfq *q = &(pipe->ccp_write_queue);
Expand All @@ -216,7 +219,7 @@ ssize_t ccpkp_user_write(struct file *fp, const char *buf, size_t bytes_to_write
// module stores pointer to corresponding ccp kpipe for each socket
ssize_t ccpkp_kernel_write(struct kpipe *pipe, const char *buf, size_t bytes_to_write, int id) {
#ifdef ONE_PIPE
printk("error: compiled with a single pipe for test purposes. recompile with ONE_PIPE=n\n")
printk("error: compiled with a single pipe for test purposes. recompile with ONE_PIPE=n\n");
return 0;
#endif
struct lfq *q = &(pipe->dp_write_queue);
Expand Down
65 changes: 54 additions & 11 deletions ccpkp/lfq/lfq.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "lfq.h"

int init_lfq(struct lfq *q) {
int init_lfq(struct lfq *q, bool blocking) {
q->buf = __MALLOC__(BUF_LEN);
if (!q->buf) {
return -1;
Expand All @@ -27,6 +27,16 @@ int init_lfq(struct lfq *q) {
q->free_head = 0;
q->free_tail = BACKLOG-1;

q->blocking = blocking;
if (blocking) {
#ifdef __KERNEL__
init_waitqueue_head(&q->nonempty);
#else
pthread_mutex_init(&q->wait_lock, NULL);
pthread_cond_init(&q->nonempty, NULL);
#endif
}

return 0;
}

Expand All @@ -36,9 +46,9 @@ void free_lfq(struct lfq *q) {
___FREE___(q->free_list);
}

void init_pipe(struct pipe *p) {
init_lfq(&p->ccp_write_queue);
init_lfq(&p->dp_write_queue);
void init_pipe(struct pipe *p, bool blocking) {
init_lfq(&p->ccp_write_queue, blocking);
init_lfq(&p->dp_write_queue, blocking);
}

void free_pipe(struct pipe *p) {
Expand Down Expand Up @@ -91,14 +101,33 @@ uint16_t read_portus_msg_size(char *buf) {
return *(((uint16_t *)buf)+1);
}

inline bool ready_for_reading(struct lfq *q) {
return (q->read_head != q->write_head) && (q->msg_list[q->read_head] != NULL);
}

ssize_t lfq_read(struct lfq *q, char *buf, size_t bytes_to_read) {
if (q->read_head == q->write_head) {
//PDEBUG("[reader ] queue is empty\n");
return 0;
}
if ((q->msg_list[q->read_head]) == NULL) {
//PDEBUG("[reader ] queue non-empty, but not ready for reading\n");
return 0;

if (q->blocking) {
wait_until_nonempty:
#ifndef __KERNEL__
pthread_mutex_lock(&q->wait_lock);
#endif
while (!ready_for_reading(q)) {
#ifdef __KERNEL__
if (wait_event_interruptible(q->nonempty, ready_for_reading(q))) {
return -ERESTARTSYS;
}
#else
pthread_cond_wait(&q->nonempty, &q->wait_lock);
#endif
}
#ifndef __KERNEL__
pthread_mutex_unlock(&q->wait_lock);
#endif
} else {
if (!ready_for_reading(q)) {
return 0;
}
}

int bytes_read = 0;
Expand Down Expand Up @@ -144,6 +173,10 @@ ssize_t lfq_read(struct lfq *q, char *buf, size_t bytes_to_read) {
q->msg_list[r] = NULL;
buf += bytes_in_block;
}

if (bytes_read == 0) {
goto wait_until_nonempty;
}

return bytes_read;
}
Expand Down Expand Up @@ -184,6 +217,16 @@ ssize_t lfq_write(struct lfq *q, const char *buf, size_t bytes_to_write, int id)
// Assign block to acquired position
q->msg_list[new_i-1] = block;

if (q->blocking) {
#ifdef __KERNEL__
wake_up_interruptible(&q->nonempty);
#else
pthread_mutex_lock(&q->wait_lock);
pthread_cond_signal(&q->nonempty);
pthread_mutex_unlock(&q->wait_lock);
#endif
}

return bytes_to_write;
}

Expand Down
18 changes: 14 additions & 4 deletions ccpkp/lfq/lfq.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#ifndef _LFQ_H_
#define _LFQ_H_


#ifdef __KERNEL__
#include<linux/slab.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/wait.h>

#ifndef __MALLOC__
#define __MALLOC__(size) kmalloc(size, GFP_KERNEL)
Expand All @@ -21,6 +22,7 @@
#include <stdint.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>
#ifndef __MALLOC__
#define __MALLOC__(size) malloc(size)
#endif
Expand Down Expand Up @@ -71,16 +73,24 @@ struct lfq {

idx_t read_head, write_head;
idx_t free_head, free_tail;

bool blocking;
#ifdef __KERNEL__
wait_queue_head_t nonempty;
#else
pthread_cond_t nonempty;
pthread_mutex_t wait_lock;
#endif
};

struct pipe {
struct lfq ccp_write_queue;
struct lfq dp_write_queue;
};

int init_lfq(struct lfq *q);
int init_lfq(struct lfq *q, bool blocking);
void free_lfq(struct lfq *q);
void init_pipe(struct pipe *p);
void init_pipe(struct pipe *p, bool blocking);
void free_pipe(struct pipe *p);

char* _lfq_acquire_free_block(struct lfq *q);
Expand Down
Binary file removed ccpkp/lfq/multi-writer-test
Binary file not shown.
65 changes: 42 additions & 23 deletions ccpkp/lfq/multi-writer-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,11 @@ void *reader(void *args) {
int num_recvd = 0;
usleep(1000);
while (num_recvd < 10000) {
//printf("trying to read\n");
int read = dp_read(p, recv, 2048);
if (read > 0) {
//printf("[%d] got %d bytes\n", num_recvd, read);
char *p = recv;
while (read > 0) {
int sz = read_portus_msg_size(p);
//print_buf(p);
p+= sz;
read -= sz;
num_recvd++;
Expand All @@ -70,7 +67,7 @@ void *writer1(void *args) {
size_t buf_len;

for (int i=0; i<2500; i++) {
int wrote = 0;//ccp_write(p, buf, buf_len, 1);
int wrote = 0;
while (wrote <= 0) {
usleep(100);
char a[25];
Expand All @@ -88,7 +85,7 @@ void *writer2(void *args) {
struct pipe *p = (struct pipe *)args;
size_t buf_len;
for (int i=0; i<5000; i++) {
int wrote = 0;//ccp_write(p, buf, buf_len, 2);
int wrote = 0;
while (wrote <= 0) {
usleep(100);
char a[25];
Expand All @@ -106,7 +103,7 @@ void *writer3(void *args) {
struct pipe *p = (struct pipe *)args;
size_t buf_len;
for (int i=0; i<2500; i++) {
int wrote = 0;//ccp_write(p, buf, buf_len, 3);
int wrote = 0;
while (wrote <= 0) {
usleep(100);
char a[25];
Expand All @@ -124,23 +121,45 @@ void *writer3(void *args) {
int main() {
srand(time(NULL));

struct pipe *p = (struct pipe *) malloc(sizeof(struct pipe));
init_pipe(p);


pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, reader, (void *)p);
pthread_create(&t2, NULL, writer1, (void *)p);
pthread_create(&t3, NULL, writer2, (void *)p);
pthread_create(&t4, NULL, writer3, (void *)p);

pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);

printf("LFQ multiple writers test passed.\n");
free_pipe(p);
printf("LFQ multiple writers test\n");

printf("blocking......");

{
struct pipe *p = (struct pipe *) malloc(sizeof(struct pipe));
init_pipe(p, true);
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, reader, (void *)p);
pthread_create(&t2, NULL, writer1, (void *)p);
pthread_create(&t3, NULL, writer2, (void *)p);
pthread_create(&t4, NULL, writer3, (void *)p);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
free_pipe(p);
}

printf("passed\n");

printf("nonblocking...");

{
struct pipe *p = (struct pipe *) malloc(sizeof(struct pipe));
init_pipe(p, false);
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, reader, (void *)p);
pthread_create(&t2, NULL, writer1, (void *)p);
pthread_create(&t3, NULL, writer2, (void *)p);
pthread_create(&t4, NULL, writer3, (void *)p);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
free_pipe(p);
}

printf("passed\n");

return 0;
}

0 comments on commit 249d05f

Please sign in to comment.