Browse Source

sockets into c eventloop example

pull/2/head
Sami Vaarala 11 years ago
parent
commit
e8edf13d04
  1. 214
      examples/eventloop/c_eventloop.c
  2. 91
      examples/eventloop/c_eventloop.js

214
examples/eventloop/c_eventloop.c

@ -15,16 +15,14 @@
#include "duktape.h"
/*
* Timers
*/
#define MAX_TIMERS 4096 /* this is quite excessive for embedded use, but good for testing */
#define MIN_DELAY 1.0
#define MIN_WAIT 1.0
#define MAX_WAIT 60000.0
#define MAX_EXPIRYS 10
#define MAX_FDS 256
typedef struct {
int64_t id; /* numeric ID (returned from e.g. setTimeout); zero if unused */
double target; /* next target time */
@ -48,6 +46,12 @@ static ev_timer timer_list[MAX_TIMERS];
static ev_timer timer_expiring;
static int timer_count; /* last timer at timer_count - 1 */
static int64_t timer_next_id = 1;
/* Socket poll state. */
static struct pollfd poll_list[MAX_FDS];
static int poll_count = 0;
/* Misc */
static int exit_requested = 0;
/* Get Javascript compatible 'now' timestamp (millisecs since 1970). */
@ -210,17 +214,76 @@ static void expire_timers(duk_context *ctx) {
duk_pop_2(ctx); /* -> [ ... ] */
}
static void compact_poll_list(void) {
int i, j, n;
/* i = input index
* j = output index (initially same as i)
*/
n = poll_count;
for (i = 0, j = 0; i < n; i++) {
struct pollfd *pfd = poll_list + i;
if (pfd->fd == 0) {
/* keep output index the same */
#if 0
fprintf(stderr, "remove pollfd (index %d): fd=%d, events=%d, revents=%d\n",
i, pfd->fd, pfd->events, pfd->revents),
fflush(stderr);
#endif
continue;
}
#if 0
fprintf(stderr, "keep pollfd (index %d -> %d): fd=%d, events=%d, revents=%d\n",
i, j, pfd->fd, pfd->events, pfd->revents),
fflush(stderr);
#endif
if (i != j) {
/* copy only if indices have diverged */
memcpy((void *) (poll_list + j), (void *) (poll_list + i), sizeof(struct pollfd));
}
j++;
}
if (j < poll_count) {
/* zeroize unused entries for sanity */
memset((void *) (poll_list + j), 0, (poll_count - j) * sizeof(struct pollfd));
}
poll_count = j;
}
int eventloop_run(duk_context *ctx) {
ev_timer *t;
double now;
double diff;
int timeout;
struct pollfd dummy[1];
int rc;
int i, n;
int idx_eventloop;
int idx_fd_handler;
/* The Ecmascript poll handler is passed through EventLoop.fdPollHandler
* which c_eventloop.js sets before we come here.
*/
duk_push_global_object(ctx);
duk_get_prop_string(ctx, -1, "EventLoop");
duk_get_prop_string(ctx, -1, "fdPollHandler"); /* -> [ global EventLoop fdPollHandler ] */
idx_fd_handler = duk_get_top_index(ctx);
idx_eventloop = idx_fd_handler - 1;
for (;;) {
/*
* Expire timers.
*/
expire_timers(ctx);
/*
* If exit requested, bail out as fast as possible.
*/
if (exit_requested) {
#if 0
fprintf(stderr, "exit requested, exiting event loop\n");
@ -229,6 +292,17 @@ int eventloop_run(duk_context *ctx) {
break;
}
/*
* Compact poll list by removing pollfds with fd == 0.
*/
compact_poll_list();
/*
* Determine poll() timeout (as close to poll() as possible as
* the wait is relative).
*/
now = get_now();
t = find_nearest_timer();
if (t) {
@ -240,16 +314,30 @@ int eventloop_run(duk_context *ctx) {
}
timeout = (int) diff; /* clamping ensures that fits */
} else {
/* No timers, exit (for now) */
break;
if (poll_count == 0) {
#if 0
fprintf(stderr, "no timers and no sockets to poll, exiting\n");
fflush(stderr);
#endif
break;
}
timeout = (int) MAX_WAIT;
}
/*
* Poll for activity or timeout.
*/
#if 0
fprintf(stderr, "poll timeout %d ms\n", timeout);
fprintf(stderr, "going to poll, timeout %d ms, pollfd count %d\n", timeout, poll_count);
fflush(stderr);
#endif
rc = poll(dummy, 0, timeout);
rc = poll(poll_list, poll_count, timeout);
#if 0
fprintf(stderr, "poll rc: %d\n", rc);
fflush(stderr);
#endif
if (rc < 0) {
/* error */
} else if (rc == 0) {
@ -257,8 +345,52 @@ int eventloop_run(duk_context *ctx) {
} else {
/* 'rc' fds active */
}
/*
* Check socket activity, handle all sockets. Handling is offloaded to
* Ecmascript code (fd + revents).
*
* If FDs are removed from the poll list while we're processing callbacks,
* the entries are simply marked unused (fd set to 0) without actually
* removing them from the poll list. This ensures indices are not
* disturbed. The poll list is compacted before next poll().
*/
n = (rc == 0 ? 0 : poll_count); /* if timeout, no need to check pollfd */
for (i = 0; i < n; i++) {
struct pollfd *pfd = poll_list + i;
if (pfd->fd == 0) {
/* deleted, perhaps by previous callback */
continue;
}
if (pfd->revents) {
#if 0
fprintf(stderr, "fd %d has revents: %d\n", (int) pfd->fd, (int) pfd->revents);
fflush(stderr);
#endif
duk_dup(ctx, idx_fd_handler);
duk_dup(ctx, idx_eventloop);
duk_push_int(ctx, pfd->fd);
duk_push_int(ctx, pfd->revents);
rc = duk_pcall_method(ctx, 2 /*nargs*/, DUK_INVALID_INDEX);
if (rc) {
#if 0
fprintf(stderr, "fd callback failed for fd %d: %s\n", (int) pfd->fd, duk_to_string(ctx, -1));
fflush(stderr);
#endif
}
duk_pop(ctx);
pfd->revents = 0;
}
}
}
duk_pop_n(ctx, 3);
return 0;
}
@ -403,6 +535,55 @@ static int delete_timer(duk_context *ctx) {
return 1;
}
static int listen_fd(duk_context *ctx) {
int fd = duk_require_int(ctx, 0);
int events = duk_require_int(ctx, 1);
int i, n;
struct pollfd *pfd;
#if 0
fprintf(stderr, "listen_fd: fd=%d, events=%d\n", fd, events);
fflush(stderr);
#endif
/* events == 0 means stop listening to the FD */
n = poll_count;
for (i = 0; i < n; i++) {
pfd = poll_list + i;
if (pfd->fd == fd) {
#if 0
fprintf(stderr, "listen_fd: fd found at index %d\n", i);
fflush(stderr);
#endif
if (events == 0) {
/* mark to-be-deleted, cleaned up by next poll */
pfd->fd = 0;
} else {
pfd->events = events;
}
return 0;
}
}
/* not found, append to list */
#if 0
fprintf(stderr, "listen_fd: fd not found on list, add new entry\n");
fflush(stderr);
#endif
if (poll_count >= MAX_FDS) {
duk_error(ctx, DUK_ERR_ERROR, "out of fd slots");
}
pfd = poll_list + poll_count;
pfd->fd = fd;
pfd->events = events;
pfd->revents = 0;
poll_count++;
return 0;
}
static int request_exit(duk_context *ctx) {
(void) ctx;
exit_requested = 1;
@ -412,22 +593,23 @@ static int request_exit(duk_context *ctx) {
void eventloop_register(duk_context *ctx) {
memset((void *) timer_list, 0, MAX_TIMERS * sizeof(ev_timer));
memset((void *) &timer_expiring, 0, sizeof(ev_timer));
memset((void *) poll_list, 0, MAX_FDS * sizeof(struct pollfd));
duk_push_global_object(ctx);
duk_push_string(ctx, "EventLoop");
duk_push_object(ctx);
duk_push_string(ctx, "createTimer");
duk_push_c_function(ctx, create_timer, 3);
duk_put_prop(ctx, -3);
duk_put_prop_string(ctx, -2, "createTimer");
duk_push_string(ctx, "deleteTimer");
duk_push_c_function(ctx, delete_timer, 1);
duk_put_prop(ctx, -3);
duk_put_prop_string(ctx, -2, "deleteTimer");
duk_push_c_function(ctx, listen_fd, 2);
duk_put_prop_string(ctx, -2, "listenFd");
duk_push_string(ctx, "requestExit");
duk_push_c_function(ctx, request_exit, 1);
duk_put_prop(ctx, -3);
duk_push_c_function(ctx, request_exit, 0);
duk_put_prop_string(ctx, -2, "requestExit");
duk_put_prop(ctx, -3); /* set global 'eventloop' */

91
examples/eventloop/c_eventloop.js

@ -86,3 +86,94 @@ function clearInterval(timer_id) {
function requestEventLoopExit() {
EventLoop.requestExit();
}
/*
* Socket handling
*
* Ideally this would be implemented more in C than here for more speed
* and smaller footprint: C code would directly maintain the callback state
* and such.
*
* Also for more optimal I/O, the buffer churn caused by allocating and
* freeing a lot of buffer values could be eliminated by reusing buffers.
* Socket reads would then go into a pre-allocated buffer, for instance.
*/
EventLoop.socketListening = {};
EventLoop.socketReading = {};
EventLoop.socketConnecting = {};
EventLoop.fdPollHandler = function(fd, revents) {
var data;
var cb;
var rc;
var acc_res;
//print('activity on fd', fd, 'revents', revents);
if (revents & Poll.POLLIN) {
cb = this.socketReading[fd];
if (cb) {
data = Socket.read(fd); // no size control now
//print('READ', Duktape.enc('jsonx', data));
if (String(data) === '') { // FIXME
this.close(fd);
return;
}
cb(fd, data);
} else {
cb = this.socketListening[fd];
if (cb) {
acc_res = Socket.accept(fd);
//print('ACCEPT:', Duktape.enc('jsonx', acc_res));
cb(acc_res.fd, acc_res.addr, acc_res.port);
} else {
//print('UNKNOWN');
}
}
}
if (revents & Poll.POLLOUT) {
// Connected
cb = this.socketConnecting[fd];
if (cb) {
delete this.socketConnecting[fd];
cb(fd);
}
}
if ((revents & ~(Poll.POLLIN | Poll.POLLOUT)) !== 0) {
//print('unexpected revents, close fd');
this.close(fd);
}
}
EventLoop.server = function(address, port, cb_accepted) {
var fd = Socket.createServerSocket(address, port);
this.socketListening[fd] = cb_accepted;
this.listenFd(fd, Poll.POLLIN);
}
EventLoop.connect = function(address, port, cb_connected) {
var fd = Socket.connect(address, port);
this.socketConnecting[fd] = cb_connected;
this.listenFd(fd, Poll.POLLOUT);
}
EventLoop.close = function(fd) {
EventLoop.listenFd(fd, 0);
delete this.socketListening[fd];
delete this.socketReading[fd];
delete this.socketConnecting[fd];
Socket.close(fd);
}
EventLoop.setReader = function(fd, cb_read) {
this.socketReading[fd] = cb_read;
this.listenFd(fd, Poll.POLLIN);
}
EventLoop.write = function(fd, data) {
// This simple example doesn't have support for write blocking / draining
var rc = Socket.write(fd, Duktape.Buffer(data));
}

Loading…
Cancel
Save