Browse Source
This commit adds a completely new implementation of the uasyncio module. The aim of this version (compared to the original one in micropython-lib) is to be more compatible with CPython's asyncio module, so that one can more easily write code that runs under both MicroPython and CPython (and reuse CPython asyncio libraries, follow CPython asyncio tutorials, etc). Async code is not easy to write and any knowledge users already have from CPython asyncio should transfer to uasyncio without effort, and vice versa. The implementation here attempts to provide good compatibility with CPython's asyncio while still being "micro" enough to run where MicroPython runs. This follows the general philosophy of MicroPython itself, to make it feel like Python. The main change is to use a Task object for each coroutine. This allows more flexibility to queue tasks in various places, eg the main run loop, tasks waiting on events, locks or other tasks. It no longer requires pre-allocating a fixed queue size for the main run loop. A pairing heap is used to queue Tasks. It's currently implemented in pure Python, separated into components with lazy importing for optional components. In the future parts of this implementation can be moved to C to improve speed and reduce memory usage. But the aim is to maintain a pure-Python version as a reference version.pull/5332/head
Damien George
5 years ago
7 changed files with 700 additions and 0 deletions
@ -0,0 +1,26 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019 Damien P. George |
|||
|
|||
from .core import * |
|||
|
|||
__version__ = (3, 0, 0) |
|||
|
|||
_attrs = { |
|||
"wait_for": "funcs", |
|||
"gather": "funcs", |
|||
"Event": "event", |
|||
"Lock": "lock", |
|||
"open_connection": "stream", |
|||
"start_server": "stream", |
|||
} |
|||
|
|||
# Lazy loader, effectively does: |
|||
# global attr |
|||
# from .mod import attr |
|||
def __getattr__(attr): |
|||
mod = _attrs.get(attr, None) |
|||
if mod is None: |
|||
raise AttributeError(attr) |
|||
value = getattr(__import__(mod, None, None, True, 1), attr) |
|||
globals()[attr] = value |
|||
return value |
@ -0,0 +1,231 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019 Damien P. George |
|||
|
|||
from time import ticks_ms as ticks, ticks_diff, ticks_add |
|||
import sys, select |
|||
|
|||
# Import TaskQueue and Task |
|||
from .task import TaskQueue, Task |
|||
|
|||
|
|||
################################################################################ |
|||
# Exceptions |
|||
|
|||
|
|||
class CancelledError(BaseException): |
|||
pass |
|||
|
|||
|
|||
class TimeoutError(Exception): |
|||
pass |
|||
|
|||
|
|||
################################################################################ |
|||
# Sleep functions |
|||
|
|||
# "Yield" once, then raise StopIteration |
|||
class SingletonGenerator: |
|||
def __init__(self): |
|||
self.state = None |
|||
self.exc = StopIteration() |
|||
|
|||
def __iter__(self): |
|||
return self |
|||
|
|||
def __next__(self): |
|||
if self.state is not None: |
|||
_task_queue.push_sorted(cur_task, self.state) |
|||
self.state = None |
|||
return None |
|||
else: |
|||
self.exc.__traceback__ = None |
|||
raise self.exc |
|||
|
|||
|
|||
# Pause task execution for the given time (integer in milliseconds, uPy extension) |
|||
# Use a SingletonGenerator to do it without allocating on the heap |
|||
def sleep_ms(t, sgen=SingletonGenerator()): |
|||
assert sgen.state is None |
|||
sgen.state = ticks_add(ticks(), t) |
|||
return sgen |
|||
|
|||
|
|||
# Pause task execution for the given time (in seconds) |
|||
def sleep(t): |
|||
return sleep_ms(int(t * 1000)) |
|||
|
|||
|
|||
################################################################################ |
|||
# Queue and poller for stream IO |
|||
|
|||
|
|||
class IOQueue: |
|||
def __init__(self): |
|||
self.poller = select.poll() |
|||
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] |
|||
|
|||
def _enqueue(self, s, idx): |
|||
if id(s) not in self.map: |
|||
entry = [None, None, s] |
|||
entry[idx] = cur_task |
|||
self.map[id(s)] = entry |
|||
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) |
|||
else: |
|||
sm = self.map[id(s)] |
|||
assert sm[idx] is None |
|||
assert sm[1 - idx] is not None |
|||
sm[idx] = cur_task |
|||
self.poller.modify(s, select.POLLIN | select.POLLOUT) |
|||
# Link task to this IOQueue so it can be removed if needed |
|||
cur_task.data = self |
|||
|
|||
def _dequeue(self, s): |
|||
del self.map[id(s)] |
|||
self.poller.unregister(s) |
|||
|
|||
def queue_read(self, s): |
|||
self._enqueue(s, 0) |
|||
|
|||
def queue_write(self, s): |
|||
self._enqueue(s, 1) |
|||
|
|||
def remove(self, task): |
|||
while True: |
|||
del_s = None |
|||
for k in self.map: # Iterate without allocating on the heap |
|||
q0, q1, s = self.map[k] |
|||
if q0 is task or q1 is task: |
|||
del_s = s |
|||
break |
|||
if del_s is not None: |
|||
self._dequeue(s) |
|||
else: |
|||
break |
|||
|
|||
def wait_io_event(self, dt): |
|||
for s, ev in self.poller.ipoll(dt): |
|||
sm = self.map[id(s)] |
|||
# print('poll', s, sm, ev) |
|||
if ev & ~select.POLLOUT and sm[0] is not None: |
|||
# POLLIN or error |
|||
_task_queue.push_head(sm[0]) |
|||
sm[0] = None |
|||
if ev & ~select.POLLIN and sm[1] is not None: |
|||
# POLLOUT or error |
|||
_task_queue.push_head(sm[1]) |
|||
sm[1] = None |
|||
if sm[0] is None and sm[1] is None: |
|||
self._dequeue(s) |
|||
elif sm[0] is None: |
|||
self.poller.modify(s, select.POLLOUT) |
|||
else: |
|||
self.poller.modify(s, select.POLLIN) |
|||
|
|||
|
|||
################################################################################ |
|||
# Main run loop |
|||
|
|||
# TaskQueue of Task instances |
|||
_task_queue = TaskQueue() |
|||
|
|||
# Task queue and poller for stream IO |
|||
_io_queue = IOQueue() |
|||
|
|||
|
|||
# Ensure the awaitable is a task |
|||
def _promote_to_task(aw): |
|||
return aw if isinstance(aw, Task) else create_task(aw) |
|||
|
|||
|
|||
# Create and schedule a new task from a coroutine |
|||
def create_task(coro): |
|||
if not hasattr(coro, "send"): |
|||
raise TypeError("coroutine expected") |
|||
t = Task(coro, globals()) |
|||
_task_queue.push_head(t) |
|||
return t |
|||
|
|||
|
|||
# Keep scheduling tasks until there are none left to schedule |
|||
def run_until_complete(main_task=None): |
|||
global cur_task |
|||
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop |
|||
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop |
|||
while True: |
|||
# Wait until the head of _task_queue is ready to run |
|||
dt = 1 |
|||
while dt > 0: |
|||
dt = -1 |
|||
t = _task_queue.peek() |
|||
if t: |
|||
# A task waiting on _task_queue; "ph_key" is time to schedule task at |
|||
dt = max(0, ticks_diff(t.ph_key, ticks())) |
|||
elif not _io_queue.map: |
|||
# No tasks can be woken so finished running |
|||
return |
|||
# print('(poll {})'.format(dt), len(_io_queue.map)) |
|||
_io_queue.wait_io_event(dt) |
|||
|
|||
# Get next task to run and continue it |
|||
t = _task_queue.pop_head() |
|||
cur_task = t |
|||
try: |
|||
# Continue running the coroutine, it's responsible for rescheduling itself |
|||
exc = t.data |
|||
if not exc: |
|||
t.coro.send(None) |
|||
else: |
|||
t.data = None |
|||
t.coro.throw(exc) |
|||
except excs_all as er: |
|||
# Check the task is not on any event queue |
|||
assert t.data is None |
|||
# This task is done, check if it's the main task and then loop should stop |
|||
if t is main_task: |
|||
if isinstance(er, StopIteration): |
|||
return er.value |
|||
raise er |
|||
# Save return value of coro to pass up to caller |
|||
t.data = er |
|||
# Schedule any other tasks waiting on the completion of this task |
|||
waiting = False |
|||
if hasattr(t, "waiting"): |
|||
while t.waiting.peek(): |
|||
_task_queue.push_head(t.waiting.pop_head()) |
|||
waiting = True |
|||
t.waiting = None # Free waiting queue head |
|||
# Print out exception for detached tasks |
|||
if not waiting and not isinstance(er, excs_stop): |
|||
print("task raised exception:", t.coro) |
|||
sys.print_exception(er) |
|||
# Indicate task is done |
|||
t.coro = None |
|||
|
|||
|
|||
# Create a new task from a coroutine and run it until it finishes |
|||
def run(coro): |
|||
return run_until_complete(create_task(coro)) |
|||
|
|||
|
|||
################################################################################ |
|||
# Event loop wrapper |
|||
|
|||
|
|||
class Loop: |
|||
def create_task(self, coro): |
|||
return create_task(coro) |
|||
|
|||
def run_forever(self): |
|||
run_until_complete() |
|||
# TODO should keep running until .stop() is called, even if there're no tasks left |
|||
|
|||
def run_until_complete(self, aw): |
|||
return run_until_complete(_promote_to_task(aw)) |
|||
|
|||
def close(self): |
|||
pass |
|||
|
|||
|
|||
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility |
|||
def get_event_loop(runq_len=0, waitq_len=0): |
|||
return Loop() |
@ -0,0 +1,31 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019-2020 Damien P. George |
|||
|
|||
from . import core |
|||
|
|||
# Event class for primitive events that can be waited on, set, and cleared |
|||
class Event: |
|||
def __init__(self): |
|||
self.state = False # False=unset; True=set |
|||
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event |
|||
|
|||
def is_set(self): |
|||
return self.state |
|||
|
|||
def set(self): |
|||
# Event becomes set, schedule any tasks waiting on it |
|||
while self.waiting.peek(): |
|||
core._task_queue.push_head(self.waiting.pop_head()) |
|||
self.state = True |
|||
|
|||
def clear(self): |
|||
self.state = False |
|||
|
|||
async def wait(self): |
|||
if not self.state: |
|||
# Event not set, put the calling task on the event's waiting queue |
|||
self.waiting.push_head(core.cur_task) |
|||
# Set calling task's data to the event's queue so it can be removed if needed |
|||
core.cur_task.data = self.waiting |
|||
yield |
|||
return True |
@ -0,0 +1,50 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019-2020 Damien P. George |
|||
|
|||
from . import core |
|||
|
|||
|
|||
async def wait_for(aw, timeout): |
|||
aw = core._promote_to_task(aw) |
|||
if timeout is None: |
|||
return await aw |
|||
|
|||
def cancel(aw, timeout): |
|||
await core.sleep(timeout) |
|||
aw.cancel() |
|||
|
|||
cancel_task = core.create_task(cancel(aw, timeout)) |
|||
try: |
|||
ret = await aw |
|||
except core.CancelledError: |
|||
# Ignore CancelledError from aw, it's probably due to timeout |
|||
pass |
|||
finally: |
|||
# Cancel the "cancel" task if it's still active (optimisation instead of cancel_task.cancel()) |
|||
if cancel_task.coro is not None: |
|||
core._task_queue.remove(cancel_task) |
|||
if cancel_task.coro is None: |
|||
# Cancel task ran to completion, ie there was a timeout |
|||
raise core.TimeoutError |
|||
return ret |
|||
|
|||
|
|||
async def gather(*aws, return_exceptions=False): |
|||
ts = [core._promote_to_task(aw) for aw in aws] |
|||
for i in range(len(ts)): |
|||
try: |
|||
# TODO handle cancel of gather itself |
|||
# if ts[i].coro: |
|||
# iter(ts[i]).waiting.push_head(cur_task) |
|||
# try: |
|||
# yield |
|||
# except CancelledError as er: |
|||
# # cancel all waiting tasks |
|||
# raise er |
|||
ts[i] = await ts[i] |
|||
except Exception as er: |
|||
if return_exceptions: |
|||
ts[i] = er |
|||
else: |
|||
raise er |
|||
return ts |
@ -0,0 +1,53 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019-2020 Damien P. George |
|||
|
|||
from . import core |
|||
|
|||
# Lock class for primitive mutex capability |
|||
class Lock: |
|||
def __init__(self): |
|||
# The state can take the following values: |
|||
# - 0: unlocked |
|||
# - 1: locked |
|||
# - <Task>: unlocked but this task has been scheduled to acquire the lock next |
|||
self.state = 0 |
|||
# Queue of Tasks waiting to acquire this Lock |
|||
self.waiting = core.TaskQueue() |
|||
|
|||
def locked(self): |
|||
return self.state == 1 |
|||
|
|||
def release(self): |
|||
if self.state != 1: |
|||
raise RuntimeError |
|||
if self.waiting.peek(): |
|||
# Task(s) waiting on lock, schedule next Task |
|||
self.state = self.waiting.pop_head() |
|||
core._task_queue.push_head(self.state) |
|||
else: |
|||
# No Task waiting so unlock |
|||
self.state = 0 |
|||
|
|||
async def acquire(self): |
|||
if self.state != 0: |
|||
# Lock unavailable, put the calling Task on the waiting queue |
|||
self.waiting.push_head(core.cur_task) |
|||
# Set calling task's data to the lock's queue so it can be removed if needed |
|||
core.cur_task.data = self.waiting |
|||
try: |
|||
yield |
|||
except core.CancelledError as er: |
|||
if self.state == core.cur_task: |
|||
# Cancelled while pending on resume, schedule next waiting Task |
|||
self.state = 1 |
|||
self.release() |
|||
raise er |
|||
# Lock available, set it as locked |
|||
self.state = 1 |
|||
return True |
|||
|
|||
async def __aenter__(self): |
|||
return await self.acquire() |
|||
|
|||
async def __aexit__(self, exc_type, exc, tb): |
|||
return self.release() |
@ -0,0 +1,141 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019-2020 Damien P. George |
|||
|
|||
from . import core |
|||
|
|||
|
|||
class Stream: |
|||
def __init__(self, s, e={}): |
|||
self.s = s |
|||
self.e = e |
|||
self.out_buf = b"" |
|||
|
|||
def get_extra_info(self, v): |
|||
return self.e[v] |
|||
|
|||
async def __aenter__(self): |
|||
return self |
|||
|
|||
async def __aexit__(self, exc_type, exc, tb): |
|||
await self.close() |
|||
|
|||
def close(self): |
|||
pass |
|||
|
|||
async def wait_closed(self): |
|||
# TODO yield? |
|||
self.s.close() |
|||
|
|||
async def read(self, n): |
|||
yield core._io_queue.queue_read(self.s) |
|||
return self.s.read(n) |
|||
|
|||
async def readline(self): |
|||
l = b"" |
|||
while True: |
|||
yield core._io_queue.queue_read(self.s) |
|||
l2 = self.s.readline() # may do multiple reads but won't block |
|||
l += l2 |
|||
if not l2 or l[-1] == 10: # \n (check l in case l2 is str) |
|||
return l |
|||
|
|||
def write(self, buf): |
|||
self.out_buf += buf |
|||
|
|||
async def drain(self): |
|||
mv = memoryview(self.out_buf) |
|||
off = 0 |
|||
while off < len(mv): |
|||
yield core._io_queue.queue_write(self.s) |
|||
ret = self.s.write(mv[off:]) |
|||
if ret is not None: |
|||
off += ret |
|||
self.out_buf = b"" |
|||
|
|||
|
|||
# Create a TCP stream connection to a remote host |
|||
async def open_connection(host, port): |
|||
from uerrno import EINPROGRESS |
|||
import usocket as socket |
|||
|
|||
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! |
|||
s = socket.socket() |
|||
s.setblocking(False) |
|||
ss = Stream(s) |
|||
try: |
|||
s.connect(ai[-1]) |
|||
except OSError as er: |
|||
if er.args[0] != EINPROGRESS: |
|||
raise er |
|||
yield core._io_queue.queue_write(s) |
|||
return ss, ss |
|||
|
|||
|
|||
# Class representing a TCP stream server, can be closed and used in "async with" |
|||
class Server: |
|||
async def __aenter__(self): |
|||
return self |
|||
|
|||
async def __aexit__(self, exc_type, exc, tb): |
|||
self.close() |
|||
await self.wait_closed() |
|||
|
|||
def close(self): |
|||
self.task.cancel() |
|||
|
|||
async def wait_closed(self): |
|||
await self.task |
|||
|
|||
async def _serve(self, cb, host, port, backlog): |
|||
import usocket as socket |
|||
|
|||
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! |
|||
s = socket.socket() |
|||
s.setblocking(False) |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(ai[-1]) |
|||
s.listen(backlog) |
|||
self.task = core.cur_task |
|||
# Accept incoming connections |
|||
while True: |
|||
try: |
|||
yield core._io_queue.queue_read(s) |
|||
except core.CancelledError: |
|||
# Shutdown server |
|||
s.close() |
|||
return |
|||
try: |
|||
s2, addr = s.accept() |
|||
except: |
|||
# Ignore a failed accept |
|||
continue |
|||
s2.setblocking(False) |
|||
s2s = Stream(s2, {"peername": addr}) |
|||
core.create_task(cb(s2s, s2s)) |
|||
|
|||
|
|||
# Helper function to start a TCP stream server, running as a new task |
|||
# TODO could use an accept-callback on socket read activity instead of creating a task |
|||
async def start_server(cb, host, port, backlog=5): |
|||
s = Server() |
|||
core.create_task(s._serve(cb, host, port, backlog)) |
|||
return s |
|||
|
|||
|
|||
################################################################################ |
|||
# Legacy uasyncio compatibility |
|||
|
|||
|
|||
async def stream_awrite(self, buf, off=0, sz=-1): |
|||
if off != 0 or sz != -1: |
|||
buf = memoryview(buf) |
|||
if sz == -1: |
|||
sz = len(buf) |
|||
buf = buf[off : off + sz] |
|||
self.write(buf) |
|||
await self.drain() |
|||
|
|||
|
|||
Stream.aclose = Stream.wait_closed |
|||
Stream.awrite = stream_awrite |
|||
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes? |
@ -0,0 +1,168 @@ |
|||
# MicroPython uasyncio module |
|||
# MIT license; Copyright (c) 2019-2020 Damien P. George |
|||
|
|||
# This file contains the core TaskQueue based on a pairing heap, and the core Task class. |
|||
# They can optionally be replaced by C implementations. |
|||
|
|||
from . import core |
|||
|
|||
|
|||
# pairing-heap meld of 2 heaps; O(1) |
|||
def ph_meld(h1, h2): |
|||
if h1 is None: |
|||
return h2 |
|||
if h2 is None: |
|||
return h1 |
|||
lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0 |
|||
if lt: |
|||
if h1.ph_child is None: |
|||
h1.ph_child = h2 |
|||
else: |
|||
h1.ph_child_last.ph_next = h2 |
|||
h1.ph_child_last = h2 |
|||
h2.ph_next = None |
|||
h2.ph_rightmost_parent = h1 |
|||
return h1 |
|||
else: |
|||
h1.ph_next = h2.ph_child |
|||
h2.ph_child = h1 |
|||
if h1.ph_next is None: |
|||
h2.ph_child_last = h1 |
|||
h1.ph_rightmost_parent = h2 |
|||
return h2 |
|||
|
|||
|
|||
# pairing-heap pairing operation; amortised O(log N) |
|||
def ph_pairing(child): |
|||
heap = None |
|||
while child is not None: |
|||
n1 = child |
|||
child = child.ph_next |
|||
n1.ph_next = None |
|||
if child is not None: |
|||
n2 = child |
|||
child = child.ph_next |
|||
n2.ph_next = None |
|||
n1 = ph_meld(n1, n2) |
|||
heap = ph_meld(heap, n1) |
|||
return heap |
|||
|
|||
|
|||
# pairing-heap delete of a node; stable, amortised O(log N) |
|||
def ph_delete(heap, node): |
|||
if node is heap: |
|||
child = heap.ph_child |
|||
node.ph_child = None |
|||
return ph_pairing(child) |
|||
# Find parent of node |
|||
parent = node |
|||
while parent.ph_next is not None: |
|||
parent = parent.ph_next |
|||
parent = parent.ph_rightmost_parent |
|||
# Replace node with pairing of its children |
|||
if node is parent.ph_child and node.ph_child is None: |
|||
parent.ph_child = node.ph_next |
|||
node.ph_next = None |
|||
return heap |
|||
elif node is parent.ph_child: |
|||
child = node.ph_child |
|||
next = node.ph_next |
|||
node.ph_child = None |
|||
node.ph_next = None |
|||
node = ph_pairing(child) |
|||
parent.ph_child = node |
|||
else: |
|||
n = parent.ph_child |
|||
while node is not n.ph_next: |
|||
n = n.ph_next |
|||
child = node.ph_child |
|||
next = node.ph_next |
|||
node.ph_child = None |
|||
node.ph_next = None |
|||
node = ph_pairing(child) |
|||
if node is None: |
|||
node = n |
|||
else: |
|||
n.ph_next = node |
|||
node.ph_next = next |
|||
if next is None: |
|||
node.ph_rightmost_parent = parent |
|||
parent.ph_child_last = node |
|||
return heap |
|||
|
|||
|
|||
# TaskQueue class based on the above pairing-heap functions. |
|||
class TaskQueue: |
|||
def __init__(self): |
|||
self.heap = None |
|||
|
|||
def peek(self): |
|||
return self.heap |
|||
|
|||
def push_sorted(self, v, key): |
|||
v.data = None |
|||
v.ph_key = key |
|||
v.ph_child = None |
|||
v.ph_next = None |
|||
self.heap = ph_meld(v, self.heap) |
|||
|
|||
def push_head(self, v): |
|||
self.push_sorted(v, core.ticks()) |
|||
|
|||
def pop_head(self): |
|||
v = self.heap |
|||
self.heap = ph_pairing(self.heap.ph_child) |
|||
return v |
|||
|
|||
def remove(self, v): |
|||
self.heap = ph_delete(self.heap, v) |
|||
|
|||
|
|||
# Task class representing a coroutine, can be waited on and cancelled. |
|||
class Task: |
|||
def __init__(self, coro, globals=None): |
|||
self.coro = coro # Coroutine of this Task |
|||
self.data = None # General data for queue it is waiting on |
|||
self.ph_key = 0 # Pairing heap |
|||
self.ph_child = None # Paring heap |
|||
self.ph_child_last = None # Paring heap |
|||
self.ph_next = None # Paring heap |
|||
self.ph_rightmost_parent = None # Paring heap |
|||
|
|||
def __iter__(self): |
|||
if not hasattr(self, "waiting"): |
|||
# Lazily allocated head of linked list of Tasks waiting on completion of this task. |
|||
self.waiting = TaskQueue() |
|||
return self |
|||
|
|||
def __next__(self): |
|||
if not self.coro: |
|||
# Task finished, raise return value to caller so it can continue. |
|||
raise self.data |
|||
else: |
|||
# Put calling task on waiting queue. |
|||
self.waiting.push_head(core.cur_task) |
|||
# Set calling task's data to this task that it waits on, to double-link it. |
|||
core.cur_task.data = self |
|||
|
|||
def cancel(self): |
|||
# Check if task is already finished. |
|||
if self.coro is None: |
|||
return False |
|||
# Can't cancel self (not supported yet). |
|||
if self is core.cur_task: |
|||
raise RuntimeError("cannot cancel self") |
|||
# If Task waits on another task then forward the cancel to the one it's waiting on. |
|||
while isinstance(self.data, Task): |
|||
self = self.data |
|||
# Reschedule Task as a cancelled task. |
|||
if hasattr(self.data, "remove"): |
|||
# Not on the main running queue, remove the task from the queue it's on. |
|||
self.data.remove(self) |
|||
core._task_queue.push_head(self) |
|||
elif core.ticks_diff(self.ph_key, core.ticks()) > 0: |
|||
# On the main running queue but scheduled in the future, so bring it forward to now. |
|||
core._task_queue.remove(self) |
|||
core._task_queue.push_head(self) |
|||
self.data = core.CancelledError |
|||
return True |
Loading…
Reference in new issue