Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing.
Основными методами экземпляра класса Pool являются:
Рассмотрим несколько примеров.
Основными методами экземпляра класса Pool являются:
- apply – эквивалентен обычному вызову функции. Блокируется до момента получения результата и выполняется только в одном рабочем процессе пула. Синтаксис: apply(func, *args)
- map – синтаксис: map(func, iter, [chunksize]) разделяет все итеративные данные iter на указанное число фрагментов chunksize, которые поставляются пулу процессов в виде отдельных задач. Блокирует пул до тех пор, пока не получен конечный результат.
- imap – lazy версия map. Позволяет получать результаты по мере их завершения.
- apply_async – неблокирующий вызов apply. Синтаксис: apply_async(func, *args, [callback]). Результаты выполнения могут быть переданы в callback функцию для их дальнейшей обработки, либо можно получить результат с помощью метда get().
Рассмотрим несколько примеров.
import os
from multiprocessing import Pool, current_process
import time
import sys
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
logger.addHandler(handler)
def calc(value: int) -> int:
logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
time.sleep(1)
return value**10
if __name__ == '__main__':
logger.info(f'main pid: {os.getpid()}')
with Pool(processes=2) as pool:
result1 = pool.apply(calc, (10,))
logger.info('result1: %s', result1)
result2 = pool.apply(calc, (10,))
logger.info('result2: %s', result2)
result3 = pool.apply(calc, (10,))
logger.info('result3: %s', result3)
result4 = pool.apply(calc, (10,))
logger.info('result4: %s', result4)
print('End program')
from multiprocessing import Pool, current_process
import time
import sys
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
logger.addHandler(handler)
def calc(value: int) -> int:
logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
time.sleep(1)
return value**10
if __name__ == '__main__':
logger.info(f'main pid: {os.getpid()}')
with Pool(processes=2) as pool:
result1 = pool.apply(calc, (10,))
logger.info('result1: %s', result1)
result2 = pool.apply(calc, (10,))
logger.info('result2: %s', result2)
result3 = pool.apply(calc, (10,))
logger.info('result3: %s', result3)
result4 = pool.apply(calc, (10,))
logger.info('result4: %s', result4)
print('End program')
Пример вывода:
2023-01-25 18:21:40,027: main pid: 34098
2023-01-25 18:21:40,041: calc started in process ForkPoolWorker-1 with pid 34099
2023-01-25 18:21:41,044: result1: 10000000000
2023-01-25 18:21:41,047: calc started in process ForkPoolWorker-2 with pid 34100
2023-01-25 18:21:42,050: result2: 10000000000
2023-01-25 18:21:42,050: calc started in process ForkPoolWorker-1 with pid 34099
2023-01-25 18:21:43,052: result3: 10000000000
2023-01-25 18:21:43,053: calc started in process ForkPoolWorker-2 with pid 34100
2023-01-25 18:21:44,056: result4: 10000000000
End program
2023-01-25 18:21:40,041: calc started in process ForkPoolWorker-1 with pid 34099
2023-01-25 18:21:41,044: result1: 10000000000
2023-01-25 18:21:41,047: calc started in process ForkPoolWorker-2 with pid 34100
2023-01-25 18:21:42,050: result2: 10000000000
2023-01-25 18:21:42,050: calc started in process ForkPoolWorker-1 with pid 34099
2023-01-25 18:21:43,052: result3: 10000000000
2023-01-25 18:21:43,053: calc started in process ForkPoolWorker-2 with pid 34100
2023-01-25 18:21:44,056: result4: 10000000000
End program
Рассмотрим пример с apply. В начале скрипта происходит добавление необходимых импортов и базовая настройка логгера, для отображения времени. Определяем базовую функцию calc, которая будет возводить в 10 степень полученное число.
Затем в основном блоке с помощью контекстного менеджера with определяем пул процессов с максимальным числом равным двум. И начинаем вызывать функцию calc в отдельном процессе.
В консоли можно увидеть, что учавствовают два дочерних процесса с pid 34100 и 34099, но при этом параллельно они не работают, задержка между каждым выводом результат составляет одну секунду. Никакого выигрыша в производительность в данном случае нет.
Рассмотрим пример с функцией map:
Затем в основном блоке с помощью контекстного менеджера with определяем пул процессов с максимальным числом равным двум. И начинаем вызывать функцию calc в отдельном процессе.
В консоли можно увидеть, что учавствовают два дочерних процесса с pid 34100 и 34099, но при этом параллельно они не работают, задержка между каждым выводом результат составляет одну секунду. Никакого выигрыша в производительность в данном случае нет.
Рассмотрим пример с функцией map:
import os
from multiprocessing import Pool, current_process
import time
import sys
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
logger.addHandler(handler)
def calc(value: int) -> int:
logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
time.sleep(1)
return value**10
if __name__ == '__main__':
logger.info(f'main pid: {os.getpid()}')
l = [1, 2, 5, 10]
with Pool(processes=2) as pool:
results = pool.map(calc, l)
print(results)
print('End program')
from multiprocessing import Pool, current_process
import time
import sys
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
logger.addHandler(handler)
def calc(value: int) -> int:
logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
time.sleep(1)
return value**10
if __name__ == '__main__':
logger.info(f'main pid: {os.getpid()}')
l = [1, 2, 5, 10]
with Pool(processes=2) as pool:
results = pool.map(calc, l)
print(results)
print('End program')
Вывод в консоль:
2023-01-25 18:28:27,135: main pid: 34311
2023-01-25 18:28:27,148: calc started in process ForkPoolWorker-2 with pid 34313
2023-01-25 18:28:27,148: calc started in process ForkPoolWorker-1 with pid 34312
2023-01-25 18:28:28,150: calc started in process ForkPoolWorker-1 with pid 34312
2023-01-25 18:28:28,150: calc started in process ForkPoolWorker-2 with pid 34313
[1, 1024, 9765625, 10000000000]
End program
2023-01-25 18:28:27,148: calc started in process ForkPoolWorker-2 with pid 34313
2023-01-25 18:28:27,148: calc started in process ForkPoolWorker-1 with pid 34312
2023-01-25 18:28:28,150: calc started in process ForkPoolWorker-1 with pid 34312
2023-01-25 18:28:28,150: calc started in process ForkPoolWorker-2 with pid 34313
[1, 1024, 9765625, 10000000000]
End program
В основном коде программы заранее готовим список l с числами, для которых будем производить расчет и запускаем вызов функции calc с помощью метода map.
В консоли видим, что два дочерних процесса стартанули одновременно, затем еще два. Результат представлен в виде списка выходных данных функции calc. Также следует обратить внимание на то, что список упорядочен в соответвии со входными данными в списке l.
Попробуем вариант с использованием apply_async:
В консоли видим, что два дочерних процесса стартанули одновременно, затем еще два. Результат представлен в виде списка выходных данных функции calc. Также следует обратить внимание на то, что список упорядочен в соответвии со входными данными в списке l.
Попробуем вариант с использованием apply_async:
import os
from multiprocessing import Pool, current_process
import time
import sys
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
logger.addHandler(handler)
def calc(value: int) -> int:
logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
time.sleep(1)
return value**10
def callback(result):
logger.info(f'Callback result: {result}, process pid: {current_process().pid}')
if __name__ == '__main__':
logger.info(f'main pid: {os.getpid()}')
with Pool(processes=2) as pool:
result1 = pool.apply_async(calc, (1,), callback=callback)
logger.info('result1: %s', result1)
result2 = pool.apply_async(calc, (2,), callback=callback)
logger.info('result2: %s', result2)
result3 = pool.apply_async(calc, (3,), callback=callback)
logger.info('result3: %s', result3)
result4 = pool.apply_async(calc, (4,), callback=callback)
logger.info('result4: %s', result4)
pool.close()
pool.join()
print('End program')
from multiprocessing import Pool, current_process
import time
import sys
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
logger.addHandler(handler)
def calc(value: int) -> int:
logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
time.sleep(1)
return value**10
def callback(result):
logger.info(f'Callback result: {result}, process pid: {current_process().pid}')
if __name__ == '__main__':
logger.info(f'main pid: {os.getpid()}')
with Pool(processes=2) as pool:
result1 = pool.apply_async(calc, (1,), callback=callback)
logger.info('result1: %s', result1)
result2 = pool.apply_async(calc, (2,), callback=callback)
logger.info('result2: %s', result2)
result3 = pool.apply_async(calc, (3,), callback=callback)
logger.info('result3: %s', result3)
result4 = pool.apply_async(calc, (4,), callback=callback)
logger.info('result4: %s', result4)
pool.close()
pool.join()
print('End program')
По сравнению с предыдущими примерами, добавлена функция callback, которая будет вызываться при завершении процесса. В основном коде происходит последовательная передача чисел 1,2,3,4 в функцию calc, передается также callback для каждого вызова. В конце контекстного менеджера добавлены две инструкции close() и join(). сlose() предотвращает отправку новых задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся, а join() ожидает завершения рабочих процессов. Без указания этих команд, основная программа просто завершит свою работу, не дождавшись выполнения процессов и вызова callback функции.
Вывод в консоль:
Вывод в консоль:
2023-01-25 18:33:03,614: main pid: 34515
2023-01-25 18:33:03,625: result1: <multiprocessing.pool.ApplyResult object at 0x7fa55b754100>
2023-01-25 18:33:03,626: result2: <multiprocessing.pool.ApplyResult object at 0x7fa55b754130>
2023-01-25 18:33:03,626: result3: <multiprocessing.pool.ApplyResult object at 0x7fa55b754340>
2023-01-25 18:33:03,626: result4: <multiprocessing.pool.ApplyResult object at 0x7fa55b754460>
2023-01-25 18:33:03,626: calc started in process ForkPoolWorker-1 with pid 34516
2023-01-25 18:33:03,626: calc started in process ForkPoolWorker-2 with pid 34517
2023-01-25 18:33:04,628: calc started in process ForkPoolWorker-2 with pid 34517
2023-01-25 18:33:04,628: calc started in process ForkPoolWorker-1 with pid 34516
2023-01-25 18:33:04,628: Callback result: 1024, process pid: 34515
2023-01-25 18:33:04,628: Callback result: 1, process pid: 34515
2023-01-25 18:33:05,630: Callback result: 1048576, process pid: 34515
2023-01-25 18:33:05,630: Callback result: 59049, process pid: 34515
End program
2023-01-25 18:33:03,625: result1: <multiprocessing.pool.ApplyResult object at 0x7fa55b754100>
2023-01-25 18:33:03,626: result2: <multiprocessing.pool.ApplyResult object at 0x7fa55b754130>
2023-01-25 18:33:03,626: result3: <multiprocessing.pool.ApplyResult object at 0x7fa55b754340>
2023-01-25 18:33:03,626: result4: <multiprocessing.pool.ApplyResult object at 0x7fa55b754460>
2023-01-25 18:33:03,626: calc started in process ForkPoolWorker-1 with pid 34516
2023-01-25 18:33:03,626: calc started in process ForkPoolWorker-2 with pid 34517
2023-01-25 18:33:04,628: calc started in process ForkPoolWorker-2 with pid 34517
2023-01-25 18:33:04,628: calc started in process ForkPoolWorker-1 with pid 34516
2023-01-25 18:33:04,628: Callback result: 1024, process pid: 34515
2023-01-25 18:33:04,628: Callback result: 1, process pid: 34515
2023-01-25 18:33:05,630: Callback result: 1048576, process pid: 34515
2023-01-25 18:33:05,630: Callback result: 59049, process pid: 34515
End program
Здесь мы видим, что все задачи для процессов были созданы одновременно. Функция apply_async вернула объект класса multiprocessing.pool.ApplyResult. Затем стартанули первые два процесса с pid 34516 и 34517, а затем вторые два.
После этого происходил вызов callback функции в основном процессе с pid 34515, но результаты перемешались. Сначала вывод для инструкции, которая была второй, затем вывод результата для первой инструкции и т.д. apply_async не гарантирует корректной последовательно получения результата, в отличае от map функции.
После этого происходил вызов callback функции в основном процессе с pid 34515, но результаты перемешались. Сначала вывод для инструкции, которая была второй, затем вывод результата для первой инструкции и т.д. apply_async не гарантирует корректной последовательно получения результата, в отличае от map функции.