Взаимодействие между процессами в python

Главная > Блог > Взаимодействие между процессами в python
Так как у процессов нет общей памяти, очень важно организовывать доставку параметров и результатов выполнения разных типов для корректной работы программы.

Для обеспечения этой потребности существуют различные модели обмена данными. Но мы остановимся на одной из них – очереди.

Очередь – это структура данных с типом FIFO (First in, First out), первым пришел, первым ушел. Очереди позволяют обмениваться сообщениями между группами процессов.

Задача “producer-consumer” описывает два процесса: один является поставщиком данных или задач, т.е. является их производителем, а второй получает их и обрабатывает, т.е. потребляет и удаляет их из очереди. Совместно они используют общий буфер для обмена сообщениями – очередь.

Рассмотрим на примере:
from multiprocessing import Process, get_context
from multiprocessing.queues import Queue
import time


class Producer(Process):
     def __init__(self, queue: Queue):
          super().__init__()
          self.__queue = queue

     def run(self):
          for msg in range(5):
               self.__queue.put(msg)
               time.sleep(0.5)


class Consumer(Process):
     def __init__(self, queue: Queue, queue_result: Queue):
          super().__init__()
          self.__queue = queue
          self.__queue_result = queue_result

     def run(self):
          while True:
               if self.__queue.empty():
                     print('Queue is empty. Exit.')
                     break
               else:
                    item = self.__queue.get()
                    res = item ** 2
                    print(res)
                    self.__queue_result.put(res)
                    time.sleep(1)


if __name__ == '__main__':
     data = []
     context = get_context('spawn')
     queue = Queue(ctx=context)
     queue_result = Queue(ctx=context)

     producer_process = Producer(queue)
     consumer_process = Consumer(queue, queue_result)
     producer_process.start()
     consumer_process.start()
     producer_process.join()
     consumer_process.join()

     while not queue_result.empty():
         data.append(queue_result.get())
         print(data)
Сначала импортируем необходимые библиотеки. Затем создаем класс Producer, унаследованный от Process. В методе __init__ сохраняем очередь и переопределяем метод run. По умолчанию этот метод вызывает в дочернем процессе после инициализации. Если он не определен, то вызывается функция, переданная в параметре target. В текущей реализации, мы просто генерируем числа от 0 до 4 и кладем их в очередь queue.

Далее реализован класс Consumer, также от родительноского класса Process. В конструктор класса передаем уже две очереди. Одна, из которой будем вычитывать данные и вторая, в которую будем помещать результат расчетов. В методе run производим возведение в квадрат, перемещение результата в очередь результатов, а также проверка – пуста ли очередь. Если пуста, то завершаем работу консьюмера. Если этого не сделать, то выход из программы не произойдет и она будет работать постоянно.

Затем основная программа. Создаем экземпляры очередей и продьюсера с консьюмером. Стартуем оба процесса с помощью метода start() и указываем, что ждем завершения их работы методом join().

После завершения работы процессов обработки данных, вычитываем из очереди результатов значения queue_result в цикле while и выводим их на экран.

Результат работы программы представлен ниже.
0
1
4
9
16
Queue is empty. Exit.
[0, 1, 4, 9, 16]
Также разберем отдельно функцию get_context. Для каждой очереди необходимо указать контекст. Всего доступно три варианта:

  • ‘spawn’ (используется по умолчанию) – основной процесс запускает новый процесс интерпретатора python. Наиболее медленный метод из трех, но доступен на всех ОС
  • ‘fork’ – использует системную команду fork для создания дочерних процессов. Быстрее чем spawn, но не доступен на Windows.
  • ‘forkserver’ – задействуется специальный процесс-сервер, к которому обращается главный процесс программы, чтобы создать новый процесс. Ненужные ресурсы не копируется. Доступен на всех unix-based системах.

Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing. Примеры использования можно найти в нашей статье.