| t | import asyncio | t | import asyncio |
| from typing import Any, Callable, Dict, List, Optional | | from typing import Any, Callable, Dict, List, Optional |
| | | |
| class Loop: | | class Loop: |
| _jobs: List[Dict[str, Any]] = [] | | _jobs: List[Dict[str, Any]] = [] |
| _runner_task: Optional[asyncio.Task] = None | | _runner_task: Optional[asyncio.Task] = None |
| _lock: Optional[asyncio.Lock] = None | | _lock: Optional[asyncio.Lock] = None |
| | | |
| def __call__(self, func: Callable[..., Any]): | | def __call__(self, func: Callable[..., Any]): |
| | | |
| async def wrapper(*args, **kwargs): | | async def wrapper(*args, **kwargs): |
| if Loop._lock is None: | | if Loop._lock is None: |
| Loop._lock = asyncio.Lock() | | Loop._lock = asyncio.Lock() |
| loop = asyncio.get_running_loop() | | loop = asyncio.get_running_loop() |
| fut = loop.create_future() | | fut = loop.create_future() |
| job = {'func': func, 'args': args, 'kwargs': kwargs, 'future | | job = {'func': func, 'args': args, 'kwargs': kwargs, 'future |
| ': fut, 'alive': True} | | ': fut, 'alive': True} |
| async with Loop._lock: | | async with Loop._lock: |
| Loop._jobs.append(job) | | Loop._jobs.append(job) |
| if Loop._runner_task is None: | | if Loop._runner_task is None: |
| Loop._runner_task = asyncio.create_task(Loop._runner | | Loop._runner_task = asyncio.create_task(Loop._runner |
| ()) | | ()) |
| await fut | | await fut |
| return None | | return None |
| return wrapper | | return wrapper |
| | | |
| @classmethod | | @classmethod |
| async def _runner(cls): | | async def _runner(cls): |
| if cls._lock is None: | | if cls._lock is None: |
| cls._lock = asyncio.Lock() | | cls._lock = asyncio.Lock() |
| dirty = False | | dirty = False |
| try: | | try: |
| while True: | | while True: |
| async with cls._lock: | | async with cls._lock: |
| if not cls._jobs: | | if not cls._jobs: |
| cls._runner_task = None | | cls._runner_task = None |
| return | | return |
| snapshot = list(cls._jobs) | | snapshot = list(cls._jobs) |
| for job in snapshot: | | for job in snapshot: |
| if not job.get('alive', True): | | if not job.get('alive', True): |
| continue | | continue |
| try: | | try: |
| res = await job['func'](*job['args'], **job['kwa | | res = await job['func'](*job['args'], **job['kwa |
| rgs']) | | rgs']) |
| except Exception as exc: | | except Exception as exc: |
| job['alive'] = False | | job['alive'] = False |
| dirty = True | | dirty = True |
| if not job['future'].done(): | | if not job['future'].done(): |
| job['future'].set_exception(exc) | | job['future'].set_exception(exc) |
| continue | | continue |
| if res is None: | | if res is None: |
| for other in snapshot: | | for other in snapshot: |
| if other.get('alive', True): | | if other.get('alive', True): |
| other['alive'] = False | | other['alive'] = False |
| if not other['future'].done(): | | if not other['future'].done(): |
| other['future'].set_result(None) | | other['future'].set_result(None) |
| dirty = True | | dirty = True |
| break | | break |
| if dirty: | | if dirty: |
| async with cls._lock: | | async with cls._lock: |
| cls._jobs = [j for j in cls._jobs if j.get('aliv | | cls._jobs = [j for j in cls._jobs if j.get('aliv |
| e', True)] | | e', True)] |
| if not cls._jobs: | | if not cls._jobs: |
| cls._runner_task = None | | cls._runner_task = None |
| return | | return |
| dirty = False | | dirty = False |
| async with cls._lock: | | async with cls._lock: |
| if not cls._jobs: | | if not cls._jobs: |
| cls._runner_task = None | | cls._runner_task = None |
| return | | return |
| finally: | | finally: |
| async with cls._lock: | | async with cls._lock: |
| for job in cls._jobs: | | for job in cls._jobs: |
| if not job['future'].done(): | | if not job['future'].done(): |
| job['future'].set_result(None) | | job['future'].set_result(None) |
| cls._jobs.clear() | | cls._jobs.clear() |
| cls._runner_task = None | | cls._runner_task = None |