Асинхронные возможности Python
Асинхронность:
- Явная (параллелизм) — в языке нет
- Со скрытой активацией (обратные вызовы, callbacks) — всё зависит от mainloop
- Сопрограммная — вот!
Модель
Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее ☺
Как работает yield from
На время yield from код генератора prim() логически не исполняется, можно считать, что на это время его замещает sec()
Ловля return из генератора с помощью yield from
1 from time import sleep 2 3 def adder(x, y): 4 yield f"add: {x=}, {y=} - 1" 5 yield f"add: {x=}, {y=} - 2" 6 yield f"add: {x=}, {y=} - 3" 7 return x*2+y 8 9 def infadd(n): 10 for i in range(100000): # вечный цикл 11 res = yield from adder(i, n) 12 print(f"add-{i}: {res}") 13 14 core = infadd(3) 15 for i in range(100): 16 sleep(1) 17 res = next(core) 18 print(f"({i:3}) {res}")
Здесь x*2+y приезжает в infadd()
Асинхронность как произвольное исполнение сегментов кода между yield-ами
- Понятие образующего цикла (main loop)
Передача управления в main loop с помощью явного yield
1 from random import random 2 from time import sleep 3 def sec(Name): 4 yield f"SEC-{Name} >" 5 yield f"SEC-{Name} <" 6 7 def prim(Name="Prim"): 8 for i in range(100000): # вечный цикл 9 yield from sec(Name) 10 yield f"{Name}-{i}" 11 12 cores = prim("One"), prim("Two") 13 for i in range(100): 14 sleep(1) 15 res = next(cores[random()>0.75]) 16 print(f"({i:3}) {res}")
Здесь части кода одного sec() выполняются в три раза реже частей другого sec() (потому что в образующем цикле вероятнось вызова одного 0.75)
Обмен данными с сегментами сопрограмм с помощью .send()
1 from time import sleep 2 3 def adder(n): 4 x = yield f"add: get x[{n}]> " 5 y = yield f"add: get y[{n}]> " 6 return x*2+y 7 8 def add4(): 9 res1 = yield from adder(1) 10 res2 = yield from adder(2) 11 return res1 + res2 12 13 cmd = add4() 14 req = next(cmd) 15 for i in range(4): 16 data = eval(input(req)) 17 req = cmd.send(data)
Здесь adder() запрашивает в образующем цикле значения, а add4() комбинирует результаты работы двух adder()-ов
Обратите внимание на то, каким образом в действительности передаётся возвращаемое значение генератора: как поле value исключения StopIteration (если вы не увидели этого, значит, вы не запускали код… ну, как хотите ☹)
Мы можем усложнить логику управления образующим циклом. В примере ниже видя int mainloop засылает в генератор небольшое случайное целое, а видя str — случайную букву
1 from time import sleep 2 from string import ascii_uppercase as alpha 3 import random 4 5 def gen(): 6 x = yield int 7 y = yield str 8 return x*y 9 10 def rep(): 11 res1 = yield from gen() 12 res2 = yield from gen() 13 return res1 + res2 14 15 def runner(cmd): 16 req = next(cmd) 17 try: 18 while True: 19 if req is str: 20 req = cmd.send(random.choice(alpha)) 21 elif req is int: 22 req = cmd.send(random.randint(1,9)) 23 else: 24 raise ValueError(req) 25 except StopIteration as E: 26 cmd.close() 27 return E.value 28 29 print(runner(rep()))
Возвращаемое значение перехватывается в обработчике исключения StopIteration
Пример, иллюстрирующий понятие асинхронности
Модифицируем предыдущий пример:
rep(n) будет комбинировать результат n вызовов gen()
runner() будет
получать на вход несколько разных rep()
формировать из них кольцевую очередь, в которой рядом со ссылкой на генератор будет храниться «запрос» — что передавать rep() в следующий раз, int или str
- обрабатывать эту очередь по кругу
перехватывать StopIteration, забирать возвращённое значение, и исключать соответствующий rep() из очереди
- По исчерпании очереди — возвращать список результатов
1 from time import sleep 2 from string import ascii_uppercase as alpha 3 import random 4 5 def gen(): 6 return (yield int) * (yield str) 7 8 def rep(n): 9 res = "" 10 for i in range(n): 11 res += yield from gen() 12 return res 13 14 15 def runner(*cmds): 16 queue = [(cmd, None) for cmd in reversed(cmds)] 17 res = [] 18 while queue: 19 cmd, req = queue.pop() 20 try: 21 if req is str: 22 req = cmd.send(random.choice(alpha)) 23 elif req is int: 24 req = cmd.send(random.randint(1,6)) 25 elif req is None: 26 req = next(cmd) 27 else: 28 raise ValueError(req) 29 except StopIteration as E: 30 res.append(E.value) 31 cmd.close() 32 else: 33 queue.insert(0, (cmd, req)) 34 return res 35 36 print(runner(rep(10), rep(3), rep(5)))
Первым закончился rep(3), потому что его выполнение включало 6 yield-ов (6 фрагментов до yield + один концевой до return), затем — rep(5), и затем — rep(10) (хотя его первый фрагмент был выполнен первым).
Ещё модели
Цикл событий: получает откуда-то «события», после чего некоторые сегменты кода (которые ждут этого события) можно «разбудить»
- Цикл обратных вызовов: примерно то же самое
- Цикл с future:
- future — это асинхронный код из двух сегментов (зашёл - выпал в mainloop - вернул результат) + поле готовности
- Один код спит на ней
- Другой код заполняет результат и выставляет готовность
- mainloop проверяет готовность, и если она есть — передаёт управление
- …
Синтаксис Async
async def ~== генератор
yield from ~== await
@types.coroutine — чтобы и yield и return (не доделали ещё?)
async def + yield — совсем другое, это именно то, чем кажется, специальные асинхронные генераторы, которые можно проходить async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)
Перепишем предыдущий пример на async (дополнительно будем использовать deque вместо списка, потому что это правильно для реализации очереди):
1 from time import sleep 2 from string import ascii_uppercase as alpha 3 import random 4 import types 5 from collections import deque 6 7 @types.coroutine 8 def gen(): 9 x = yield int 10 y = yield str 11 return x*y 12 13 async def rep(n): 14 res = "" 15 for i in range(n): 16 res += await gen() 17 return res 18 19 def runner(*cmds): 20 queue = deque((cmd, None) for cmd in reversed(cmds)) 21 res = [] 22 while queue: 23 cmd, req = queue.pop() 24 try: 25 if req is str: 26 req = cmd.send(random.choice(alpha)) 27 elif req is int: 28 req = cmd.send(random.randint(1,6)) 29 elif req is None: 30 req = cmd.send(None) 31 else: 32 raise ValueError(req) 33 except StopIteration as E: 34 res.append(E.value) 35 cmd.close() 36 else: 37 queue.appendleft((cmd, req)) 38 return res 39 40 print(runner(rep(10), rep(3), rep(5)))
наш gen() — не совсем стандартная корутина, потому что использует прямое управление образующим циклом с помощью yield
В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий
Asyncio
- Самое сложное — это логика образующего цикла
Самое ненужное — это логика образующего цикла (достаточно знать, как он работает, а не что делает)
⇒
- Запрограммируем образующий цикл заранее, насуём туда инструментов
Упростим протокол управления до одного понятия — future
(asyncio specific) обмажем огромным количеством применений IRL
Основные понятия:
Mainloop — полностью под капотом, мы его не видим
Task:
1 import asyncio 2 from time import strftime 3 4 async def late(delay, msg): 5 await asyncio.sleep(delay) 6 print(msg) 7 8 async def main(): 9 print(f"> {strftime('%X')}") 10 await late(1, "One") 11 print(f"> {strftime('%X')}") 12 await late(2, "Two") 13 print(f"> {strftime('%X')}") 14 task3 = asyncio.create_task(late(3, "Three")) 15 task4 = asyncio.create_task(late(4, "Four")) 16 await(task3) 17 print(f"> {strftime('%X')}") 18 await(task4) 19 print(f"> {strftime('%X')}") 20 21 asyncio.run(main())
asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()
- «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходиля в mainloop)
- В примере первая корутина спит секунду, а вторая — две
Если написать create_task(корутина), то её выполнение тут же стартует в mainloop-е
- В прмере ещё две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой
asyncio.sleep(тайм-аут) — это команда mainlop-у «верни мне управление после тайм-аута»
Gather — атомарная операция create_task() / await над несколькими корутинами
Future — awaitable-объект, у которого есть творец и адресат, оба имеют к нему доступ.
- Адресат делает результат = await фьюча, и уходит в mainloop.
- Творец, когда ему заблагорассудится, с помощью фьюча.set_result(что-то) объявляет mainloop-у, что будущее наступило с результатом что-то
- Тогда mainloop, наконец, активизирует эту фьючу и адресат просыпается
1 import asyncio 2 3 async def delayed(fut, sec, ret): 4 await asyncio.sleep(sec) 5 fut.set_result(ret) 6 7 async def now(fut, ret): 8 fut.set_result(ret) 9 10 async def main(): 11 fut1, fut2 = asyncio.Future(), asyncio.Future() 12 asyncio.create_task(delayed(fut2, 1, 'Done')) 13 asyncio.create_task(now(fut1, 'Start')) 14 print(await fut1) 15 print(await fut2) 16 17 asyncio.run(main())
- В действительности, фьюча — это просто генератор на два шага: будушее не наступило / yield / будущее наступило, в котором есть два дополнительных поля (объявление о том, что оно наступило и передаваемое значение)
В этом примере одна из корутин обрабатывает две фьючи
1 import asyncio 2 3 async def delayed(fut, sec, ret): 4 await asyncio.sleep(sec) 5 fut.set_result(ret) 6 7 async def now_then(fut, sec, futb, ret): 8 fut.set_result(ret) 9 await asyncio.sleep(sec) 10 futb.set_result(ret) 11 12 async def main(): 13 fut1, fut2, fut3 = (asyncio.Future() for i in range(3)) 14 asyncio.create_task(delayed(fut2, 1, 'Middle')) 15 asyncio.create_task(now_then(fut1, 2, fut3, 'Border')) 16 print(await fut1) 17 print(await fut2) 18 print(await fut3) 19 20 asyncio.run(main())
- …
И толстый-толстый слой шоколада!
- Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)
- Очереди (всякие)
- Сеть (I/O, IPC и всё остальное), сигналы
- Потоки (над этим всем)
- Модификация образующего цикла
- Вброс/перехват исключений
- …
Д/З
Задач на EJudge нет.
TODO