Так как у процессов нет общей памяти, очень важно организовывать доставку параметров и результатов выполнения разных типов для корректной работы программы.
Для обеспечения этой потребности существуют различные модели обмена данными. Но мы остановимся на одной из них – очереди.
Очередь – это структура данных с типом FIFO (First in, First out), первым пришел, первым ушел. Очереди позволяют обмениваться сообщениями между группами процессов.
Задача “producer-consumer” описывает два процесса: один является поставщиком данных или задач, т.е. является их производителем, а второй получает их и обрабатывает, т.е. потребляет и удаляет их из очереди. Совместно они используют общий буфер для обмена сообщениями – очередь.
Рассмотрим на примере:
Для обеспечения этой потребности существуют различные модели обмена данными. Но мы остановимся на одной из них – очереди.
Очередь – это структура данных с типом FIFO (First in, First out), первым пришел, первым ушел. Очереди позволяют обмениваться сообщениями между группами процессов.
Задача “producer-consumer” описывает два процесса: один является поставщиком данных или задач, т.е. является их производителем, а второй получает их и обрабатывает, т.е. потребляет и удаляет их из очереди. Совместно они используют общий буфер для обмена сообщениями – очередь.
Рассмотрим на примере:
from multiprocessing import Process, get_context
from multiprocessing.queues import Queue
import time
class Producer(Process):
def __init__(self, queue: Queue):
super().__init__()
self.__queue = queue
def run(self):
for msg in range(5):
self.__queue.put(msg)
time.sleep(0.5)
class Consumer(Process):
def __init__(self, queue: Queue, queue_result: Queue):
super().__init__()
self.__queue = queue
self.__queue_result = queue_result
def run(self):
while True:
if self.__queue.empty():
print('Queue is empty. Exit.')
break
else:
item = self.__queue.get()
res = item ** 2
print(res)
self.__queue_result.put(res)
time.sleep(1)
if __name__ == '__main__':
data = []
context = get_context('spawn')
queue = Queue(ctx=context)
queue_result = Queue(ctx=context)
producer_process = Producer(queue)
consumer_process = Consumer(queue, queue_result)
producer_process.start()
consumer_process.start()
producer_process.join()
consumer_process.join()
while not queue_result.empty():
data.append(queue_result.get())
print(data)
from multiprocessing.queues import Queue
import time
class Producer(Process):
def __init__(self, queue: Queue):
super().__init__()
self.__queue = queue
def run(self):
for msg in range(5):
self.__queue.put(msg)
time.sleep(0.5)
class Consumer(Process):
def __init__(self, queue: Queue, queue_result: Queue):
super().__init__()
self.__queue = queue
self.__queue_result = queue_result
def run(self):
while True:
if self.__queue.empty():
print('Queue is empty. Exit.')
break
else:
item = self.__queue.get()
res = item ** 2
print(res)
self.__queue_result.put(res)
time.sleep(1)
if __name__ == '__main__':
data = []
context = get_context('spawn')
queue = Queue(ctx=context)
queue_result = Queue(ctx=context)
producer_process = Producer(queue)
consumer_process = Consumer(queue, queue_result)
producer_process.start()
consumer_process.start()
producer_process.join()
consumer_process.join()
while not queue_result.empty():
data.append(queue_result.get())
print(data)
Сначала импортируем необходимые библиотеки. Затем создаем класс Producer, унаследованный от Process. В методе __init__ сохраняем очередь и переопределяем метод run. По умолчанию этот метод вызывает в дочернем процессе после инициализации. Если он не определен, то вызывается функция, переданная в параметре target. В текущей реализации, мы просто генерируем числа от 0 до 4 и кладем их в очередь queue.
Далее реализован класс Consumer, также от родительноского класса Process. В конструктор класса передаем уже две очереди. Одна, из которой будем вычитывать данные и вторая, в которую будем помещать результат расчетов. В методе run производим возведение в квадрат, перемещение результата в очередь результатов, а также проверка – пуста ли очередь. Если пуста, то завершаем работу консьюмера. Если этого не сделать, то выход из программы не произойдет и она будет работать постоянно.
Затем основная программа. Создаем экземпляры очередей и продьюсера с консьюмером. Стартуем оба процесса с помощью метода start() и указываем, что ждем завершения их работы методом join().
После завершения работы процессов обработки данных, вычитываем из очереди результатов значения queue_result в цикле while и выводим их на экран.
Результат работы программы представлен ниже.
Далее реализован класс Consumer, также от родительноского класса Process. В конструктор класса передаем уже две очереди. Одна, из которой будем вычитывать данные и вторая, в которую будем помещать результат расчетов. В методе run производим возведение в квадрат, перемещение результата в очередь результатов, а также проверка – пуста ли очередь. Если пуста, то завершаем работу консьюмера. Если этого не сделать, то выход из программы не произойдет и она будет работать постоянно.
Затем основная программа. Создаем экземпляры очередей и продьюсера с консьюмером. Стартуем оба процесса с помощью метода start() и указываем, что ждем завершения их работы методом join().
После завершения работы процессов обработки данных, вычитываем из очереди результатов значения queue_result в цикле while и выводим их на экран.
Результат работы программы представлен ниже.
0
1
4
9
16
Queue is empty. Exit.
[0, 1, 4, 9, 16]
1
4
9
16
Queue is empty. Exit.
[0, 1, 4, 9, 16]
Также разберем отдельно функцию get_context. Для каждой очереди необходимо указать контекст. Всего доступно три варианта:
Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing. Примеры использования можно найти в нашей статье.
- ‘spawn’ (используется по умолчанию) – основной процесс запускает новый процесс интерпретатора python. Наиболее медленный метод из трех, но доступен на всех ОС
- ‘fork’ – использует системную команду fork для создания дочерних процессов. Быстрее чем spawn, но не доступен на Windows.
- ‘forkserver’ – задействуется специальный процесс-сервер, к которому обращается главный процесс программы, чтобы создать новый процесс. Ненужные ресурсы не копируется. Доступен на всех unix-based системах.
Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing. Примеры использования можно найти в нашей статье.