Многопоточность в Python: Модуль threading
В предыдущей статье мы рассмотрели общие концепции конкурентности, параллелизма и асинхронности. Теперь углубимся в один из классических способов достижения конкурентности в Python — многопоточность, используя встроенный модуль threading.
Многопоточность позволяет вашей программе выполнять несколько задач (потоков) как бы одновременно в рамках одного процесса. Это особенно полезно для I/O-bound задач, где программа часто ожидает завершения внешних операций.
Введение в модуль threading
Модуль threading предоставляет высокоуровневый интерфейс для работы с потоками. Каждый поток представляет собой отдельную последовательность инструкций, которая может выполняться независимо от других потоков в том же процессе.
Важно помнить, что все потоки внутри одного процесса разделяют общее адресное пространство, что облегчает обмен данными, но также требует механизмов синхронизации для предотвращения конфликтов.
Создание и запуск потоков
Основной способ создать поток — это создать экземпляр класса threading.Thread, передав ему целевую функцию (которую будет выполнять поток) и ее аргументы.
import threading import time def worker(number, sleep_time): """Функция, которую будет выполнять наш поток.""" print(f"Поток {number}: начинаю работу, буду спать {sleep_time} сек.") time.sleep(sleep_time) print(f"Поток {number}: завершаю работу.") # Создаем потоки thread1 = threading.Thread(target=worker, args=(1, 2)) # args передаются как кортеж thread2 = threading.Thread(target=worker, args=(2, 1)) # Запускаем потоки thread1.start() thread2.start() print("Все потоки запущены из основного потока.") # Ожидаем завершения потоков (опционально, но часто нужно) thread1.join() # Основной поток будет ждать, пока thread1 не завершится thread2.join() # Основной поток будет ждать, пока thread2 не завершится print("Все потоки завершили свою работу.")
- target: функция, которая будет выполняться в новом потоке.
- args: кортеж аргументов для целевой функции.
- start(): запускает выполнение потока.
- join(): блокирует вызывающий поток до тех пор, пока поток, у которого вызван этот метод, не завершится.
Если запустить этот код, вы увидите, что сообщения от потоков будут перемешиваться, демонстрируя их конкурентное выполнение.
Глобальная блокировка интерпретатора (GIL) и ее влияние
Прежде чем углубляться в синхронизацию, важно упомянуть Global Interpreter Lock (GIL). В CPython (стандартной реализации Python) GIL — это мьютекс, который защищает доступ к объектам Python, предотвращая одновременное выполнение байт-кода Python несколькими потоками в одном процессе.
Что это означает на практике:
- Для CPU-bound задач (интенсивные вычисления), многопоточность в Python на одном процессе не даст прироста производительности на многоядерных системах, так как только один поток может держать GIL и выполнять Python-код в любой момент времени.
- Для I/O-bound задач (ожидание сети, диска), GIL освобождается во время блокирующих операций ввода-вывода. Это позволяет другим потокам Python выполняться, пока один поток ожидает. Поэтому threading эффективен для I/O-bound задач.
Из-за GIL, если вам нужен настоящий параллелизм для CPU-bound задач, следует использовать модуль multiprocessing (о котором пойдет речь в следующей статье).
Синхронизация потоков
Когда несколько потоков работают с общими данными, возникают риски состояний гонки (race conditions) и других проблем. Модуль threading предоставляет несколько примитивов синхронизации.
1. Блокировки (Lock и RLock)
Объект Lock (мьютекс) — это самый простой примитив синхронизации. Он имеет два состояния: "захвачен" (locked) и "не захвачен" (unlocked). Поток может вызвать метод acquire() для захвата блокировки. Если блокировка уже захвачена другим потоком, вызывающий поток будет заблокирован до тех пор, пока она не будет освобождена методом release().
import threading import time shared_resource = 0 lock = threading.Lock() def increment_shared_resource(): global shared_resource for _ in range(100000): lock.acquire() # Захватываем блокировку try: shared_resource += 1 finally: lock.release() # Освобождаем блокировку в любом случае threads = [] for i in range(5): thread = threading.Thread(target=increment_shared_resource) threads.append(thread) thread.start() for thread in threads: thread.join() print(f"Итоговое значение shared_resource: {shared_resource}") # Ожидается 500000
Использование try...finally с acquire() и release() гарантирует, что блокировка будет освобождена, даже если в защищенном блоке кода произойдет исключение.
RLock (реентерабельная блокировка) позволяет одному и тому же потоку захватывать блокировку несколько раз. release() должен быть вызван столько же раз, сколько был вызван acquire(). Это полезно в ситуациях с рекурсивным использованием защищенного ресурса тем же потоком.
2. События (Event)
Объект Event — это простой механизм для сигнализации между потоками. Один поток может ожидать установки флага события, в то время как другой поток может этот флаг установить.
- event.wait(): Блокирует поток до тех пор, пока внутренний флаг события не станет истинным.
- event.set(): Устанавливает внутренний флаг в True.
- event.clear(): Сбрасывает внутренний флаг в False.
- event.is_set(): Возвращает текущее состояние флага.
import threading import time event = threading.Event() def waiter(): print("Ожидающий поток: жду события...") event.wait() # Блокируется, пока событие не будет установлено print("Ожидающий поток: событие получено, продолжаю работу!") def setter(): print("Устанавливающий поток: немного подожду...") time.sleep(2) print("Устанавливающий поток: устанавливаю событие!") event.set() t1 = threading.Thread(target=waiter) t2 = threading.Thread(target=setter) t1.start() t2.start() t1.join() t2.join()
3. Семафоры (Semaphore и BoundedSemaphore)
Семафор управляет внутренним счетчиком, который уменьшается при каждом вызове acquire() и увеличивается при release(). Если счетчик равен нулю, acquire() блокируется. Это позволяет ограничить количество потоков, одновременно обращающихся к ресурсу.
BoundedSemaphore дополнительно проверяет, чтобы количество вызовов release() не превышало количество вызовов acquire(), предотвращая ошибки.
import threading import time import random MAX_CONNECTIONS = 3 semaphore = threading.BoundedSemaphore(MAX_CONNECTIONS) def use_resource(thread_id): print(f"Поток {thread_id}: пытаюсь захватить ресурс...") with semaphore: # Контекстный менеджер автоматически вызывает acquire/release print(f"Поток {thread_id}: ресурс захвачен.") sleep_time = random.randint(1, 3) time.sleep(sleep_time) print(f"Поток {thread_id}: освобождаю ресурс после {sleep_time} сек.") threads = [] for i in range(10): thread = threading.Thread(target=use_resource, args=(i,)) threads.append(thread) thread.start() for thread in threads: thread.join()
Вы увидите, что одновременно ресурс используют не более 3 потоков.
4. Условные переменные (Condition)
Объект Condition используется для более сложной синхронизации, когда потоки должны ожидать выполнения определенного условия. Condition обычно ассоциируется с блокировкой (Lock или RLock).
- condition.acquire() / condition.release(): Захват/освобождение связанной блокировки.
- condition.wait(): Освобождает связанную блокировку и ждет, пока другой поток не вызовет notify() или notify_all(). Затем снова пытается захватить блокировку.
- condition.notify(n=1): Пробуждает один (или n) поток, ожидающий на этом условии.
- condition.notify_all(): Пробуждает все потоки, ожидающие на этом условии.
import threading import time condition = threading.Condition() items = [] def producer(): for i in range(5): time.sleep(random.uniform(0.1, 0.5)) with condition: # Захватываем блокировку item = f"Элемент-{i}" items.append(item) print(f"Производитель: добавил {item}, всего {len(items)} элементов. Оповещаю.") condition.notify() # Оповещаем одного потребителя def consumer(name): while True: with condition: print(f"{name}: ожидаю элемент...") condition.wait() # Ожидаем, пока производитель что-то добавит и оповестит if not items: print(f"{name}: список пуст, но меня разбудили. Продолжаю ждать.") continue # Может быть ложное пробуждение или другой потребитель забрал item = items.pop(0) print(f"{name}: забрал {item}, осталось {len(items)} элементов.") if not items and threading.active_count() <= 3: # Пример условия выхода break time.sleep(random.uniform(0.2, 0.6)) # Имитация обработки # Запускаем одного производителя и двух потребителей threading.Thread(target=producer).start() threading.Thread(target=consumer, args=("Потребитель-A",)).start() threading.Thread(target=consumer, args=("Потребитель-B",)).start()
Этот пример демонстрирует классическую задачу "производитель-потребитель".
Очереди для обмена данными между потоками (queue.Queue)
Модуль queue предоставляет потокобезопасные классы очередей (Queue, LifoQueue, PriorityQueue), которые являются предпочтительным способом обмена данными между потоками. Они инкапсулируют всю необходимую логику блокировок.
- q.put(item): Добавляет элемент в очередь. Если очередь полна (для ограниченных очередей), блокируется.
- q.get(): Удаляет и возвращает элемент из очереди. Если очередь пуста, блокируется.
- q.task_done(): Сигнализирует, что извлеченная задача выполнена (используется с q.join()).
- q.join(): Блокируется до тех пор, пока все элементы в очереди не будут извлечены и для каждого не будет вызван task_done().
import threading import queue import time import random q = queue.Queue() def producer_q(): for i in range(10): item = f"Элемент-{i}" time.sleep(random.uniform(0.1, 0.3)) q.put(item) print(f"Производитель: добавил {item} в очередь (размер: {q.qsize()})") q.put(None) # Сигнал окончания для одного потребителя q.put(None) # Сигнал окончания для второго потребителя def consumer_q(name): while True: item = q.get() # Блокируется, если очередь пуста if item is None: # Сигнал окончания q.task_done() # Сообщаем, что задача "None" обработана print(f"{name}: получил сигнал None, завершаю работу.") break print(f"{name}: обработал {item}") time.sleep(random.uniform(0.2, 0.5)) q.task_done() # Сообщаем, что задача обработана # Запускаем производителя и двух потребителей threading.Thread(target=producer_q).start() threading.Thread(target=consumer_q, args=("Потребитель-1",)).start() threading.Thread(target=consumer_q, args=("Потребитель-2",)).start() # q.join() # Можно использовать, если producer не шлет None и мы хотим дождаться обработки всех задач
Практические примеры и когда использовать threading
Модуль threading лучше всего подходит для:
- I/O-bound операций: Загрузка веб-страниц, чтение/запись файлов, взаимодействие с базами данных. Пока один поток ждет ответа от сети, другие могут выполняться.
- Фоновых задач: Выполнение задач, которые не должны блокировать основной поток приложения (например, обновление данных в GUI, периодическая проверка состояния).
- Упрощения структуры кода: Иногда разделение логики на потоки может сделать код более читаемым и управляемым, даже если нет значительного выигрыша в производительности.
Пример: Параллельная загрузка веб-страниц
import threading import requests import time urls = [ "https://api.github.com", "https://httpbin.org/get", "https://jsonplaceholder.typicode.com/todos/1", "https://www.python.org/", "https://www.google.com/" ] def fetch_url(url): try: start_time = time.time() response = requests.get(url, timeout=5) duration = time.time() - start_time print(f"Загружен {url} за {duration:.2f} сек. Статус: {response.status_code}") except requests.exceptions.RequestException as e: print(f"Ошибка при загрузке {url}: {e}") threads = [] start_total_time = time.time() for url in urls: thread = threading.Thread(target=fetch_url, args=(url,)) threads.append(thread) thread.start() for thread in threads: thread.join() end_total_time = time.time() print(f"\nВсего времени на загрузку: {end_total_time - start_total_time:.2f} сек.")
Этот пример покажет, что общее время загрузки будет значительно меньше, чем сумма времени загрузки каждой страницы по отдельности, так как запросы выполняются конкурентно.
Что дальше?
Мы рассмотрели основы многопоточности в Python с использованием модуля threading, включая создание потоков, примитивы синхронизации и ограничения GIL. Для задач, требующих интенсивных вычислений и настоящего параллелизма, Python предлагает модуль multiprocessing, который мы изучим в следующей статье.
Какая основная проблема решается примитивами синхронизации в многопоточном программировании?