Продвинутый asyncio в Python
В прошлой статье разобрали базовый asyncio: async def, await, gather, Tasks. Здесь — три инструмента, которые приходится использовать в реальных приложениях: очереди между корутинами, синхронизация и (главное) запуск блокирующего кода без остановки event loop.
asyncio.Queue: обмен данными между корутинами
В asyncio все корутины работают в одном потоке и в принципе могут делиться состоянием напрямую. Но для производитель-потребитель паттерна удобнее очередь:
Python 3.13import asyncio async def producer(q): for i in range(5): await q.put(f"item-{i}") await asyncio.sleep(0.1) await q.put(None) # сигнал остановки async def consumer(q): while True: item = await q.get() if item is None: break print(f"Получил {item}") async def main(): q = asyncio.Queue() await asyncio.gather(producer(q), consumer(q)) asyncio.run(main())
API такой же, как у queue.Queue, но методы здесь корутины (await q.put, await q.get). Очередь блокирует на пустом get() или переполненном put() (если задан maxsize), но не сам поток — она уступает управление event loop.
asyncio.Lock: защита общего состояния
В asyncio переключение корутин происходит только на await. Если между двумя await есть критическая секция (где меняется общее состояние), переключение туда не вклинится. Но если внутри критической секции есть await, другая корутина может вмешаться.
Python 3.13import asyncio counter = 0 lock = asyncio.Lock() async def increment(): global counter async with lock: current = counter await asyncio.sleep(0.01) # await ВНУТРИ критической секции counter = current + 1 async def main(): await asyncio.gather(*(increment() for _ in range(100))) print(counter) # 100 — корректно благодаря lock asyncio.run(main())
Без lock несколько корутин прочитали бы одно и то же значение current, и итог был бы меньше 100. С async with lock: только одна корутина может находиться в критической секции одновременно.
В реальном asyncio-коде блокировки нужны редко, потому что большинство переменных живут внутри одной корутины. Lock пригодится, когда несколько корутин читают/пишут одну общую структуру или ресурс — например, общий счётчик активных подключений или кэш.
Кроме Lock есть asyncio.Event, asyncio.Semaphore, asyncio.Condition (API копирует threading, но операции через await).
Блокирующий код в asyncio: run_in_executor
Главное правило asyncio: никогда не вызывайте блокирующие функции напрямую. time.sleep(2), requests.get(), тяжёлые вычисления — всё это остановит event loop, и все остальные корутины замрут.
Но иногда деваться некуда: нужна старая синхронная библиотека или CPU-bound расчёт. На этот случай есть loop.run_in_executor(): запустить блокирующую функцию в отдельном потоке (или процессе), пока event loop спокойно продолжает работу.

Python 3.13import asyncio import time def blocking_io(): print("Блокирующая функция: засыпаю на 2с") time.sleep(2) # синхронный sleep return "готово" async def main(): loop = asyncio.get_running_loop() print("Запускаем блокирующую задачу в executor") # None = executor по умолчанию (ThreadPoolExecutor) future = loop.run_in_executor(None, blocking_io) # пока блокирующая задача работает, event loop свободен await asyncio.sleep(1) print("Event loop работает параллельно") result = await future print(f"Результат: {result}") asyncio.run(main())
run_in_executor(None, func, *args) отдаёт func(*args) в стандартный ThreadPoolExecutor (тот самый, что мы видели в статье про потоки и процессы) и возвращает future, который можно await-ить.
Для CPU-bound кода можно передать ProcessPoolExecutor первым аргументом — функция уйдёт в отдельный процесс с собственным GIL.
async-итерация и контекстные менеджеры
Если объект собирает данные постепенно (через сеть, например), он может быть асинхронным итератором: итерируется через async for:
Python 3.13async for line in aiohttp_response: process(line)
Если ресурс надо открыть и закрыть асинхронно (соединение с БД), это асинхронный контекстный менеджер через async with:
Python 3.13async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json()
Сами вы их пишете редко, это инструменты библиотек (aiohttp, asyncpg, aioredis). Достаточно знать, что они существуют и узнавать async for / async with в чужом коде.
Сравнение трёх подходов
Правило выбора:
- Тысячи сетевых соединений, новые проекты → asyncio
- I/O в существующем синхронном коде без async-библиотек → threading или ThreadPoolExecutor
- Тяжёлые вычисления → multiprocessing или ProcessPoolExecutor
- В одном приложении часто всё это сочетается: asyncio как основной слой + run_in_executor с пулом потоков/процессов для блокирующих кусков.
Несколько подводных камней
- CPU-bound в asyncio убивает event loop. Используйте run_in_executor с ProcessPoolExecutor для тяжёлых вычислений в async-коде.
- Забытая await: asyncio.sleep(1) без await ничего не делает (создаёт корутину и выбрасывает её). В современных IDE это подсвечивается.
- Mix sync/async: вызов requests.get() (синхронный) в asyncio блокирует всё. Используйте aiohttp / httpx для async-HTTP.
- if __name__ == "__main__": на Windows и macOS обязательна для multiprocessing, иначе процессы будут рекурсивно создавать сами себя.
Проверка понимания
Какой инструмент asyncio используется для безопасного запуска блокирующего кода, не останавливая event loop?
На этом модуль конкурентности завершён. Главное правило: подберите инструмент под тип задачи.
- Тысячи сетевых соединений → asyncio
- I/O в legacy-коде → threading
- Математика и обработка данных → multiprocessing
В современных приложениях чаще всего основная программа на asyncio, а CPU-тяжёлые куски выносятся в process pool через run_in_executor.
