Различия между версиями 7 и 8
Версия 7 от 2022-12-08 18:15:02
Размер: 20132
Редактор: FrBrGeorge
Комментарий:
Версия 8 от 2022-12-09 17:38:29
Размер: 20171
Редактор: FrBrGeorge
Комментарий:
Удаления помечены так. Добавления помечены так.
Строка 396: Строка 396:
 1. '''TODO'''  1. <<EJCMC(230, NotifyEvent, Оповещения)>>

Асинхронные возможности Python

Асинхронность:

  • Прямая (параллелизм) — в языке нет
  • С последовательной активацией (обратные вызовы функций, callbacks) — полностью моделируется имеющимся синтаксисом
  • Сопрограммная (внутри сопрограммы есть синхронные участки, выполнение их произвольно) — вот!

Модель

<!> Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее, if even possible ☺

  • Как работает yield from (повторение)

       1 def subr():
       2     yield "One"
       3     yield "Two"
       4 
       5 def task():
       6     for i in range(3):
       7         yield from subr()
       8         yield f"{i} Pass"
       9 
      10 for res in task():
      11     print(res)
    
    • На время yield from код генератора task() логически не исполняется, можно считать, что на это время его замещает subr()

  • Ловля return из генератора с помощью yield from (два способа)

       1 def subr(n):
       2     yield f"One: {n}"
       3     yield f"Two: {n}"
       4     return f"Done: {n}"
       5 
       6 def task():
       7     for i in range(3):
       8         result = yield from subr(i)
       9         yield result
      10     return "*END*"
      11 
      12 core = task()
      13 try:
      14     while (res := next(core)):
      15         print(res)
      16 except StopIteration as E:
      17     print(E.value)
    
    • Оператор return в генераторе откладывает свой параметр в поле .value исключения StopIteration

    • А в конструкции yield from … это значение приезжает прямо так!

  • Передача параметра в генератор с помощью .send() (повторение):

       1 def task(initial):
       2     value = initial
       3     while True:
       4         value = yield f"<{value * 2}>"
       5 
       6 core = task(100500)
       7 print(f"Start: {next(core)}")
       8 for i in range(5):
       9     print(core.send(i + 1))
    
    • Особенность: самый первый .send() должен быть генератор.send(None) (или, что то же самое, next(генератор), потому что в синтаксисе нет способа передать какое-то значение в начало генератора, а не в yield.

      • initial — это параметр генератор-функции, он передаётся в момент создания генератора, а не при его проходе

      • Мы договорились считать этот первый next() запуском генератора.

  • Куда происходит .send() в случае yield from?

       1 def subr():
       2     x = yield "Wait for x"
       3     y = yield f"Wait for y ({x=})"
       4     return x * y
       5 
       6 def task():
       7     while True:
       8         value = yield from subr()
       9         _ = yield value
      10 
      11 core = task()
      12 print(next(core))
      13 for i in range(8):
      14     print(core.send(i))
    
    • Ничего неожиданного: .send() попадает в тот итератор, который сейчас yield-ит

    • Внимательно посмотрим, куда что send-илось…

Асинхронность как произвольное исполнение частей кода между yield-ами

  • Понятие синхронного фрагмента — непрерывно выполняемого кода между yield-ами (а также стартом и return-ом)

  • Понятие образующего цикла (main loop)

  • Тот же пример, но с двумя асинхронно выполняющимися задачами:

       1 def subr(n):
       2     x = yield f"({n}) Wait for x"
       3     y = yield f"({n}) Wait for y ({x=})"
       4     return x * y
       5 
       6 def task(n):
       7     while True:
       8         value = yield from subr(n)
       9         _ = yield f"[{n}]: {value}"
      10 
      11 cores = task(0), task(1)
      12 print(next(cores[0]), next(cores[1]), sep="\n")
      13 for i in range(20):
      14     print(cores[not i % 3].send(i))
    
    • Здесь из образующего цикла поступает поток целых чисел, subr() их попарно умножает, а две задачи складывают эти произведения

    • Очередной число попадает в subr() выбранной задачи, а выбор задач делает образующий цикл

    • Синхронные фрагменты из task[0] выполняются в два раза чаще синхронных фрагментов из task[1]

    • Можно попробовать разобраться, что с чем складывалось…
  • Более сложный пример: три конечных задачи с разным количеством синхронных фрагментов
       1 from random import randint
       2 
       3 def subr():
       4     x = yield
       5     y = yield
       6     return x * y
       7 
       8 def task(num):
       9     res = 0
      10     for i in range(num):
      11         res += yield from subr()
      12     return res
      13 
      14 def loop(*tasks):
      15     queue, result = list(tasks), []
      16     print("Start:", *queue, sep="\n\t")
      17     for task in tasks:
      18         next(task)
      19     while queue:
      20         task = queue.pop(0)
      21         try:
      22             task.send(randint(1, 9))
      23         except StopIteration as ret:
      24             result.append((task, ret.value))
      25         else:
      26             queue.append(task)
      27     return result
      28 
      29 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
    
    • Образующий цикл вынесен в отдельную функцию и стал сложнее. В нём генерируется непрерывный поток случайных целых и отдаётся поштучно на обработку очередному заданию. Если задание закончилось, запоминается его результат, а если нет — ставится в конец очереди.
    • Для реализации этой логики пришлось снова «вытащить» явную обработку StopIteration

    • Значения, возвращаемые yield, при этом не используются вообще: yield служит только для разметки синхронных фрагментов

  • Если ещё усложнить логику образующего цикла, мы сможем управлять его поведением с помощью возвращаемых yield значений:

       1 from random import randint, choice
       2 from string import ascii_uppercase
       3 from collections import deque
       4 
       5 def subr():
       6     return (yield int) * (yield str)
       7 
       8 def task(num):
       9     res = ""
      10     for i in range(num):
      11         res += yield from subr()
      12     return res
      13 
      14 def loop(*tasks):
      15     queue, result = deque((task, None) for task in tasks), []
      16     print("Start:", *queue, sep="\n\t")
      17     while queue:
      18         task, request = queue.popleft()
      19         if request is int:
      20             data = randint(1, 4)
      21         elif request is str:
      22             data = choice(ascii_uppercase)
      23         else:
      24             data = request
      25         try:
      26             request = task.send(data)
      27         except StopIteration as ret:
      28             result.append((task, ret.value))
      29             task.close()
      30         else:
      31             queue.append((task, request))
      32     return result
      33 
      34 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
    
    • subr() возвращает тип параметра, который она хотела бы получить в следующем yield

    • Этот тип хранится в очереди вместо с заданием, чей subr() запросил данный параметр

    • Образующий цикл генерирует параметр сообразно типу
    • А ещё мы храним очередь в очереди, а не в списке, не надо привыкать к плохому!

Можно и дальше усложнять, но и так уже непросто!

Ещё модели

  • Цикл событий: образующий цикл получает откуда-то «события», определяет, кто их должен обрабатывать и вызывает функции-обработчики с параметром обработчик(событие) (возможно, не функции, а генераторы облаботчик.send(событие), не слишком важно).

  • Цикл обратных вызовов (callback-ов): частный случай того же самого: каждый обработчик «регистрируется» — по заранее определённому протоколу указывает, в каких случаях его надо вызывать (это и есть событие), а образующий цикл при наступлении события вызывает все обработчики, которые на нём зарегистрировались (опять-таки, можно организовать в виде функций, а можно в виде генераторов)
  • Цикл с future: унификация управления образующим циклом

    • future — это генератор из двух синхронных сегментов

      1. Настройка и yield в образующий цикл

      2. return возвращаемого значения

    • Кроме того, в future есть поле готовности / результата

    • Алгоритм работы:
      1. Генератор-сервис заводит неготовую фьючу в данном образующем цикле
      2. Генератор-пользователь делает yield from фьюча

      3. Фьюча выпадает в образующий цикл
      4. В какой-то момент генератор-сервис выставляет в фьюче готовность / результат
      5. На этом основании образующий цикл возвращает управление фьюче (во второй сегмент)
      6. А та возвращает значение генератору-пользователю
    • Фактически это частный случай обратных вызовов
  • … более сложная логика (например, приоритизация событий) …

Синтаксис Async

  • async def + return ≈ генератор

  • awaityield from

  • @types.coroutine: низкоуровневая сопрограмма, которая может делать и return значение, и yield

  • async def + yield — это именно то, чем кажется: генераторы, про которые сразу известно, что они асинхронные:

    • Их можно проходить async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)

Перепишем предыдущий пример на async

  •    1 from random import randint, choice
       2 from string import ascii_uppercase
       3 from types import coroutine
       4 from collections import deque
       5 
       6 @coroutine
       7 def subr():
       8     return (yield int) * (yield str)
       9 
      10 async def task(num):
      11     res = ""
      12     for i in range(num):
      13         res += await subr()
      14     return res
      15 
      16 def loop(*tasks):
      17     queue, result = deque((task, None) for task in tasks), []
      18     print("Start:", *queue, sep="\n\t")
      19     while queue:
      20         task, request = queue.popleft()
      21         if request is int:
      22             data = randint(1, 4)
      23         elif request is str:
      24             data = choice(ascii_uppercase)
      25         else:
      26             data = request
      27         try:
      28             request = task.send(data)
      29         except StopIteration as ret:
      30             result.append((task, ret.value))
      31             task.close()
      32         else:
      33             queue.append((task, request))
      34     return result
      35 
      36 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
    
    • Наш subr() использует прямое управление образующим циклом с помощью yield

      • В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий

Asyncio

Базоавя документация

  • Самое сложное — это логика образующего цикла
  • Самое ненужное — это логика образующего цикла (достаточно знать, как он работает, а не что делает)

  • Запрограммируем образующий цикл заранее, насуём туда инструментов
  • Упростим протокол управления до одного понятия — Future

  • Обмажем протокол верхним уровнем (задания, события, очереди и т. п.)
    • До такой степени, что ни одна из наших сопрограмм не делает yield (если это не асинхронный генератор)

  • (asyncio specific) обмажем огромным количеством применений IRL

Основные понятия:

  • Mainloop — образующий цикл. Полностью под капотом, мы его не видим.

  • Task — асинхронное задание

       1 import asyncio
       2 from time import strftime
       3 
       4 async def late(delay, msg):
       5     await asyncio.sleep(delay)
       6     print(msg)
       7 
       8 async def main():
       9     print(f"> {strftime('%X')}")
      10     await late(1, "One")
      11     print(f"> {strftime('%X')}")
      12     await late(2, "Two")
      13     print(f"> {strftime('%X')}")
      14 
      15     task3 = asyncio.create_task(late(3, "Three"))
      16     task4 = asyncio.create_task(late(4, "Four"))
      17     await(task3)
      18     print(f"> {strftime('%X')}")
      19     await(task4)
      20     print(f"> {strftime('%X')}")
      21 
      22 asyncio.run(main())
    
    • asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()

    • «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
    • Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходила в mainloop)

    • В примере первая корутина спит секунду, а вторая — после этого ещё две

    • Если написать create_task(корутина), корутина регистрируется в mainloop-е, а возвращется нечто вроде фьючи — задание

    • await(здадание) запускает его

    • В примере ещё две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой

    • asyncio.sleep(тайм-аут) — это команда mainlop-у «верни мне управление после тайм-аута»

      • Чуть ли не единственная команда mainlop-у на поверхности
  • Gather — атомарная операция create_task() / await над несколькими корутинами

       1 import asyncio
       2 
       3 async def late(delay, msg):
       4     await asyncio.sleep(delay)
       5     print(msg)
       6     return delay
       7 
       8 async def main():
       9     res = await asyncio.gather(
      10             late(3, "A"),
      11             late(1, "B"),
      12             late(2, "C"),
      13     )
      14     print(res)
      15 
      16 asyncio.run(main())
    
    • Тут всё понятно, запустились, повисели сколько сказано, завершились
    • <!> (Python 3.11) группы заданий —запуск в виде контекстного менеджера

         1 import asyncio
         2 
         3 async def late(delay, msg):
         4     await asyncio.sleep(delay)
         5     print(msg)
         6     return delay
         7 
         8 async def main():
         9     async with asyncio.TaskGroup() as tg:
        10         tg.create_task(late(3, "A"))
        11         tg.create_task(late(1, "B"))
        12         tg.create_task(late(2, "C"))
        13     print("Done")
        14 
        15 asyncio.run(main())
      
  • Синхронизация

    • Например события

         1 async def waiter(name, event):
         2     print(f'{name} waits for {event}…')
         3     await event.wait()
         4     print(f'…{name} got it!')
         5 
         6 async def eventer(wait, event):
         7     print(f"Emitting {event} in {wait} seconds")
         8     await asyncio.sleep(wait)
         9     print(f"Emitting {event}…")
        10     event.set()
        11 
        12 async def main():
        13     event = asyncio.Event()
        14     await asyncio.gather(
        15         waiter("One", event),
        16         waiter("Two", event),
        17         eventer(1, event))
        18 
        19 asyncio.run(main())
      
  • Очереди

    •    1 async def ham(queue, size):
         2     for i in range(size):
         3         await asyncio.sleep(1)
         4         res = await queue.get()
         5         print(f"\tGot {res}")
         6 
         7 async def spam(wait, queue):
         8     for i in range(6):
         9         await asyncio.sleep(wait)
        10         val = f"{wait}:{i}"
        11         await queue.put(val)
        12         print(f"Put {val}")
        13 
        14 async def main():
        15     queue = asyncio.Queue()
        16     await asyncio.gather(
        17         ham(queue, 12),
        18         spam(0.4, queue),
        19         spam(1.6, queue))
        20 
        21 asyncio.run(main())
      
    • Есть и приоритетные очереди

И толстый-толстый слой шоколада!

  • Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)

  • Изменение логики работы mainloop (aka Policies)

  • Сеть (I/O, IPC и всё остальное), сигналы
  • Потоки (над этим всем)

  • Вброс/перехват исключений
  • Дикая туча модулей на основе asyncio

Д/З

  1. Попробовать прочитать всю документацию и прощёлкать всё, до чего дотянетесь.

  2. EJudge: FilterQueue 'Очередь с фильтром'

    Напишите класс FilterQueue со следующими свойствами:

    • Это потомок asyncio.Queue

    • В экземпляре класса атрибут очередь.window содержит первый элемент очереди, или None, если очередь пуста

    • С помощью операции фильтр in очередь можно определить, присутствуют ли в очереди такие элементы, что выражение фильтр(элемент) истинно

    • Метод .later() синхронно переставляет первый элемент очереди в её конец, или вызывает исключение asyncio.QueueEmpty, если очередь пуста

    • Метод .get() содержит необязательный параметр фильтр. Вызов очередь.get(фильтр) работает так:

      • Если в очереди нет элементов, на которых фильтр(элемент) истинно, работает как обычный .get().

      • Если в очереди есть элементы, на которых фильтр(элемент) истинно, переставляет первый элемент очереди в её конец до тех пор, пока фильтр(элемент) не истинно, а затем выполняет обычный .get().

    • Разрешается воспользоваться внутренним представлением Queue

    Input:

       1 async def putter(n, queue):
       2     for i in range(n):
       3         await queue.put(i)
       4 
       5 async def getter(n, queue, filter):
       6     for i in range(n):
       7         await asyncio.sleep(0.1)
       8         yield await queue.get(filter)
       9 
      10 async def main():
      11     queue = FilterQueue(10)
      12     asyncio.create_task(putter(20, queue))
      13     async for res in getter(20, queue, lambda n: n % 2):
      14         print(res)
      15 
      16 asyncio.run(main())
    
    Output:

    1
    3
    5
    7
    9
    11
    13
    15
    17
    4
    19
    12
    6
    16
    8
    14
    0
    10
    2
    18
  3. EJudge: NotifyEvent 'Оповещения'

    Написать класс NotifyEvent (унаследованный от asyncio.Event) со следующими свойствами

    • В методе оповещение.set(имя) присутствует строка-имя адресата уведомления, но по умолчанию это None

    • Перед каждым оповещение.set() (кроме самого первого) требуется вызов оповещение.clear()

    • Метод await оповещение.wait() возвращает имя адресата уведомления (но в остальном работает как event.wait())

    Написать также сопрограмму task(имя, оповещение) со следующими свойствами:

    • Если уведомление «своё» — адресат уведомления совпадает с именем, — выводится имя, количество принятых «своих» уведомлений и количество принятых «чужих» уведомлений

    • Если вместо имени await оповещение.wait() вернул None, работа завершается

    • Использовать внутреннюю реализацию asyncio.Event в этой задаче нельзя

    Input:

       1 async def sender(names, notify):
       2     for name in names:
       3         notify.set(name)
       4         await asyncio.sleep(0.1)
       5         notify.clear()
       6     notify.set()
       7 
       8 async def main():
       9     notify = NotifyEvent()
      10     tasks = {n: task(n, notify) for n in "12"}
      11     targets = "1", "2", "2", "2", "2", "1", "2", "1", "1"
      12     await asyncio.gather(*(list(tasks.values()) + [sender(targets, notify)]))
      13 
      14 asyncio.run(main())
    
    Output:

    1: 1 / 0
    2: 1 / 1
    2: 2 / 1
    2: 3 / 1
    2: 4 / 1
    1: 2 / 4
    2: 5 / 2
    1: 3 / 5
    1: 4 / 5

TODO Тесты

LecturesCMC/PythonIntro2022/13_Async (последним исправлял пользователь FrBrGeorge 2022-12-09 17:38:29)