Browse Source

unix: Implement PEP 475 to retry syscalls failing with EINTR.

https://www.python.org/dev/peps/pep-0475/

This implements something similar to PEP 475 on the unix port, and for the
VfsPosix class.

There are a few differences from the CPython implementation:
- Since we call mp_handle_pending() between any ENITR's, additional
  functions could be called if MICROPY_ENABLE_SCHEDULER is enabled, not
  just signal handlers.
- CPython only handles signal on the main thread, so other threads will
  raise InterruptedError instead of retrying.  On MicroPython,
  mp_handle_pending() will currently raise exceptions on any thread.

A new macro MP_HAL_RETRY_SYSCALL is introduced to reduce duplicated code
and ensure that all instances behave the same.  This will also allow other
ports that use POSIX-like system calls (and use, eg, VfsPosix) to provide
their own implementation if needed.
pull/5813/head
David Lechner 5 years ago
committed by Damien George
parent
commit
9418611c8a
  1. 17
      extmod/vfs_posix.c
  2. 58
      extmod/vfs_posix_file.c
  3. 12
      ports/unix/input.c
  4. 6
      ports/unix/main.c
  5. 12
      ports/unix/modos.c
  6. 14
      ports/unix/modtime.c
  7. 6
      ports/unix/moduselect.c
  8. 100
      ports/unix/modusocket.c
  9. 19
      ports/unix/mphalport.h
  10. 11
      ports/unix/unix_mphal.c

17
extmod/vfs_posix.c

@ -26,6 +26,7 @@
#include "py/runtime.h"
#include "py/mperrno.h"
#include "py/mphal.h"
#include "py/mpthread.h"
#include "extmod/vfs.h"
#include "extmod/vfs_posix.h"
@ -290,12 +291,8 @@ STATIC mp_obj_t vfs_posix_stat(mp_obj_t self_in, mp_obj_t path_in) {
mp_obj_vfs_posix_t *self = MP_OBJ_TO_PTR(self_in);
struct stat sb;
const char *path = vfs_posix_get_path_str(self, path_in);
MP_THREAD_GIL_EXIT();
int ret = stat(path, &sb);
MP_THREAD_GIL_ENTER();
if (ret != 0) {
mp_raise_OSError(errno);
}
int ret;
MP_HAL_RETRY_SYSCALL(ret, stat(path, &sb), mp_raise_OSError(err));
mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(10, NULL));
t->items[0] = MP_OBJ_NEW_SMALL_INT(sb.st_mode);
t->items[1] = MP_OBJ_NEW_SMALL_INT(sb.st_ino);
@ -335,12 +332,8 @@ STATIC mp_obj_t vfs_posix_statvfs(mp_obj_t self_in, mp_obj_t path_in) {
mp_obj_vfs_posix_t *self = MP_OBJ_TO_PTR(self_in);
STRUCT_STATVFS sb;
const char *path = vfs_posix_get_path_str(self, path_in);
MP_THREAD_GIL_EXIT();
int ret = STATVFS(path, &sb);
MP_THREAD_GIL_ENTER();
if (ret != 0) {
mp_raise_OSError(errno);
}
int ret;
MP_HAL_RETRY_SYSCALL(ret, STATVFS(path, &sb), mp_raise_OSError(err));
mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(10, NULL));
t->items[0] = MP_OBJ_NEW_SMALL_INT(sb.f_bsize);
t->items[1] = MP_OBJ_NEW_SMALL_INT(sb.f_frsize);

58
extmod/vfs_posix_file.c

@ -24,6 +24,7 @@
* THE SOFTWARE.
*/
#include "py/mphal.h"
#include "py/mpthread.h"
#include "py/runtime.h"
#include "py/stream.h"
@ -102,12 +103,8 @@ mp_obj_t mp_vfs_posix_file_open(const mp_obj_type_t *type, mp_obj_t file_in, mp_
}
const char *fname = mp_obj_str_get_str(fid);
MP_THREAD_GIL_EXIT();
int fd = open(fname, mode_x | mode_rw, 0644);
MP_THREAD_GIL_ENTER();
if (fd == -1) {
mp_raise_OSError(errno);
}
int fd;
MP_HAL_RETRY_SYSCALL(fd, open(fname, mode_x | mode_rw, 0644), mp_raise_OSError(err));
o->fd = fd;
return MP_OBJ_FROM_PTR(o);
}
@ -139,14 +136,12 @@ STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(vfs_posix_file___exit___obj, 4, 4, vf
STATIC mp_uint_t vfs_posix_file_read(mp_obj_t o_in, void *buf, mp_uint_t size, int *errcode) {
mp_obj_vfs_posix_file_t *o = MP_OBJ_TO_PTR(o_in);
check_fd_is_open(o);
MP_THREAD_GIL_EXIT();
mp_int_t r = read(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
if (r == -1) {
*errcode = errno;
ssize_t r;
MP_HAL_RETRY_SYSCALL(r, read(o->fd, buf, size), {
*errcode = err;
return MP_STREAM_ERROR;
}
return r;
});
return (mp_uint_t)r;
}
STATIC mp_uint_t vfs_posix_file_write(mp_obj_t o_in, const void *buf, mp_uint_t size, int *errcode) {
@ -158,46 +153,33 @@ STATIC mp_uint_t vfs_posix_file_write(mp_obj_t o_in, const void *buf, mp_uint_t
return size;
}
#endif
MP_THREAD_GIL_EXIT();
mp_int_t r = write(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
while (r == -1 && errno == EINTR) {
if (MP_STATE_VM(mp_pending_exception) != MP_OBJ_NULL) {
mp_obj_t obj = MP_STATE_VM(mp_pending_exception);
MP_STATE_VM(mp_pending_exception) = MP_OBJ_NULL;
nlr_raise(obj);
}
MP_THREAD_GIL_EXIT();
r = write(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
}
if (r == -1) {
*errcode = errno;
ssize_t r;
MP_HAL_RETRY_SYSCALL(r, write(o->fd, buf, size), {
*errcode = err;
return MP_STREAM_ERROR;
}
return r;
});
return (mp_uint_t)r;
}
STATIC mp_uint_t vfs_posix_file_ioctl(mp_obj_t o_in, mp_uint_t request, uintptr_t arg, int *errcode) {
mp_obj_vfs_posix_file_t *o = MP_OBJ_TO_PTR(o_in);
check_fd_is_open(o);
switch (request) {
case MP_STREAM_FLUSH:
MP_THREAD_GIL_EXIT();
int ret = fsync(o->fd);
MP_THREAD_GIL_ENTER();
if (ret == -1) {
if (errno == EINVAL
case MP_STREAM_FLUSH: {
int ret;
MP_HAL_RETRY_SYSCALL(ret, fsync(o->fd), {
if (err == EINVAL
&& (o->fd == STDIN_FILENO || o->fd == STDOUT_FILENO || o->fd == STDERR_FILENO)) {
// fsync(stdin/stdout/stderr) may fail with EINVAL, but don't propagate that
// error out. Because data is not buffered by us, and stdin/out/err.flush()
// should just be a no-op.
return 0;
}
*errcode = errno;
*errcode = err;
return MP_STREAM_ERROR;
}
});
return 0;
}
case MP_STREAM_SEEK: {
struct mp_stream_seek_t *s = (struct mp_stream_seek_t *)arg;
MP_THREAD_GIL_EXIT();

12
ports/unix/input.c

@ -24,6 +24,7 @@
* THE SOFTWARE.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -74,6 +75,9 @@ void prompt_read_history(void) {
char c;
int sz = read(fd, &c, 1);
if (sz < 0) {
if (errno == EINTR) {
continue;
}
break;
}
if (sz == 0 || c == '\n') {
@ -107,10 +111,10 @@ void prompt_write_history(void) {
for (int i = MP_ARRAY_SIZE(MP_STATE_PORT(readline_hist)) - 1; i >= 0; i--) {
const char *line = MP_STATE_PORT(readline_hist)[i];
if (line != NULL) {
int res;
res = write(fd, line, strlen(line));
res = write(fd, "\n", 1);
(void)res;
while (write(fd, line, strlen(line)) == -1 && errno == EINTR) {
}
while (write(fd, "\n", 1) == -1 && errno == EINTR) {
}
}
}
close(fd);

6
ports/unix/main.c

@ -64,11 +64,9 @@ long heap_size = 1024 * 1024 * (sizeof(mp_uint_t) / 4);
STATIC void stderr_print_strn(void *env, const char *str, size_t len) {
(void)env;
MP_THREAD_GIL_EXIT();
ssize_t dummy = write(STDERR_FILENO, str, len);
MP_THREAD_GIL_ENTER();
ssize_t ret;
MP_HAL_RETRY_SYSCALL(ret, write(STDERR_FILENO, str, len), {});
mp_uos_dupterm_tx_strn(str, len);
(void)dummy;
}
const mp_print_t mp_stderr_print = {NULL, stderr_print_strn};

12
ports/unix/modos.c

@ -53,10 +53,8 @@ STATIC mp_obj_t mod_os_stat(mp_obj_t path_in) {
struct stat sb;
const char *path = mp_obj_str_get_str(path_in);
MP_THREAD_GIL_EXIT();
int res = stat(path, &sb);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(res, errno);
int res;
MP_HAL_RETRY_SYSCALL(res, stat(path, &sb), mp_raise_OSError(err));
mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(10, NULL));
t->items[0] = MP_OBJ_NEW_SMALL_INT(sb.st_mode);
@ -95,10 +93,8 @@ STATIC mp_obj_t mod_os_statvfs(mp_obj_t path_in) {
STRUCT_STATVFS sb;
const char *path = mp_obj_str_get_str(path_in);
MP_THREAD_GIL_EXIT();
int res = STATVFS(path, &sb);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(res, errno);
int res;
MP_HAL_RETRY_SYSCALL(res, STATVFS(path, &sb), mp_raise_OSError(err));
mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(10, NULL));
t->items[0] = MP_OBJ_NEW_SMALL_INT(sb.f_bsize);

14
ports/unix/modtime.c

@ -117,10 +117,16 @@ STATIC mp_obj_t mod_time_sleep(mp_obj_t arg) {
}
RAISE_ERRNO(res, errno);
#else
// TODO: Handle EINTR
MP_THREAD_GIL_EXIT();
sleep(mp_obj_get_int(arg));
MP_THREAD_GIL_ENTER();
int seconds = mp_obj_get_int(arg);
for (;;) {
MP_THREAD_GIL_EXIT();
seconds = sleep(seconds);
MP_THREAD_GIL_ENTER();
if (seconds == 0) {
break;
}
mp_handle_pending(true);
}
#endif
return mp_const_none;
}

6
ports/unix/moduselect.c

@ -188,10 +188,8 @@ STATIC int poll_poll_internal(size_t n_args, const mp_obj_t *args) {
self->flags = flags;
MP_THREAD_GIL_EXIT();
int n_ready = poll(self->entries, self->len, timeout);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(n_ready, errno);
int n_ready;
MP_HAL_RETRY_SYSCALL(n_ready, poll(self->entries, self->len, timeout), mp_raise_OSError(err));
return n_ready;
}

100
ports/unix/modusocket.c

@ -94,11 +94,8 @@ STATIC void socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kin
STATIC mp_uint_t socket_read(mp_obj_t o_in, void *buf, mp_uint_t size, int *errcode) {
mp_obj_socket_t *o = MP_OBJ_TO_PTR(o_in);
MP_THREAD_GIL_EXIT();
mp_int_t r = read(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
if (r == -1) {
int err = errno;
ssize_t r;
MP_HAL_RETRY_SYSCALL(r, read(o->fd, buf, size), {
// On blocking socket, we get EAGAIN in case SO_RCVTIMEO/SO_SNDTIMEO
// timed out, and need to convert that to ETIMEDOUT.
if (err == EAGAIN && o->blocking) {
@ -107,17 +104,14 @@ STATIC mp_uint_t socket_read(mp_obj_t o_in, void *buf, mp_uint_t size, int *errc
*errcode = err;
return MP_STREAM_ERROR;
}
return r;
});
return (mp_uint_t)r;
}
STATIC mp_uint_t socket_write(mp_obj_t o_in, const void *buf, mp_uint_t size, int *errcode) {
mp_obj_socket_t *o = MP_OBJ_TO_PTR(o_in);
MP_THREAD_GIL_EXIT();
mp_int_t r = write(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
if (r == -1) {
int err = errno;
ssize_t r;
MP_HAL_RETRY_SYSCALL(r, write(o->fd, buf, size), {
// On blocking socket, we get EAGAIN in case SO_RCVTIMEO/SO_SNDTIMEO
// timed out, and need to convert that to ETIMEDOUT.
if (err == EAGAIN && o->blocking) {
@ -126,8 +120,8 @@ STATIC mp_uint_t socket_write(mp_obj_t o_in, const void *buf, mp_uint_t size, in
*errcode = err;
return MP_STREAM_ERROR;
}
return r;
});
return (mp_uint_t)r;
}
STATIC mp_uint_t socket_ioctl(mp_obj_t o_in, mp_uint_t request, uintptr_t arg, int *errcode) {
@ -166,16 +160,29 @@ STATIC mp_obj_t socket_connect(mp_obj_t self_in, mp_obj_t addr_in) {
mp_obj_socket_t *self = MP_OBJ_TO_PTR(self_in);
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(addr_in, &bufinfo, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int r = connect(self->fd, (const struct sockaddr *)bufinfo.buf, bufinfo.len);
MP_THREAD_GIL_ENTER();
int err = errno;
if (r == -1 && self->blocking && err == EINPROGRESS) {
// EINPROGRESS on a blocking socket means the operation timed out
err = MP_ETIMEDOUT;
// special case of PEP 475 to retry only if blocking so we can't use
// MP_HAL_RETRY_SYSCALL() here
for (;;) {
MP_THREAD_GIL_EXIT();
int r = connect(self->fd, (const struct sockaddr *)bufinfo.buf, bufinfo.len);
MP_THREAD_GIL_ENTER();
if (r == -1) {
int err = errno;
if (self->blocking) {
if (err == EINTR) {
mp_handle_pending(true);
continue;
}
// EINPROGRESS on a blocking socket means the operation timed out
if (err == EINPROGRESS) {
err = MP_ETIMEDOUT;
}
}
mp_raise_OSError(err);
}
return mp_const_none;
}
RAISE_ERRNO(r, err);
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(socket_connect_obj, socket_connect);
@ -207,15 +214,14 @@ STATIC mp_obj_t socket_accept(mp_obj_t self_in) {
//struct sockaddr_storage addr;
byte addr[32];
socklen_t addr_len = sizeof(addr);
MP_THREAD_GIL_EXIT();
int fd = accept(self->fd, (struct sockaddr *)&addr, &addr_len);
MP_THREAD_GIL_ENTER();
int err = errno;
if (fd == -1 && self->blocking && err == EAGAIN) {
int fd;
MP_HAL_RETRY_SYSCALL(fd, accept(self->fd, (struct sockaddr *)&addr, &addr_len), {
// EAGAIN on a blocking socket means the operation timed out
err = MP_ETIMEDOUT;
}
RAISE_ERRNO(fd, err);
if (self->blocking && err == EAGAIN) {
err = MP_ETIMEDOUT;
}
mp_raise_OSError(err);
});
mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(2, NULL));
t->items[0] = MP_OBJ_FROM_PTR(socket_new(fd));
@ -238,11 +244,8 @@ STATIC mp_obj_t socket_recv(size_t n_args, const mp_obj_t *args) {
}
byte *buf = m_new(byte, sz);
MP_THREAD_GIL_EXIT();
int out_sz = recv(self->fd, buf, sz, flags);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);
ssize_t out_sz;
MP_HAL_RETRY_SYSCALL(out_sz, recv(self->fd, buf, sz, flags), mp_raise_OSError(err));
mp_obj_t ret = mp_obj_new_str_of_type(&mp_type_bytes, buf, out_sz);
m_del(char, buf, sz);
return ret;
@ -262,11 +265,9 @@ STATIC mp_obj_t socket_recvfrom(size_t n_args, const mp_obj_t *args) {
socklen_t addr_len = sizeof(addr);
byte *buf = m_new(byte, sz);
MP_THREAD_GIL_EXIT();
int out_sz = recvfrom(self->fd, buf, sz, flags, (struct sockaddr *)&addr, &addr_len);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);
ssize_t out_sz;
MP_HAL_RETRY_SYSCALL(out_sz, recvfrom(self->fd, buf, sz, flags, (struct sockaddr *)&addr, &addr_len),
mp_raise_OSError(err));
mp_obj_t buf_o = mp_obj_new_str_of_type(&mp_type_bytes, buf, out_sz);
m_del(char, buf, sz);
@ -291,11 +292,9 @@ STATIC mp_obj_t socket_send(size_t n_args, const mp_obj_t *args) {
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int out_sz = send(self->fd, bufinfo.buf, bufinfo.len, flags);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);
ssize_t out_sz;
MP_HAL_RETRY_SYSCALL(out_sz, send(self->fd, bufinfo.buf, bufinfo.len, flags),
mp_raise_OSError(err));
return MP_OBJ_NEW_SMALL_INT(out_sz);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(socket_send_obj, 2, 3, socket_send);
@ -313,12 +312,9 @@ STATIC mp_obj_t socket_sendto(size_t n_args, const mp_obj_t *args) {
mp_buffer_info_t bufinfo, addr_bi;
mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_READ);
mp_get_buffer_raise(dst_addr, &addr_bi, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int out_sz = sendto(self->fd, bufinfo.buf, bufinfo.len, flags,
(struct sockaddr *)addr_bi.buf, addr_bi.len);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);
ssize_t out_sz;
MP_HAL_RETRY_SYSCALL(out_sz, sendto(self->fd, bufinfo.buf, bufinfo.len, flags,
(struct sockaddr *)addr_bi.buf, addr_bi.len), mp_raise_OSError(err));
return MP_OBJ_NEW_SMALL_INT(out_sz);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(socket_sendto_obj, 3, 4, socket_sendto);

19
ports/unix/mphalport.h

@ -23,6 +23,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <errno.h>
#include <unistd.h>
#ifndef CHAR_CTRL_C
@ -73,6 +74,24 @@ static inline void mp_hal_delay_us(mp_uint_t us) {
}
#define mp_hal_ticks_cpu() 0
// This macro is used to implement PEP 475 to retry specified syscalls on EINTR
#define MP_HAL_RETRY_SYSCALL(ret, syscall, raise) { \
for (;;) { \
MP_THREAD_GIL_EXIT(); \
ret = syscall; \
MP_THREAD_GIL_ENTER(); \
if (ret == -1) { \
int err = errno; \
if (err == EINTR) { \
mp_handle_pending(true); \
continue; \
} \
raise; \
} \
break; \
} \
}
#define RAISE_ERRNO(err_flag, error_val) \
{ if (err_flag == -1) \
{ mp_raise_OSError(error_val); } }

11
ports/unix/unix_mphal.c

@ -165,10 +165,9 @@ int mp_hal_stdin_rx_chr(void) {
main_term:;
#endif
MP_THREAD_GIL_EXIT();
unsigned char c;
int ret = read(STDIN_FILENO, &c, 1);
MP_THREAD_GIL_ENTER();
ssize_t ret;
MP_HAL_RETRY_SYSCALL(ret, read(STDIN_FILENO, &c, 1), {});
if (ret == 0) {
c = 4; // EOF, ctrl-D
} else if (c == '\n') {
@ -178,11 +177,9 @@ main_term:;
}
void mp_hal_stdout_tx_strn(const char *str, size_t len) {
MP_THREAD_GIL_EXIT();
int ret = write(STDOUT_FILENO, str, len);
MP_THREAD_GIL_ENTER();
ssize_t ret;
MP_HAL_RETRY_SYSCALL(ret, write(STDOUT_FILENO, str, len), {});
mp_uos_dupterm_tx_strn(str, len);
(void)ret; // to suppress compiler warning
}
// cooked is same as uncooked because the terminal does some postprocessing

Loading…
Cancel
Save