From 3df65e9bae3e3c5298a2691e177335bb5efeb5ed Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 1 Apr 2017 09:20:07 +0300 Subject: [PATCH] zephyr/modusocket: Implement recv() for UDP sockets. The foundation of recv() support is per-socket queue of incoming packets, implemented using Zephyr FIFO object. This patch implements just recv() for UDP, because TCP recv() requires much more fine-grained control of network fragments and handling other issues, like EOF condition, etc. --- zephyr/modusocket.c | 87 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/zephyr/modusocket.c b/zephyr/modusocket.c index ea084368ca..68a66aa263 100644 --- a/zephyr/modusocket.c +++ b/zephyr/modusocket.c @@ -29,13 +29,21 @@ #include "py/runtime.h" +#include #include #include #include +#if 0 // print debugging info +#define DEBUG_printf printf +#else // don't print debugging info +#define DEBUG_printf(...) (void)0 +#endif + typedef struct _socket_obj_t { mp_obj_base_t base; struct net_context *ctx; + struct k_fifo recv_q; } socket_obj_t; STATIC const mp_obj_type_t socket_type; @@ -62,6 +70,44 @@ STATIC void parse_inet_addr(socket_obj_t *socket, mp_obj_t addr_in, struct socka sockaddr_in->sin_port = htons(mp_obj_get_int(addr_items[1])); } +// Copy data from Zephyr net_buf chain into linear buffer. +// We don't use net_nbuf_read(), because it's weird (e.g., we'd like to +// free processed data fragment ASAP, while net_nbuf_read() holds onto +// the whole fragment chain to do its deeds, and that's minor comparing +// to the fact that it copies data byte by byte). +static char *net_buf_gather(struct net_buf *buf, char *to, unsigned max_len) { + struct net_buf *tmp = buf->frags; + unsigned header_len = net_nbuf_appdata(buf) - tmp->data; + net_buf_pull(tmp, header_len); + + while (tmp && max_len) { + unsigned len = tmp->len; + if (len > max_len) { + len = max_len; + } + memcpy(to, tmp->data, len); + to += len; + max_len -= len; + tmp = net_buf_frag_del(buf, tmp); + } + + return to; +} + +// Callback for incoming packets. +static void sock_received_cb(struct net_context *context, struct net_buf *net_buf, int status, void *user_data) { + socket_obj_t *socket = (socket_obj_t*)user_data; + DEBUG_printf("recv cb: context: %p, status: %d, buf: %p", context, status, net_buf); + if (net_buf) { + DEBUG_printf(" (sz=%d, l=%d), token: %p", net_buf->size, net_buf->len, net_nbuf_token(net_buf)); + } + DEBUG_printf("\n"); + + // net_buf->frags will be overwritten by fifo, so save it + net_nbuf_set_token(net_buf, net_buf->frags); + k_fifo_put(&socket->recv_q, net_buf); +} + // Methods STATIC void socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) { @@ -79,6 +125,7 @@ STATIC mp_obj_t socket_make_new(const mp_obj_type_t *type, size_t n_args, size_t socket_obj_t *socket = m_new_obj_with_finaliser(socket_obj_t); socket->base.type = type; + k_fifo_init(&socket->recv_q); int family = AF_INET; int socktype = SOCK_STREAM; @@ -114,6 +161,8 @@ STATIC mp_obj_t socket_bind(mp_obj_t self_in, mp_obj_t addr_in) { parse_inet_addr(socket, addr_in, &sockaddr); RAISE_ERRNO(net_context_bind(socket->ctx, &sockaddr, sizeof(sockaddr))); + DEBUG_printf("Setting recv cb after bind\n"); + RAISE_ERRNO(net_context_recv(socket->ctx, sock_received_cb, K_NO_WAIT, socket)); return mp_const_none; } STATIC MP_DEFINE_CONST_FUN_OBJ_2(socket_bind_obj, socket_bind); @@ -126,6 +175,8 @@ STATIC mp_obj_t socket_connect(mp_obj_t self_in, mp_obj_t addr_in) { parse_inet_addr(socket, addr_in, &sockaddr); RAISE_ERRNO(net_context_connect(socket->ctx, &sockaddr, sizeof(sockaddr), NULL, K_FOREVER, NULL)); + DEBUG_printf("Setting recv cb after connect()\n"); + RAISE_ERRNO(net_context_recv(socket->ctx, sock_received_cb, K_NO_WAIT, socket)); return mp_const_none; } STATIC MP_DEFINE_CONST_FUN_OBJ_2(socket_connect_obj, socket_connect); @@ -149,6 +200,41 @@ STATIC mp_obj_t socket_send(mp_obj_t self_in, mp_obj_t buf_in) { } STATIC MP_DEFINE_CONST_FUN_OBJ_2(socket_send_obj, socket_send); +STATIC mp_obj_t socket_recv(mp_obj_t self_in, mp_obj_t len_in) { + socket_obj_t *socket = self_in; + socket_check_closed(socket); + + enum net_sock_type sock_type = net_context_get_type(socket->ctx); + mp_int_t max_len = mp_obj_get_int(len_in); + unsigned recv_len; + vstr_t vstr; + + if (sock_type == SOCK_DGRAM) { + + struct net_buf *net_buf = k_fifo_get(&socket->recv_q, K_FOREVER); + // Restore ->frags overwritten by fifo + net_buf->frags = net_nbuf_token(net_buf); + + recv_len = net_nbuf_appdatalen(net_buf); + DEBUG_printf("recv: net_buf=%p, appdatalen: %d\n", net_buf, recv_len); + + if (recv_len > max_len) { + recv_len = max_len; + } + + vstr_init_len(&vstr, recv_len); + net_buf_gather(net_buf, vstr.buf, recv_len); + net_nbuf_unref(net_buf); + + } else { + mp_not_implemented(""); + } + + mp_obj_t ret = mp_obj_new_str_from_vstr(&mp_type_bytes, &vstr); + return ret; +} +STATIC MP_DEFINE_CONST_FUN_OBJ_2(socket_recv_obj, socket_recv); + STATIC mp_obj_t socket_close(mp_obj_t self_in) { socket_obj_t *socket = self_in; if (socket->ctx != NULL) { @@ -165,6 +251,7 @@ STATIC const mp_map_elem_t socket_locals_dict_table[] = { { MP_OBJ_NEW_QSTR(MP_QSTR_bind), (mp_obj_t)&socket_bind_obj }, { MP_OBJ_NEW_QSTR(MP_QSTR_connect), (mp_obj_t)&socket_connect_obj }, { MP_OBJ_NEW_QSTR(MP_QSTR_send), (mp_obj_t)&socket_send_obj }, + { MP_OBJ_NEW_QSTR(MP_QSTR_recv), (mp_obj_t)&socket_recv_obj }, }; STATIC MP_DEFINE_CONST_DICT(socket_locals_dict, socket_locals_dict_table);