Sideband_CE/sbapp/kivymd/utils/asynckivy.py

68 lines
1.5 KiB
Python
Raw Permalink Normal View History

2022-07-07 22:16:10 +02:00
"""
asynckivy
=========
Copyright (c) 2019 Nattōsai Mitō
GitHub -
https://github.com/gottadiveintopython
GitHub Gist -
https://gist.github.com/gottadiveintopython/5f4a775849f9277081c396de65dc57c1
"""
__all__ = ("start", "sleep", "event")
import types
from collections import namedtuple
from functools import partial
from kivy.clock import Clock
# Bundle of the positional and keyword arguments a callback was invoked with,
# so they can be handed to a waiting coroutine as one value.
CallbackParameter = namedtuple("CallbackParameter", ("args", "kwargs"))
def start(coro):
    """Begin running *coro* and keep resuming it from callbacks.

    Every value the coroutine yields is called with a single "resume"
    function.  When that function is later invoked, the arguments it received
    are packed into a ``CallbackParameter`` and sent back into the coroutine.

    :param coro: coroutine/generator object exposing ``send()``.
    :returns: ``None``.  Completion of the coroutine (``StopIteration``) is
        absorbed silently; any other exception propagates to whoever
        triggered the resume.
    """

    def resume(*args, **kwargs):
        # Callback entry point: forward whatever the callback was given.
        advance(CallbackParameter(args, kwargs))

    def advance(value):
        try:
            register = coro.send(value)
            # Hand the resume function to whatever the coroutine is waiting
            # on.  Kept inside the ``try`` so a StopIteration raised here is
            # absorbed as well.
            register(resume)
        except StopIteration:
            pass

    # A freshly created coroutine has to be primed with ``None``.
    advance(None)
@types.coroutine
def sleep(duration):
    """Suspend the calling coroutine until the Clock fires after *duration*.

    Yields a callable that, given the resume function, schedules it once on
    Kivy's ``Clock``.  The value sent back in must expose ``args``; its first
    element is returned to the awaiting code.

    :param duration: delay passed straight to ``Clock.schedule_once``.
    :returns: ``args[0]`` of the parameter object the coroutine is resumed
        with.
    """

    def schedule(step_coro):
        # Wrapping in partial() looks redundant, but it is deliberate: it is
        # needed to avoid a weak reference to the callback.
        return Clock.schedule_once(partial(step_coro), duration)

    resumed_with = yield schedule
    return resumed_with.args[0]
class event:
    """Awaitable that resumes a coroutine the next time an event fires.

    ``await event(ed, name)`` binds to *name* on *ed*, suspends until the
    bound callback has been invoked once, unbinds again, and evaluates to a
    ``CallbackParameter`` holding the arguments the callback received.
    """

    def __init__(self, ed, name):
        """Remember the dispatcher and event name; nothing is bound yet.

        :param ed: object providing ``fbind(name, callback)`` and
            ``unbind_uid(name, uid)`` (presumably a Kivy ``EventDispatcher``
            -- confirm against callers).
        :param name: name of the event or property to wait for.
        """
        self.bind_id = None
        self.ed = ed
        self.name = name
        # Filled in by bind() and callback(); defined up front so the
        # attributes always exist on an instance.
        self.step_coro = None
        self.parameter = None

    def bind(self, step_coro):
        """Attach :meth:`callback` to the event and store the resume function.

        :param step_coro: zero-argument callable that resumes the waiting
            coroutine.
        :raises ValueError: if ``fbind`` does not return a positive uid,
            i.e. the binding did not succeed.
        """
        self.bind_id = bind_id = self.ed.fbind(self.name, self.callback)
        # An explicit raise instead of ``assert``: asserts are stripped under
        # ``python -O``, which would let a failed binding pass unnoticed and
        # leave the coroutine waiting forever.
        if bind_id <= 0:
            raise ValueError(
                "could not bind to {!r} on {!r}".format(self.name, self.ed)
            )
        self.step_coro = step_coro

    def callback(self, *args, **kwargs):
        """Record the event arguments, unbind, and resume the coroutine."""
        self.parameter = CallbackParameter(args, kwargs)
        # One-shot: detach before resuming the coroutine.
        self.ed.unbind_uid(self.name, self.bind_id)
        self.step_coro()

    def __await__(self):
        """Yield :meth:`bind` to the driver, then return the captured parameter."""
        yield self.bind
        return self.parameter