Продвинутый asyncio в Python

В прошлой статье разобрали базовый asyncio: async def, await, gather, Tasks. Здесь — три инструмента, которые приходится использовать в реальных приложениях: очереди между корутинами, синхронизация и (главное) запуск блокирующего кода без остановки event loop.

asyncio.Queue: обмен данными между корутинами

В asyncio все корутины работают в одном потоке и в принципе могут делиться состоянием напрямую. Но для производитель-потребитель паттерна удобнее очередь:

Python 3.13
import asyncio

async def producer(q):
    for i in range(5):
        await q.put(f"item-{i}")
        await asyncio.sleep(0.1)
    await q.put(None)            # сигнал остановки

async def consumer(q):
    while True:
        item = await q.get()
        if item is None:
            break
        print(f"Получил {item}")

async def main():
    q = asyncio.Queue()
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())

API такой же, как у queue.Queue, но методы здесь корутины (await q.put, await q.get). Очередь блокирует на пустом get() или переполненном put() (если задан maxsize), но не сам поток — она уступает управление event loop.

asyncio.Lock: защита общего состояния

В asyncio переключение корутин происходит только на await. Если между двумя await есть критическая секция (где меняется общее состояние), переключение туда не вклинится. Но если внутри критической секции есть await, другая корутина может вмешаться.

Python 3.13
import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0.01)     # await ВНУТРИ критической секции
        counter = current + 1

async def main():
    await asyncio.gather(*(increment() for _ in range(100)))
    print(counter)        # 100 — корректно благодаря lock

asyncio.run(main())

Без lock несколько корутин прочитали бы одно и то же значение current, и итог был бы меньше 100. С async with lock: только одна корутина может находиться в критической секции одновременно.

В реальном asyncio-коде блокировки нужны редко, потому что большинство переменных живут внутри одной корутины. Lock пригодится, когда несколько корутин читают/пишут одну общую структуру или ресурс — например, общий счётчик активных подключений или кэш.

Кроме Lock есть asyncio.Event, asyncio.Semaphore, asyncio.Condition (API копирует threading, но операции через await).

Блокирующий код в asyncio: run_in_executor

Главное правило asyncio: никогда не вызывайте блокирующие функции напрямую. time.sleep(2), requests.get(), тяжёлые вычисления — всё это остановит event loop, и все остальные корутины замрут.

Но иногда деваться некуда: нужна старая синхронная библиотека или CPU-bound расчёт. На этот случай есть loop.run_in_executor(): запустить блокирующую функцию в отдельном потоке (или процессе), пока event loop спокойно продолжает работу.

Иллюстрация: event loop с кодом await blocking_task() слева; стрелка run_in_executor к Thread Pool Executor справа, где выполняется time.sleep(2); внизу другие корутины продолжают работать; стрелка возврата результата

Python 3.13
import asyncio
import time

def blocking_io():
    print("Блокирующая функция: засыпаю на 2с")
    time.sleep(2)                    # синхронный sleep
    return "готово"

async def main():
    loop = asyncio.get_running_loop()
    print("Запускаем блокирующую задачу в executor")

    # None = executor по умолчанию (ThreadPoolExecutor)
    future = loop.run_in_executor(None, blocking_io)

    # пока блокирующая задача работает, event loop свободен
    await asyncio.sleep(1)
    print("Event loop работает параллельно")

    result = await future
    print(f"Результат: {result}")

asyncio.run(main())

run_in_executor(None, func, *args) отдаёт func(*args) в стандартный ThreadPoolExecutor (тот самый, что мы видели в статье про потоки и процессы) и возвращает future, который можно await-ить.

Для CPU-bound кода можно передать ProcessPoolExecutor первым аргументом — функция уйдёт в отдельный процесс с собственным GIL.

async-итерация и контекстные менеджеры

Если объект собирает данные постепенно (через сеть, например), он может быть асинхронным итератором: итерируется через async for:

Python 3.13
async for line in aiohttp_response:
    process(line)

Если ресурс надо открыть и закрыть асинхронно (соединение с БД), это асинхронный контекстный менеджер через async with:

Python 3.13
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        data = await response.json()

Сами вы их пишете редко, это инструменты библиотек (aiohttp, asyncpg, aioredis). Достаточно знать, что они существуют и узнавать async for / async with в чужом коде.

Сравнение трёх подходов

threadingmultiprocessingasyncio
Параллелизм CPUнет (GIL)данет (1 поток)
I/O-boundхорошохорошо, но дорогоотлично
Накладные расходынизкиевысокиеминимальные
Памятьобщаяизолированнаяобщая (1 поток)
Обмен даннымипеременные + Lock / QueueQueue, Pipe, Managerпеременные / asyncio.Queue
Тысячи задачплохоочень плохопрекрасно

Правило выбора:

  • Тысячи сетевых соединений, новые проекты → asyncio
  • I/O в существующем синхронном коде без async-библиотек → threading или ThreadPoolExecutor
  • Тяжёлые вычисления → multiprocessing или ProcessPoolExecutor
  • В одном приложении часто всё это сочетается: asyncio как основной слой + run_in_executor с пулом потоков/процессов для блокирующих кусков.

Несколько подводных камней

  • CPU-bound в asyncio убивает event loop. Используйте run_in_executor с ProcessPoolExecutor для тяжёлых вычислений в async-коде.
  • Забытая await: asyncio.sleep(1) без await ничего не делает (создаёт корутину и выбрасывает её). В современных IDE это подсвечивается.
  • Mix sync/async: вызов requests.get() (синхронный) в asyncio блокирует всё. Используйте aiohttp / httpx для async-HTTP.
  • if __name__ == "__main__": на Windows и macOS обязательна для multiprocessing, иначе процессы будут рекурсивно создавать сами себя.

Проверка понимания

Какой инструмент asyncio используется для безопасного запуска блокирующего кода, не останавливая event loop?


На этом модуль конкурентности завершён. Главное правило: подберите инструмент под тип задачи.

  • Тысячи сетевых соединений → asyncio
  • I/O в legacy-коде → threading
  • Математика и обработка данных → multiprocessing

В современных приложениях чаще всего основная программа на asyncio, а CPU-тяжёлые куски выносятся в process pool через run_in_executor.