ThreadPoolExecutor. Синхронизация потоков в python

Главная > Блог > ThreadPoolExecutor. Синхронизация потоков в python

Класс ThreadPoolExecutor

Часто будет требоваться запуск нескольких потоков одновременно, например для обработки списка из нескольких файлов, или запросов на несколько url. Библиотека concurrent.future предоставляет класс ThreadPoolExecutor для упрощения обработки параллельных потоков.

Обычно экземпляр ThreadPoolExecutor создается с контекстным менеджером with, чтобы определять исполняемые блоки и очищать потоки после выполнения.

Основные методы класса submit и map.

submit позволяет создавать поток для одной функции с переданными в нее параметрами. Синтаксис: submit(func: Callable, [*args, **kwargs]).

Map позволяет запускать несколько потоков для указанного callable объекта, и массива входных параметров для него. Синтаксис: map(func, *iterables, timeout=None, [chunksize])

Рассмотрим на примере:
from concurrent.futures import ThreadPoolExecutor
import time
from threading import current_thread


def sum_len(data: list[int]) -> int:
      return sum(data)


def foo(text: str) -> int:
      time.sleep(len(text))
      print('[foo] Current thread:', current_thread().name)
      print('msg:', text)
      return len(text)


fruit_list = ['apple', 'banana', 'pineapple']

with ThreadPoolExecutor(max_workers=3) as pool:
      print('[main] Current thread:', current_thread().name)
      results = pool.map(foo, fruit_list)
      print('results type:', type(results))

      result = pool.submit(sum_len, results)
      print('result type:', type(result))

      print('Waiting for thread end...')

      print('result:', result.result())
      print('Threads finished')
Вывод в консоль:
[main] Current thread: MainThread
results type: <class 'generator'>
result type: <class 'concurrent.futures._base.Future'>
Waiting for thread end...
[foo] Current thread: ThreadPoolExecutor-0_0
msg: apple
[foo] Current thread: ThreadPoolExecutor-0_1
msg: banana
[foo] Current thread: ThreadPoolExecutor-0_2
msg: pineapple
result: 20
Threads finished
Данный код позволяет получать для списка длину строковых значений и затем находить их сумму. Для начала необходимые импорты и определяем две функции: foo, которая возвращает длину строки и sum_len, которая возвращает сумму длин элементов списка. Определяем список fruit_list для тестов.

Затем, с помощью контекстного менеджера with, содаем экземпляр пула потоков. В качестве параметра указываем максимальное количество воркеров с помощью параметра max_workers.

С помощью инструкции pool.map указываем, что нам нужно выполнить функцию foo для каждого элемента списка fruit_list в отдельном потоке и записать результат в переменную results. Обратите внимание на тип переменной results – это генератор.

Инструкция pool.submit указывает на то, что мы должны применить к результату выполнения предыдущих потоков функцию sum_len и записать полученное значение в переменную result. Типом переменной result будет concurrent.futures._base.Future (или как его еще называют “футура”). Футура – это ожидаемый объект. Чтобы получить его вычисленный результат, можно использовать его метод .result().

Заключение

Класс ThreadPoolExecutor удобно использовать когда требуется много параллельных обработок IO bound операций.