From e8edf13d04a9672bcc27cb857b61f9053215a149 Mon Sep 17 00:00:00 2001 From: Sami Vaarala Date: Tue, 11 Mar 2014 02:06:00 +0200 Subject: [PATCH] sockets into c eventloop example --- examples/eventloop/c_eventloop.c | 214 +++++++++++++++++++++++++++--- examples/eventloop/c_eventloop.js | 91 +++++++++++++ 2 files changed, 289 insertions(+), 16 deletions(-) diff --git a/examples/eventloop/c_eventloop.c b/examples/eventloop/c_eventloop.c index 6fd56d93..c7461190 100644 --- a/examples/eventloop/c_eventloop.c +++ b/examples/eventloop/c_eventloop.c @@ -15,16 +15,14 @@ #include "duktape.h" -/* - * Timers - */ - #define MAX_TIMERS 4096 /* this is quite excessive for embedded use, but good for testing */ #define MIN_DELAY 1.0 #define MIN_WAIT 1.0 #define MAX_WAIT 60000.0 #define MAX_EXPIRYS 10 +#define MAX_FDS 256 + typedef struct { int64_t id; /* numeric ID (returned from e.g. setTimeout); zero if unused */ double target; /* next target time */ @@ -48,6 +46,12 @@ static ev_timer timer_list[MAX_TIMERS]; static ev_timer timer_expiring; static int timer_count; /* last timer at timer_count - 1 */ static int64_t timer_next_id = 1; + +/* Socket poll state. */ +static struct pollfd poll_list[MAX_FDS]; +static int poll_count = 0; + +/* Misc */ static int exit_requested = 0; /* Get Javascript compatible 'now' timestamp (millisecs since 1970). */ @@ -210,17 +214,76 @@ static void expire_timers(duk_context *ctx) { duk_pop_2(ctx); /* -> [ ... ] */ } +static void compact_poll_list(void) { + int i, j, n; + + /* i = input index + * j = output index (initially same as i) + */ + + n = poll_count; + for (i = 0, j = 0; i < n; i++) { + struct pollfd *pfd = poll_list + i; + if (pfd->fd == 0) { + /* keep output index the same */ +#if 0 + fprintf(stderr, "remove pollfd (index %d): fd=%d, events=%d, revents=%d\n", + i, pfd->fd, pfd->events, pfd->revents), + fflush(stderr); +#endif + + continue; + } +#if 0 + fprintf(stderr, "keep pollfd (index %d -> %d): fd=%d, events=%d, revents=%d\n", + i, j, pfd->fd, pfd->events, pfd->revents), + fflush(stderr); +#endif + if (i != j) { + /* copy only if indices have diverged */ + memcpy((void *) (poll_list + j), (void *) (poll_list + i), sizeof(struct pollfd)); + } + j++; + } + + if (j < poll_count) { + /* zeroize unused entries for sanity */ + memset((void *) (poll_list + j), 0, (poll_count - j) * sizeof(struct pollfd)); + } + + poll_count = j; +} + int eventloop_run(duk_context *ctx) { ev_timer *t; double now; double diff; int timeout; - struct pollfd dummy[1]; int rc; + int i, n; + int idx_eventloop; + int idx_fd_handler; + + /* The Ecmascript poll handler is passed through EventLoop.fdPollHandler + * which c_eventloop.js sets before we come here. + */ + duk_push_global_object(ctx); + duk_get_prop_string(ctx, -1, "EventLoop"); + duk_get_prop_string(ctx, -1, "fdPollHandler"); /* -> [ global EventLoop fdPollHandler ] */ + idx_fd_handler = duk_get_top_index(ctx); + idx_eventloop = idx_fd_handler - 1; for (;;) { + /* + * Expire timers. + */ + expire_timers(ctx); + /* + * If exit requested, bail out as fast as possible. + */ + if (exit_requested) { #if 0 fprintf(stderr, "exit requested, exiting event loop\n"); @@ -229,6 +292,17 @@ int eventloop_run(duk_context *ctx) { break; } + /* + * Compact poll list by removing pollfds with fd == 0. + */ + + compact_poll_list(); + + /* + * Determine poll() timeout (as close to poll() as possible as + * the wait is relative). + */ + now = get_now(); t = find_nearest_timer(); if (t) { @@ -240,16 +314,30 @@ int eventloop_run(duk_context *ctx) { } timeout = (int) diff; /* clamping ensures that fits */ } else { - /* No timers, exit (for now) */ - break; + if (poll_count == 0) { +#if 0 + fprintf(stderr, "no timers and no sockets to poll, exiting\n"); + fflush(stderr); +#endif + break; + } + timeout = (int) MAX_WAIT; } + /* + * Poll for activity or timeout. + */ + #if 0 - fprintf(stderr, "poll timeout %d ms\n", timeout); + fprintf(stderr, "going to poll, timeout %d ms, pollfd count %d\n", timeout, poll_count); fflush(stderr); #endif - rc = poll(dummy, 0, timeout); + rc = poll(poll_list, poll_count, timeout); +#if 0 + fprintf(stderr, "poll rc: %d\n", rc); + fflush(stderr); +#endif if (rc < 0) { /* error */ } else if (rc == 0) { @@ -257,8 +345,52 @@ int eventloop_run(duk_context *ctx) { } else { /* 'rc' fds active */ } + + /* + * Check socket activity, handle all sockets. Handling is offloaded to + * Ecmascript code (fd + revents). + * + * If FDs are removed from the poll list while we're processing callbacks, + * the entries are simply marked unused (fd set to 0) without actually + * removing them from the poll list. This ensures indices are not + * disturbed. The poll list is compacted before next poll(). + */ + + n = (rc == 0 ? 0 : poll_count); /* if timeout, no need to check pollfd */ + for (i = 0; i < n; i++) { + struct pollfd *pfd = poll_list + i; + + if (pfd->fd == 0) { + /* deleted, perhaps by previous callback */ + continue; + } + + if (pfd->revents) { +#if 0 + fprintf(stderr, "fd %d has revents: %d\n", (int) pfd->fd, (int) pfd->revents); + fflush(stderr); +#endif + duk_dup(ctx, idx_fd_handler); + duk_dup(ctx, idx_eventloop); + duk_push_int(ctx, pfd->fd); + duk_push_int(ctx, pfd->revents); + rc = duk_pcall_method(ctx, 2 /*nargs*/, DUK_INVALID_INDEX); + if (rc) { +#if 0 + fprintf(stderr, "fd callback failed for fd %d: %s\n", (int) pfd->fd, duk_to_string(ctx, -1)); + fflush(stderr); +#endif + } + duk_pop(ctx); + + pfd->revents = 0; + } + + } } + duk_pop_n(ctx, 3); + return 0; } @@ -403,6 +535,55 @@ static int delete_timer(duk_context *ctx) { return 1; } +static int listen_fd(duk_context *ctx) { + int fd = duk_require_int(ctx, 0); + int events = duk_require_int(ctx, 1); + int i, n; + struct pollfd *pfd; + +#if 0 + fprintf(stderr, "listen_fd: fd=%d, events=%d\n", fd, events); + fflush(stderr); +#endif + /* events == 0 means stop listening to the FD */ + + n = poll_count; + for (i = 0; i < n; i++) { + pfd = poll_list + i; + if (pfd->fd == fd) { +#if 0 + fprintf(stderr, "listen_fd: fd found at index %d\n", i); + fflush(stderr); +#endif + if (events == 0) { + /* mark to-be-deleted, cleaned up by next poll */ + pfd->fd = 0; + } else { + pfd->events = events; + } + return 0; + } + } + + /* not found, append to list */ +#if 0 + fprintf(stderr, "listen_fd: fd not found on list, add new entry\n"); + fflush(stderr); +#endif + + if (poll_count >= MAX_FDS) { + duk_error(ctx, DUK_ERR_ERROR, "out of fd slots"); + } + + pfd = poll_list + poll_count; + pfd->fd = fd; + pfd->events = events; + pfd->revents = 0; + poll_count++; + + return 0; +} + static int request_exit(duk_context *ctx) { (void) ctx; exit_requested = 1; @@ -412,22 +593,23 @@ static int request_exit(duk_context *ctx) { void eventloop_register(duk_context *ctx) { memset((void *) timer_list, 0, MAX_TIMERS * sizeof(ev_timer)); memset((void *) &timer_expiring, 0, sizeof(ev_timer)); + memset((void *) poll_list, 0, MAX_FDS * sizeof(struct pollfd)); duk_push_global_object(ctx); duk_push_string(ctx, "EventLoop"); duk_push_object(ctx); - duk_push_string(ctx, "createTimer"); duk_push_c_function(ctx, create_timer, 3); - duk_put_prop(ctx, -3); + duk_put_prop_string(ctx, -2, "createTimer"); - duk_push_string(ctx, "deleteTimer"); duk_push_c_function(ctx, delete_timer, 1); - duk_put_prop(ctx, -3); + duk_put_prop_string(ctx, -2, "deleteTimer"); + + duk_push_c_function(ctx, listen_fd, 2); + duk_put_prop_string(ctx, -2, "listenFd"); - duk_push_string(ctx, "requestExit"); - duk_push_c_function(ctx, request_exit, 1); - duk_put_prop(ctx, -3); + duk_push_c_function(ctx, request_exit, 0); + duk_put_prop_string(ctx, -2, "requestExit"); duk_put_prop(ctx, -3); /* set global 'eventloop' */ diff --git a/examples/eventloop/c_eventloop.js b/examples/eventloop/c_eventloop.js index 6180497c..fa743084 100644 --- a/examples/eventloop/c_eventloop.js +++ b/examples/eventloop/c_eventloop.js @@ -86,3 +86,94 @@ function clearInterval(timer_id) { function requestEventLoopExit() { EventLoop.requestExit(); } + +/* + * Socket handling + * + * Ideally this would be implemented more in C than here for more speed + * and smaller footprint: C code would directly maintain the callback state + * and such. + * + * Also for more optimal I/O, the buffer churn caused by allocating and + * freeing a lot of buffer values could be eliminated by reusing buffers. + * Socket reads would then go into a pre-allocated buffer, for instance. + */ + +EventLoop.socketListening = {}; +EventLoop.socketReading = {}; +EventLoop.socketConnecting = {}; + +EventLoop.fdPollHandler = function(fd, revents) { + var data; + var cb; + var rc; + var acc_res; + + //print('activity on fd', fd, 'revents', revents); + + if (revents & Poll.POLLIN) { + cb = this.socketReading[fd]; + if (cb) { + data = Socket.read(fd); // no size control now + //print('READ', Duktape.enc('jsonx', data)); + if (String(data) === '') { // FIXME + this.close(fd); + return; + } + cb(fd, data); + } else { + cb = this.socketListening[fd]; + if (cb) { + acc_res = Socket.accept(fd); + //print('ACCEPT:', Duktape.enc('jsonx', acc_res)); + cb(acc_res.fd, acc_res.addr, acc_res.port); + } else { + //print('UNKNOWN'); + } + } + } + + if (revents & Poll.POLLOUT) { + // Connected + cb = this.socketConnecting[fd]; + if (cb) { + delete this.socketConnecting[fd]; + cb(fd); + } + } + + if ((revents & ~(Poll.POLLIN | Poll.POLLOUT)) !== 0) { + //print('unexpected revents, close fd'); + this.close(fd); + } +} + +EventLoop.server = function(address, port, cb_accepted) { + var fd = Socket.createServerSocket(address, port); + this.socketListening[fd] = cb_accepted; + this.listenFd(fd, Poll.POLLIN); +} + +EventLoop.connect = function(address, port, cb_connected) { + var fd = Socket.connect(address, port); + this.socketConnecting[fd] = cb_connected; + this.listenFd(fd, Poll.POLLOUT); +} + +EventLoop.close = function(fd) { + EventLoop.listenFd(fd, 0); + delete this.socketListening[fd]; + delete this.socketReading[fd]; + delete this.socketConnecting[fd]; + Socket.close(fd); +} + +EventLoop.setReader = function(fd, cb_read) { + this.socketReading[fd] = cb_read; + this.listenFd(fd, Poll.POLLIN); +} + +EventLoop.write = function(fd, data) { + // This simple example doesn't have support for write blocking / draining + var rc = Socket.write(fd, Duktape.Buffer(data)); +}