Skip to content

Commit

Permalink
mac其它测试
Browse files Browse the repository at this point in the history
  • Loading branch information
ncy committed Mar 29, 2024
1 parent 09950a7 commit a35da6c
Show file tree
Hide file tree
Showing 22 changed files with 226 additions and 339 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build
vs
tags
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8)
project(coroutine)

enable_language(C)
add_compile_options(-Wreturn)

if (UNIX OR MINGW)
enable_language(ASM)
Expand Down
92 changes: 50 additions & 42 deletions aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,26 @@
#endif
#include <stdlib.h>

extern void event_init(io_mgr_t * io_mgr);
extern void event_destroy(io_mgr_t * io_mgr);
extern int event_wait(io_mgr_t * io_mgr, long long timeout);
extern void event_add(io_mgr_t * io_mgr, int fd);
extern void event_del(io_mgr_t * io_mgr, int fd);
void io_init()
extern void *event_create();
extern void event_destroy(void * ctx);
extern int event_wait(void * ctx, array_t * fired, long long timeout);
extern void event_add(void * ctx, int fd);
extern void event_del(void * ctx, int fd);

aio_t * aio_create()
{
env_t * env = thread_env();
event_init(&env->io_mgr);
map_init(&thread_env()->io_mgr.wait_map, less, equals);
env->io_mgr.dead = list_create();
aio_t * aio = (aio_t*)malloc(sizeof(aio_t));
aio->wait_map = map_create(nullptr, nullptr);
aio->fired_events = array_create();
aio->event_ctx = event_create();
aio->dead = list_create();

return aio;
}
void io_destroy()
void aio_destroy(aio_t * aio)
{
event_destroy(&thread_env()->io_mgr);
map_destroy(&thread_env()->io_mgr.wait_map);
event_destroy(aio->event_ctx);
map_destroy(aio->wait_map);
}

any_t encode_event(int fd, int events)
Expand All @@ -37,22 +41,20 @@ void decode_event(any_t event, int * fd, int * events)
*events = (long long)event & 0x7;
}

void io_update(long long timeout)
void aio_update(aio_t * aio, long long timeout)
{
if (!map_size(&thread_env()->io_mgr.wait_map)) {
if (!map_size(aio->wait_map)) {
return;
}

io_mgr_t * io_mgr = &thread_env()->io_mgr;

int num_events = event_wait(io_mgr, timeout);
int num_events = event_wait(aio->event_ctx, aio->fired_events, timeout);
for (int i = 0; i < num_events; ++ i) {
int fd = 0;
int events = 0;
any_t e = array_get(&io_mgr->fired_events, i);
any_t e = array_get(aio->fired_events, i);
decode_event(e, &fd, &events);
map_iterator_t iter = map_find(&io_mgr->wait_map, (any_t)fd);
if (map_iterator_valid(&io_mgr->wait_map, iter)) {
map_iterator_t iter = map_get(aio->wait_map, (any_t)fd);
if (map_iterator_valid(aio->wait_map, iter)) {
wait_info_t * wait = (wait_info_t*)map_iterator_get(iter);
if ((events & IO_READ) && !list_empty(wait->reader)) {
co_t * co = list_pop_front(wait->reader);
Expand All @@ -65,8 +67,8 @@ void io_update(long long timeout)
}
}

while (!list_empty(io_mgr->dead)) {
wait_info_t * wait = (wait_info_t*)list_pop_front(io_mgr->dead);
while (!list_empty(aio->dead)) {
wait_info_t * wait = (wait_info_t*)list_pop_front(aio->dead);
while (!list_empty(wait->reader)) {
co_t * co = list_pop_front(wait->reader);
co_resume(co);
Expand All @@ -81,10 +83,10 @@ void io_update(long long timeout)
}
}

void io_wait(int fd, int events)
void aio_wait(aio_t * aio, int fd, int events)
{
map_iterator_t iter = map_find(&thread_env()->io_mgr.wait_map, (any_t)fd);
if (!map_iterator_valid(&thread_env()->io_mgr.wait_map, iter)) {
map_iterator_t iter = map_get(aio->wait_map, (any_t)fd);
if (!map_iterator_valid(aio->wait_map, iter)) {
return;
}
wait_info_t * wait = (wait_info_t*)map_iterator_get(iter);
Expand All @@ -108,39 +110,45 @@ int io_setnoblocking(int fd)
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;

#endif
}


void io_add(int fd)
void aio_add(aio_t * aio, int fd)
{
io_mgr_t * io_mgr = &thread_env()->io_mgr;
map_iterator_t iter = map_find(&io_mgr->wait_map, (any_t)fd);
map_iterator_t iter = map_get(aio->wait_map, (any_t)fd);
wait_info_t * wait = nullptr;
if (map_iterator_valid(&io_mgr->wait_map, iter)) {
wait = map_iterator_get(iter);
} else {
if (!map_iterator_valid(aio->wait_map, iter)) {
wait = (wait_info_t*)malloc(sizeof(wait_info_t));
wait->reader = list_create();
wait->writer = list_create();
map_set(&io_mgr->wait_map, (any_t)fd, wait);
map_set(aio->wait_map, (any_t)fd, wait);
}
event_add(&thread_env()->io_mgr, fd);
event_add(aio->event_ctx, fd);
co_info("%d add to io set", fd);
}

void io_del(int fd)
void aio_del(aio_t * aio, int fd)
{
io_mgr_t * io_mgr = &thread_env()->io_mgr;
co_info("%d remove from io set", fd);
event_del(io_mgr, fd);
map_iterator_t iter = map_find(&io_mgr->wait_map, (any_t)fd);
if (map_iterator_valid(&io_mgr->wait_map, iter)) {
event_del(aio->event_ctx, fd);
map_iterator_t iter = map_get(aio->wait_map, (any_t)fd);
if (map_iterator_valid(aio->wait_map, iter)) {
wait_info_t * wait = map_iterator_get(iter);
list_push_back(io_mgr->dead, wait);
map_remove_key(&thread_env()->io_mgr.wait_map, (any_t)fd);
list_push_back(aio->dead, wait);
map_remove_key(aio->wait_map, (any_t)fd);
}
}

void aio_debug_print_info()
{
aio_t * aio = thread_env()->aio;
map_t * wait_map = aio->wait_map;

for (map_iterator_t it = map_begin(wait_map); it != map_end(wait_map); it = map_next(it)) {
wait_info_t * wait = (wait_info_t*)map_iterator_get(it);
list_t * rl = wait->reader;
list_t * wl = wait->writer;
printf("r/w of %d count:%d/%d\n", (int)it->key, list_size(rl), list_size(wl));
}
}
25 changes: 13 additions & 12 deletions aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ typedef struct
list_t * writer;
} wait_info_t;

typedef struct io_mgr_t
typedef struct aio_mgr_t
{
void * event_ctx;
array_t fired_events;
map_t wait_map;
array_t * fired_events;
map_t * wait_map;
list_t * dead;
} io_mgr_t;
void * event_ctx;
} aio_t;

void io_init();
void io_add(int fd);
void io_del(int fd);
void io_wait(int fd, int events);
aio_t * aio_create();
void aio_add(aio_t * aio, int fd);
void aio_del(aio_t * aio, int fd);
void aio_wait(aio_t * aio, int fd, int events);
any_t encode_event(int fd, int events);
void decode_event(any_t event, int * fd, int * events);
void io_setnoblock(int fd);
void io_update(long long timeout);
void io_destroy();
void io_setnoblock( int fd);
void aio_update(aio_t * aio, long long timeout);
void aio_destroy(aio_t * aio);
void aio_debug_print_info();


30 changes: 15 additions & 15 deletions aio_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,32 @@ typedef struct
fd_set all_fds;
}select_state_t;

void event_init(io_mgr_t * io_mgr)
void * event_create()
{
select_state_t * state = (select_state_t*)malloc(sizeof(select_state_t));
state->max_fd = 0;
FD_ZERO(&state->all_fds);
io_mgr->event_ctx = state;
return state;
}

void event_add(io_mgr_t * io_mgr, int fd)
void event_destroy(void * event)
{
select_state_t * state = (select_state_t*)io_mgr->event_ctx;
free(event);
}

void event_add(void * ctx, int fd)
{
select_state_t * state = (select_state_t*)ctx;
FD_SET(fd, &state->all_fds);
if (fd > state->max_fd) {
state->max_fd = fd;
}
}

void event_del(io_mgr_t * io_mgr, int fd)
void event_del(void * ctx, int fd)
{
select_state_t * state = (select_state_t*)io_mgr->event_ctx;
select_state_t * state = (select_state_t*)ctx;
FD_CLR(fd, &state->all_fds);
return;
if (state->max_fd == fd) {
state->max_fd --;
while (!FD_ISSET(state->max_fd, &state->all_fds)) {
Expand All @@ -43,33 +47,29 @@ void event_del(io_mgr_t * io_mgr, int fd)
}
}

int event_wait(io_mgr_t * io_mgr, long long ms)
int event_wait(void * ctx, array_t * fired_events, long long ms)
{
select_state_t * state = (select_state_t*)io_mgr->event_ctx;
select_state_t * state = (select_state_t*)ctx;
fd_set read_fds = state->all_fds;
fd_set write_fds = state->all_fds;
fd_set execpt_fds = state->all_fds;
struct timeval tv;
tv.tv_sec = ms/1000;
tv.tv_usec = (ms%1000)*1000;
int ready = select(state->max_fd + 1, &read_fds, &write_fds, &execpt_fds, ms == -1L? NULL : &tv);
array_resize(&io_mgr->fired_events, ready);
array_resize(fired_events, ready);
if (ready > 0) {
int j = 0;
for (int i = 0; i<= state->max_fd; ++ i) {
int events = 0;
if (FD_ISSET(i, &read_fds)) { events |= IO_READ; }
if (FD_ISSET(i, &write_fds)) { events |= IO_WRITE; }
if (events) {
array_set(&io_mgr->fired_events, j++, encode_event(i, events));
array_set(fired_events, j++, encode_event(i, events));
}
}
}

return ready;
}

int event_destroy(io_mgr_t * io_mgr_t)
{
return 0;
}
3 changes: 1 addition & 2 deletions container/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ typedef struct
any_t * data;
}array_t;

void array_init(array_t * array);
array_t * array_create();
void array_destroy(array_t * array);
any_t * array_begin(array_t * array);
any_t * array_end(array_t * array);
any_t array_get(array_t * array, int index);
Expand All @@ -19,7 +19,6 @@ void array_set(array_t * array, int index, any_t any);
int array_push(array_t * array, any_t any);
int array_insert(array_t * array, int index, any_t any);
int array_erase(array_t * array, int index);
void array_destroy(array_t * array);
void array_resize(array_t * array, size_t size);
void array_reserve(array_t * array, size_t size);

Expand Down
5 changes: 2 additions & 3 deletions container/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ typedef struct
typedef list_node_t * list_iter_t;


list_node_t * list_node_create();
list_t * list_create();
void list_node_destroy(list_node_t * node);
void list_init(list_t * list);
any_t list_front(list_t * list);
any_t list_back(list_t * list);
void list_push_front(list_t * list, any_t value);
Expand All @@ -30,9 +28,10 @@ any_t list_pop_front(list_t * list);
any_t list_pop_back(list_t * list);
void list_destroy(list_t * list);
size_t list_size(list_t * list);

static inline list_iter_t list_begin(list_t * list) { return list->head.next; }
static inline list_iter_t list_end(list_t * list) { return &list->tail; }

static inline list_iter_t list_next(list_iter_t iter) { return iter->next ;};
static inline list_iter_t list_rbegin(list_t * list) { return list->tail.prev; }
static inline list_iter_t list_rend(list_t * list) { return &list->head; }
static inline any_t list_iter_get(list_iter_t iter) { return iter->value; }
Expand Down
Loading

0 comments on commit a35da6c

Please sign in to comment.