Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
rsenn committed Feb 18, 2024
1 parent d53a5da commit 3932c52
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 82 deletions.
1 change: 1 addition & 0 deletions lib/asynciterator.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ BOOL
asynciterator_stop(AsyncIterator* it, JSValueConst value, JSContext* ctx) {
if(asynciterator_pending(it)) {
asynciterator_emplace(it, value, TRUE, ctx);
it->closing = FALSE;
it->closed = TRUE;
asynciterator_cancel(it, JS_NULL, ctx);
return TRUE;
Expand Down
176 changes: 121 additions & 55 deletions lib/generator.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ dequeue_value(Generator* gen, BOOL* done_p, BOOL* binary_p) {
static Queue*
create_queue(Generator* gen) {
if(!gen->q) {
#ifdef DEBUG_OUTPUT

#ifdef DEBUG_OUTPUT_
printf("Creating Queue... %s\n", JS_ToCString(gen->ctx, gen->callback));
#endif
gen->q = queue_new(gen->ctx);
Expand All @@ -34,7 +35,7 @@ enqueue_block(Generator* gen, ByteBlock blk, JSValueConst callback) {
QueueItem* item;
ssize_t ret = block_SIZE(&blk);

#ifdef DEBUG_OUTPUT
#ifdef DEBUG_OUTPUT_
printf("%s blk.size=%zu\n", __func__, block_SIZE(&blk));
#endif

Expand Down Expand Up @@ -129,30 +130,47 @@ static int
generator_update(Generator* gen) {
int i = 0;

while(!list_empty(&gen->iterator.reads) && gen->q && !queue_empty(gen->q)) {
while(!list_empty(&gen->iterator.reads) && gen->q && queue_size(gen->q)) {
size_t s = block_SIZE(&queue_front(gen->q)->block);

if(!gen->closing)
if(gen->buffering && block_SIZE(&queue_front(gen->q)->block) < gen->chunk_size)
if(gen->buffering && s < gen->chunk_size)
break;

BOOL done = FALSE, binary = FALSE;
JSValue chunk = dequeue_value(gen, &done, &binary);

#ifdef DEBUG_OUTPUT
printf("%-22s i: %i reads: %zu q->items: %zu done: %i\n", __func__, i, list_size(&gen->iterator.reads), gen->q ? list_size(&gen->q->items) : 0, done);
printf("%-22s i: %i queue: %zu/%zub dequeued: %zu done: %i\n", __func__, i, gen->q ? queue_size(gen->q) : 0, gen->q ? queue_bytes(gen->q) : 0, s, done);
#endif

// asynciterator_emplace(&gen->iterator, chunk, done, gen->ctx);
done ? asynciterator_stop(&gen->iterator, JS_UNDEFINED, gen->ctx) : asynciterator_yield(&gen->iterator, chunk, gen->ctx);

JS_FreeValue(gen->ctx, chunk);

if(done)
gen->closed = TRUE;

++i;
}

/* if(gen->closing && !gen->closed) {
asynciterator_stop(&gen->iterator, JS_UNDEFINED, gen->ctx);
gen->closing=FALSE;
gen->closed=TRUE;
}*/

#ifdef DEBUG_OUTPUT
printf("%-22s gen: %p chunk_size: %zu i: %zu reads: %zu continuous: %i buffering: %i closing: %i closed: %i r/w: %zu/%zu queue: %zu/%zub\n",
__func__,
(uint32_t)gen,
gen->chunk_size,
i,
list_size(&gen->iterator.reads),
(gen->q && gen->q->continuous),
gen->buffering,
gen->closing,
gen->closed,
gen->bytes_read,
gen->bytes_written,
gen->q ? queue_size(gen->q) : 0,
gen->q ? queue_bytes(gen->q) : 0);
#endif
return i;
}

Expand Down Expand Up @@ -228,16 +246,17 @@ JSValue
generator_next(Generator* gen, JSValueConst arg) {
JSValue ret = JS_UNDEFINED;

/* if(gen->closing && !asynciterator_pending(&gen->iterator)) {
ResolveFunctions async;
ret = js_async_create(gen->ctx, &async);
js_async_resolve(gen->ctx, &async, js_iterator_result(gen->ctx, JS_UNDEFINED, TRUE));
js_async_free(JS_GetRuntime(gen->ctx), &async);
gen->closing = FALSE;
gen->closed = TRUE;
return ret;
}
*/
#ifdef DEBUG_OUTPUT
printf("%-22s gen: %p chunk_size: %zu reads: %zu pending: %zu queue: %zu/%zub\n",
__func__,
(uint32_t)gen,
gen->chunk_size,
list_size(&gen->iterator.reads),
asynciterator_pending(&gen->iterator),
gen->q ? queue_size(gen->q) : 0,
gen->q ? queue_bytes(gen->q) : 0);
#endif

ret = asynciterator_next(&gen->iterator, arg, gen->ctx);

if(!start_executor(gen) && !gen->started) {
Expand All @@ -249,17 +268,28 @@ generator_next(Generator* gen, JSValueConst arg) {

if(n == 0) {
if(gen->closing || gen->closed) {
if(asynciterator_emplace(&gen->iterator, JS_UNDEFINED, TRUE, gen->ctx)) {
if(asynciterator_stop(&gen->iterator, JS_UNDEFINED, gen->ctx)) {
gen->closing = FALSE;
gen->closed = TRUE;
}
}
}

#ifdef DEBUG_OUTPUT
printf("%-22s gen: %p reads: %zu updated: %zu read: %i\n", __func__, gen, list_size(&gen->iterator.reads), rds1 - list_size(&gen->iterator.reads), id);
printf("%-22s gen: %p chunk_size: %zu reads: %zu ret: '%s' buffering: %i closing: %i closed: %i r/w: %zu/%zu queue: %zu/%zub\n",
__func__,
(uint32_t)gen,
gen->chunk_size,
list_size(&gen->iterator.reads),
JS_ToCString(gen->ctx, ret),
gen->buffering,
gen->closing,
gen->closed,
gen->bytes_written,
gen->bytes_read,
gen->q ? queue_size(gen->q) : 0,
gen->q ? queue_bytes(gen->q) : 0);
#endif

return ret;
}

Expand All @@ -278,42 +308,60 @@ generator_write(Generator* gen, const void* data, size_t len, JSValueConst callb
ByteBlock blk = block_copy(data, len);
ssize_t ret = -1, size = block_SIZE(&blk);

#ifdef DEBUG_OUTPUT
printf("%-22s gen: %p reads: %zu\n", __func__, gen, list_size(&gen->iterator.reads));
#endif
if(!gen->buffering && !(gen->q && gen->q->continuous) && (!gen->q || !queue_size(gen->q)) && asynciterator_pending(&gen->iterator)) {

if(gen->buffering || list_empty(&gen->iterator.reads) || (gen->q && gen->q->continuous)) {
ret = enqueue_block(gen, blk, callback);
JSValue chunk = gen->block_fn(&blk, gen->ctx);

if(gen->buffering) {
gen->bytes_read += block_SIZE(&blk);
gen->chunks_read += 1;

while(!list_empty(&gen->iterator.reads) && gen->q && !queue_empty(gen->q) && block_SIZE(&queue_front(gen->q)->block) >= gen->chunk_size) {
BOOL done = FALSE, binary = FALSE;
ByteBlock blk = queue_next(gen->q, &done, &binary);
#ifdef DEBUG_OUTPUT
printf("%s block_SIZE(&blk) = %zu\n", __func__, block_SIZE(&blk));
#endif
JSValue chunk = gen->block_fn(&blk, gen->ctx);
if(asynciterator_yield(&gen->iterator, chunk, gen->ctx))
ret = size;

asynciterator_yield(&gen->iterator, chunk, gen->ctx);
JS_FreeValue(gen->ctx, chunk);
}
}
JS_FreeValue(gen->ctx, chunk);
} else {
ret = enqueue_block(gen, blk, callback);

JSValue chunk = gen->block_fn(&blk, gen->ctx);
// gen->buffering = !queue_empty(gen->q);

if(asynciterator_yield(&gen->iterator, chunk, gen->ctx))
ret = size;
while(asynciterator_pending(&gen->iterator) && gen->q && !queue_empty(gen->q) && block_SIZE(&queue_front(gen->q)->block) >= gen->chunk_size) {
BOOL done = FALSE, binary = FALSE;

JS_FreeValue(gen->ctx, chunk);
#ifdef DEBUG_OUTPUT
printf("%s block_SIZE(&blk) = %zu\n", __func__, block_SIZE(&blk));
#endif

JSValue chunk = dequeue_value(gen, &done, &binary);
asynciterator_emplace(&gen->iterator, chunk, done, gen->ctx);

JS_FreeValue(gen->ctx, chunk);
}
}

if(ret >= 0) {
gen->bytes_written += ret;
gen->chunks_written += 1;
}

#ifdef DEBUG_OUTPUT
printf("%-22s gen: %p chunk_size: %zu data: '%.*s' len: %zu chunk_size: %zu reads: %zu continuous: %i buffering: %i closing: %i closed: %i r/w: %zu/%zu queue: %zu/%zub\n",
__func__,
(uint32_t)gen,
gen->chunk_size,
MIN(10, (int)len),
data,
len,
gen->chunk_size,
list_size(&gen->iterator.reads),
(gen->q && gen->q->continuous),
gen->buffering,
gen->closing,
gen->closed,
gen->bytes_read,
gen->bytes_written,
gen->q ? queue_size(gen->q) : 0,
gen->q ? queue_bytes(gen->q) : 0);
#endif
return ret;
}

Expand All @@ -331,13 +379,25 @@ generator_push(Generator* gen, JSValueConst value) {

ret = js_async_create(gen->ctx, &gen->resolve_reject);

#ifdef DEBUG_OUTPUT
printf("%-22s reads: %zu value: %.*s closing: %i closed: %i\n", __func__, list_size(&gen->iterator.reads), 10, JS_ToCString(gen->ctx, value), gen->closing, gen->closed);
#endif

if(!generator_yield(gen, value, JS_UNDEFINED))
js_async_reject(gen->ctx, &gen->resolve_reject, JS_UNDEFINED);

#ifdef DEBUG_OUTPUT
printf("%-22s gen: %p chunk_size: %zu reads: %zu value: '%s' buffering: %i closing: %i closed: %i r/w: %zu/%zu queue: %zu/%zub\n",
__func__,
(uint32_t)gen,
gen->chunk_size,
list_size(&gen->iterator.reads),
JS_ToCString(gen->ctx, value),
gen->buffering,
gen->closing,
gen->closed,
gen->bytes_read,
gen->bytes_written,
gen->q ? queue_size(gen->q) : 0,
gen->q ? queue_bytes(gen->q) : 0);
#endif

gen->promise = js_promise_wrap(gen->ctx, ret);
JS_FreeValue(gen->ctx, ret);
return JS_DupValue(gen->ctx, gen->promise);
Expand Down Expand Up @@ -428,13 +488,20 @@ generator_stop(Generator* gen, JSValueConst arg) {
Queue* q;
QueueItem* item = 0;

if(gen->closed)
return ret;

#ifdef DEBUG_OUTPUT
printf("generator_stop(%s)\n", JS_ToCString(gen->ctx, arg));
printf("%-22s gen: %p chunk_size: %zu arg: '%s' closed: %zu queue: %zu/%zub\n",
__func__,
(uint32_t)gen,
gen->chunk_size,
JS_ToCString(gen->ctx, arg),
gen->closed,
gen->q ? queue_size(gen->q) : 0,
gen->q ? queue_bytes(gen->q) : 0);
#endif

if(gen->closed)
return ret;

if((q = gen->q)) {
if(!queue_complete(q)) {
item = queue_close(q);
Expand Down Expand Up @@ -524,8 +591,7 @@ generator_finish(Generator* gen) {
return TRUE;
}

if(gen->buffering)
generator_update(gen);
generator_update(gen);

return FALSE;
}
Expand Down
28 changes: 22 additions & 6 deletions src/minnet-client-http.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ http_client_callback(struct lws* wsi, enum lws_callback_reasons reason, void* us

size_t n = headers_write(&req->headers, wsi, &buf.write, buf.end);

DEBUG("APPEND_HANDSHAKE_HEADER %zu %zd '%.*s'\n", n, buffer_HEAD(&buf), (int)n, buf.read);
#ifdef DEBUT_OUTPUT
printf("APPEND_HANDSHAKE_HEADER %zu %zd '%.*s'\n", n, buffer_HEAD(&buf), (int)n, buf.read);
#endif

*(uint8_t**)in += n;

if(method_number(client->connect_info.method) == METHOD_POST && !lws_http_is_redirected_to_get(wsi)) {
Expand Down Expand Up @@ -311,7 +314,9 @@ http_client_callback(struct lws* wsi, enum lws_callback_reasons reason, void* us
while(!done) {
value = js_iterator_next(ctx, client->body, &client->next, &done, 0, 0);

DEBUG("js_iterator_next() = %s %i done=%i\n", JS_ToCString(ctx, value), JS_VALUE_GET_TAG(value), done);
#ifdef DEBUT_OUTPUT
printf("js_iterator_next() = %s %i done=%i\n", JS_ToCString(ctx, value), JS_VALUE_GET_TAG(value), done);
#endif

if(JS_IsException(value)) {
JSValue exception = JS_GetException(ctx);
Expand All @@ -321,9 +326,15 @@ http_client_callback(struct lws* wsi, enum lws_callback_reasons reason, void* us
JSBuffer input = js_buffer_new(ctx, value);
// js_std_dump_error(ctx);

DEBUG("\x1b[2K\ryielded %p %zu\n", input.data, input.size);
#ifdef DEBUT_OUTPUT
printf("\x1b[2K\ryielded %p %zu\n", input.data, input.size);
#endif

buffer_append(&buf, input.data, input.size);
DEBUG("\x1b[2K\rbuffered %zu/%zu bytes\n", buffer_REMAIN(&buf), buffer_HEAD(&buf));
#ifdef DEBUT_OUTPUT
printf("\x1b[2K\rbuffered %zu/%zu bytes\n", buffer_REMAIN(&buf), buffer_HEAD(&buf));
#endif

js_buffer_free(&input, JS_GetRuntime(ctx));
}

Expand All @@ -334,7 +345,10 @@ http_client_callback(struct lws* wsi, enum lws_callback_reasons reason, void* us
size = buf.write - buf.start;
if((r = lws_write(wsi, buf.start, size, (enum lws_write_protocol)n)) != size)
return 1;
DEBUG("\x1b[2K\rwrote %zd%s\n", r, n == LWS_WRITE_HTTP_FINAL ? " (final)" : "");
#ifdef DEBUT_OUTPUT
printf("\x1b[2K\rwrote %zd%s\n", r, n == LWS_WRITE_HTTP_FINAL ? " (final)" : "");
#endif

if(n != LWS_WRITE_HTTP_FINAL)
lws_callback_on_writable(wsi);
}
Expand Down Expand Up @@ -375,7 +389,9 @@ http_client_callback(struct lws* wsi, enum lws_callback_reasons reason, void* us
if(!JS_IsObject(session->resp_obj))
session->resp_obj = minnet_response_wrap(ctx, opaque->resp);

DEBUG("LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ len=%zu in='%.*s'", len, /*len > 30 ? 30 :*/ (int)len, (char*)in);
#ifdef DEBUT_OUTPUT
printf("LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ len=%zu in='%.*s'", len, /*len > 30 ? 30 :*/ (int)len, (char*)in);
#endif

generator_write(resp->body, in, len, JS_UNDEFINED);

Expand Down
Loading

0 comments on commit 3932c52

Please sign in to comment.