Потоки в Python: threading и concurrent.futures
Кратко: потоки (threading) ускоряют задачи, которые проводят много времени в ожидании ввода/вывода (I/O). Для простых сценариев достаточно threading, для удобства управления множеством задач — concurrent.futures.ThreadPoolExecutor. Для CPU-ограниченных задач лучше multiprocessing или asyncio в сочетании с I/O-оптимизацией. В статье — рабочие примеры, шаблоны для скачивания файлов, рекомендации по безопасности, тестам и отладке.

Время выполнения — один из простых индикаторов эффективности программы: чем быстрее, тем лучше. Потоки позволяют программе выполнять несколько задач одновременно и уменьшать общий простой во время ожидания операций ввода/вывода.
В этой статье вы узнаете, как использовать встроенный модуль threading и высокоуровневый concurrent.futures в Python, увидите примеры для реального сценария (скачивание изображений), разбор ошибок, подходы и чеклисты.
Почему потоки помогают
Потоки сокращают общее время выполнения, если работа состоит из независимых задач, которые большую часть времени проводят в ожидании (сетевые запросы, чтение/запись на диск и т. п.). Вместо того чтобы ждать завершения одной задачи, программа может запустить несколько задач параллельно и обрабатывать результаты по мере их готовности.
Пример: скачивание множества изображений из интернета. Если скачивать по одному файлу последовательно, общее время — сумма задержек. Если запускать несколько загрузок параллельно, общая задержка сокращается почти до максимальной задержки одной из задач (плюс накладные расходы).
Исходная последовательная программа (без потоков)
Ниже функция pause моделирует задачу: пауза на 1 секунду. Вызов функции дважды последовательно даёт суммарное время ~2 секунды.
import time
start_time = time.perf_counter()
def pause():
print('Sleeping 1 second...')
time.sleep(1)
print('Done Sleeping...')
pause()
pause()
finish_time = time.perf_counter()
print(f'Finished in {round(finish_time - start_time, 2)} second(s)')Ожидаемый вывод: программа выполнится примерно за 2.0 секунды: каждая пауза занимает 1 секунду.
Простейшая реализация с threading
Модуль threading подходит для «легких» случаев, когда нужно вручную создать несколько потоков. Нужно создать объекты Thread, запустить их методом start и дождаться завершения методом join.
import time
import threading
start_time = time.perf_counter()
def pause():
print('Sleeping 1 second...')
time.sleep(1)
print('Done Sleeping...')
thread_1 = threading.Thread(target=pause)
thread_2 = threading.Thread(target=pause)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
finish_time = time.perf_counter()
print(f'Finished in {round(finish_time - start_time, 2)} second(s)')Этот код выполнит две паузы параллельно, и общее время станет около 1 секунды (плюс небольшие накладные расходы).
concurrent.futures: удобный высокоуровневый API
С Python 3.2 появился concurrent.futures — удобный API для запуска задач в пуле потоков (или процессов). ThreadPoolExecutor автоматически управляет созданием, запуском и ожиданием потоков.
Пример с futures:
import time
import concurrent.futures
start_time = time.perf_counter()
def pause():
print('Sleeping 1 second...')
time.sleep(1)
return 'Done Sleeping...'
with concurrent.futures.ThreadPoolExecutor() as executor:
results = [executor.submit(pause) for _ in range(2)]
for f in concurrent.futures.as_completed(results):
print(f.result())
finish_time = time.perf_counter()
print(f'Finished in {round(finish_time - start_time, 2)} second(s)')Этот подход делает код чище: не нужно вручную вызывать join, можно использовать map или submit + as_completed для гибкого управления результатами.
Реальный сценарий: скачивание множества изображений
Последовательный вариант: создаём список URL и скачиваем каждое изображение в цикле. В примере использовалась библиотека requests.
Установка зависимостей:
pip install requestsПример последовательной загрузки (оригинал):
import requests
import time
img_urls = [
'https://images.unsplash.com/photo-1524429656589-6633a470097c',
'https://images.unsplash.com/photo-1530224264768-7ff8c1789d79',
'https://images.unsplash.com/photo-1564135624576-c5c88640f235',
'https://images.unsplash.com/photo-1541698444083-023c97d3f4b6',
'https://images.unsplash.com/photo-1522364723953-452d3431c267',
'https://images.unsplash.com/photo-1513938709626-033611b8cc03',
'https://images.unsplash.com/photo-1507143550189-fed454f93097',
'https://images.unsplash.com/photo-1493976040374-85c8e12f0c0e',
'https://images.unsplash.com/photo-1504198453319-5ce911bafcde',
'https://images.unsplash.com/photo-1530122037265-a5f1f91d3b99',
'https://images.unsplash.com/photo-1516972810927-80185027ca84',
'https://images.unsplash.com/photo-1550439062-609e1531270e',
]
start_time = time.perf_counter()
for img_url in img_urls:
img_bytes = requests.get(img_url).content
img_name = img_url.split('/')[3]
img_name = f'{img_name}.jpg'
with open(img_name, 'wb') as img_file:
img_file.write(img_bytes)
print(f'{img_name} was downloaded...')
finish_time = time.perf_counter()
print(f'Finished in {finish_time - start_time} seconds')В исходном примере на вашей машине это могло занять около 22 секунд (зависит от скорости сети).
Улучшение с ThreadPoolExecutor
Вариант с concurrent.futures значительно сокращает время за счёт параллельных сетевых запросов:
import requests
import time
import concurrent.futures
img_urls = [
'https://images.unsplash.com/photo-1524429656589-6633a470097c',
'https://images.unsplash.com/photo-1530224264768-7ff8c1789d79',
'https://images.unsplash.com/photo-1564135624576-c5c88640f235',
'https://images.unsplash.com/photo-1541698444083-023c97d3f4b6',
'https://images.unsplash.com/photo-1522364723953-452d3431c267',
'https://images.unsplash.com/photo-1513938709626-033611b8cc03',
'https://images.unsplash.com/photo-1507143550189-fed454f93097',
'https://images.unsplash.com/photo-1493976040374-85c8e12f0c0e',
'https://images.unsplash.com/photo-1504198453319-5ce911bafcde',
'https://images.unsplash.com/photo-1530122037265-a5f1f91d3b99',
'https://images.unsplash.com/photo-1516972810927-80185027ca84',
'https://images.unsplash.com/photo-1550439062-609e1531270e',
]
start_time = time.perf_counter()
def download_image(img_url):
img_bytes = requests.get(img_url).content
img_name = img_url.split('/')[3]
img_name = f'{img_name}.jpg'
with open(img_name, 'wb') as img_file:
img_file.write(img_bytes)
print(f'{img_name} was downloaded...')
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(download_image, img_urls)
finish_time = time.perf_counter()
print(f'Finished in {finish_time-start_time} seconds')В примере общее время уменьшилось примерно до 4 секунд — результат параллельных сетевых запросов.
Когда потоки не помогут
- CPU-ограниченные задачи: если ваша функция активно использует процессор (математика, шифрование, компрессия), глобальная блокировка интерпретатора (GIL) в CPython препятствует реальному параллелизму потоков — для таких задач лучше multiprocessing или C-расширения.
- Сложная синхронизация: при большом количестве совместно используемых ресурсов от блокировок и гонок выигрывает дизайн с очередями или процессами.
Важно: GIL — это механизм CPython, ограничивающий одновременное выполнение байт-кода Python несколькими потоками на одном процессе. Он не мешает преимуществам потоков для I/O.
Альтернативы и когда их использовать
- multiprocessing.Process / concurrent.futures.ProcessPoolExecutor — реальный параллелизм для CPU-ограниченных задач.
- asyncio — эффективен для большого количества лёгких I/O-операций (сокеты, HTTP), когда библиотека поддерживает async/await.
- gevent/uvloop — сторонние реализации, дающие конкурентность и низкую латентность в специфичных сценариях.
Выбор: если задача — массовые сетевые запросы и библиотека синхронная (requests), используйте ThreadPoolExecutor + requests.Session; если библиотека асинхронная (aiohttp), используйте asyncio.
Практические советы и улучшения кода
- Используйте requests.Session для переиспользования TCP-соединений (keep-alive) и повышения производительности:
import requests
session = requests.Session()
resp = session.get(url)- Установите разумный лимит max_workers в ThreadPoolExecutor: по умолчанию оно равно числу процессов, но для I/O можно указать больше (например, 10–32), ориентируясь на пропускную способность сети и нагрузку системы.
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
executor.map(download_image, img_urls)- Обработка ошибок: оборачивайте сетевые вызовы в try/except, используйте таймауты и повторные попытки (retry) с экспоненциальной задержкой.
import time
import requests
def safe_get(session, url, retries=3):
for attempt in range(retries):
try:
return session.get(url, timeout=10)
except requests.RequestException:
if attempt == retries - 1:
raise
time.sleep(2 ** attempt)Не записывайте файлы с неподготовленными именами: проверяйте пути, устраняйте коллизии, используйте os.path.join и безопасные имена.
Для большого числа URL используйте потокобезопасную очередь (queue.Queue) и пул потоков потребителей.
Паттерн: шаблон скачивания с сессией и обработкой ошибок
import concurrent.futures
import requests
import time
import os
IMG_DIR = 'images'
os.makedirs(IMG_DIR, exist_ok=True)
def download_image(session, img_url):
try:
resp = session.get(img_url, timeout=10)
resp.raise_for_status()
img_name = img_url.split('/')[3] + '.jpg'
path = os.path.join(IMG_DIR, img_name)
with open(path, 'wb') as f:
f.write(resp.content)
return f'{img_name} downloaded'
except Exception as e:
return f'Error downloading {img_url}: {e}'
start = time.perf_counter()
with requests.Session() as session:
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
futures = [executor.submit(download_image, session, url) for url in img_urls]
for f in concurrent.futures.as_completed(futures):
print(f.result())
end = time.perf_counter()
print(f'Total: {end - start} seconds')Ментальные модели и эвристики
- Если большая часть времени — ожидание сети/диска → попробуйте потоки.
- Если производительность CPU критична → multiprocessing.
- Если библиотека поддерживает async/await → asyncio часто эффективнее по памяти и масштабу.
- Начинайте с простого — потом замеряйте и профилируйте (time.perf_counter, cProfile).
Проверки и критерии приёмки
- Скрипт скачивает все URL и сохраняет файлы в указанную папку.
- Программа корректно обрабатывает несуществующие URL и сетевые ошибки (не падает полностью).
- Не наблюдается утечек файловых дескрипторов (все файлы закрыты).
- Время выполнения параллельного варианта существенно меньше последовательного для I/O-bound задач на вашей сети.
Чеклисты по ролям
Разработчик:
- Использует requests.Session.
- Добавляет таймауты и обработку исключений.
- Ограничивает количество потоков.
Операции/инфраструктура:
- Контролирует использование сети и диск I/O при массовых загрузках.
- Настраивает лимиты одновременных соединений (firewall, прокси).
QA/тестирование:
- Покрывает сценарии ошибки сети.
- Замеряет производительность на контрольном наборе URL.
Безопасность и конфиденциальность
- Не записывайте содержимое в общедоступные директории без проверки.
- Проверяйте и фильтруйте имена файлов, чтобы избежать уязвимостей типа directory traversal.
- При скачивании из ненадёжных источников проверяйте содержимое (тип MIME) и не исполняйте файлы.
- Если скачиваются персональные данные, учитывайте требования защиты данных и локальные законы (GDPR и т. п.).
Тесты и приёмка
Тесты:
- Мокируйте сетевые вызовы (например, с pytest + responses) и проверяйте, что функции корректно обрабатывают успешный и ошибочный ответ.
- Интеграционные тесты на контролируемом сервере: проверяйте, что загружено N файлов и суммарный размер совпадает с ожидаемым.
Критерии приёмки:
- Все критические URL загружены без необработанных исключений.
- Время выполнения уменьшено для I/O-bound на тестовом окружении.
Сравнение подходов (кратко)
- Последовательно: простота, но медленно для I/O.
- threading: ручное управление, подходит для небольшого числа потоков.
- concurrent.futures.ThreadPoolExecutor: удобен для большинства случаев, автоматическое управление пулом.
- multiprocessing / ProcessPoolExecutor: реальный параллелизм для CPU.
- asyncio + aiohttp: наилучше для очень большого числа одновременных лёгких I/O.
Решающее дерево (Mermaid)
flowchart TD
A[Начало: задача I/O или CPU?]
A -->|I/O-bound| B[Библиотека синхронная?]
B -->|Да| C[Использовать ThreadPoolExecutor или threading]
B -->|Нет, async| D[Использовать asyncio + aiohttp]
A -->|CPU-bound| E[Использовать multiprocessing / ProcessPoolExecutor]
C --> F[Тестировать, мониторить накладные расходы]
D --> F
E --> FЧасто задаваемые вопросы
Влияет ли GIL на сетевые операции?
GIL ограничивает параллельное выполнение байт-кода, но при I/O-вызовах потоки освобождают GIL во время ожидания, поэтому сетевые операции выигрывают от многопоточности.
Сколько потоков использовать?
Зависит от природы задач и ресурсов: для сетевых запросов часто принимают 10–50 потоков, но стоит подобрать значение эмпирически и следить за нагрузкой на сеть и систему.
Нужно ли использовать asyncio вместо потоков?
Если вы контролируете стек (или используете асинхронные библиотеки), asyncio масштабируется лучше по памяти при очень большом числе соединений. Для быстрого улучшения существующего синхронного кода потоки часто проще.
Резюме
- Потоки эффективно ускоряют I/O-bound задачи.
- Для простых сценариев подходит модуль threading; для управления многими задачами — concurrent.futures.ThreadPoolExecutor.
- Для CPU-ограниченных задач используйте multiprocessing, а для очень большого числа подключений — asyncio.
- Всегда обрабатывайте ошибки, используйте сессии для повторного использования соединений и тестируйте нагрузку.
Важно: начинайте с простых изменений, профилируйте и улучшайте архитектуру по мере необходимости — это поможет избежать преждевременной оптимизации и сложной синхронизации.
Похожие материалы
Как скрыть листы и рабочие книги в Excel
Объединить все почтовые адреса в один Gmail
Липкая навигация для сайта — инструкция
Как записывать несколько инструментов в GarageBand
Как установить шрифты Microsoft в Ubuntu