=====Multiprocessing=====
Модуль **multiprocessing** был добавлен в Python версии 2.6. Изначально он был определен в PEP 371 Джесси Ноллером и Ричардом Одкерком. Модуль multiprocessing позволяет вам создавать процессы таким же образом, как при создании потоков при помощи модуля threading. Суть в том, что, в связи с тем, что мы теперь создаем процессы, вы можете обойти GIL (Global Interpreter Lock) и воспользоваться возможностью использования нескольких процессоров на компьютере. Пакет multiprocessing также включает ряд API, которых вообще нет в модуле threading. Например, есть очень удобный **класс Pool**, который вы можете использовать для параллельного выполнения функции между несколькими входами. Мы рассмотрим Pool немного позже. Мы начнем с класса **Process** модуля multiprocessing.
====Класс Process====
Класс Process очень похож на класс Thread модуля threading. Давайте попробуем создать несколько процессов, которые вызывают одну и ту же функцию, и посмотрим, как это сработает
import os
from multiprocessing import Process
def doubler(number):
"""
Функция умножитель на два
"""
result = number * 2
proc = os.getpid()
print('{0} doubled to {1} by process id: {2}'.format(
number, result, proc))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
for index, number in enumerate(numbers):
proc = Process(target=doubler, args=(number,))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
Для этого примера мы импортируем **Process** и создаем функцию **doubler**. Внутри функции, мы дублируем число, которое мы ей передали. Мы также используем модуль os, чтобы получить **ID нынешнего процесса**. Это скажет нам, какой именно процесс вызывает функцию. Далее, в нижнем блоке кода, мы создаем несколько Процессов и начинаем их.
Самый последний цикл только вызывает метод **join()** для каждого из процессов, что говорит Python подождать, пока процесс завершится. Если вам нужно остановить процесс, вы можете вызвать метод **terminate()**. Когда вы запустите этот код, вы получите выдачу, на подобие этой:
5 doubled to 10 by process id: 10468
10 doubled to 20 by process id: 10469
15 doubled to 30 by process id: 10470
20 doubled to 40 by process id: 10471
25 doubled to 50 by process id: 10472
Все же иногда приятно иметь читабельное название процессов. К счастью, класс **Process** дает возможность вам получить доступ к названию вашего процесса. Давайте посмотрим:
import os
from multiprocessing import Process, current_process
def doubler(number):
result = number * 2
proc_name = current_process().name
print('{0} doubled to {1} by: {2}'.format(
number, result, proc_name))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
proc = Process(target=doubler, args=(5,))
for index, number in enumerate(numbers):
proc = Process(target=doubler, args=(number,))
procs.append(proc)
proc.start()
proc = Process(target=doubler, name='Test', args=(2,))
proc.start()
procs.append(proc)
for proc in procs:
proc.join()
На этот раз мы импортируем кое-что дополнительно: **current_process**. Это примерно то же самое, что и **current_thread** модуля threading. Мы используем его для того, чтобы получить имя потока, который вызывает нашу функцию. Обратите внимание на то, что мы не указывали название первых пяти процессов. И только шестой мы назвали Test. Давайте посмотрим, какую выдачу мы получим:
5 doubled to 10 by: Process-2
10 doubled to 20 by: Process-3
15 doubled to 30 by: Process-4
20 doubled to 40 by: Process-5
25 doubled to 50 by: Process-6
2 doubled to 4 by: Test
Выдача показывает, что модуль multiprocessing назначает номер каждому процессу, как часть его названия по умолчанию. Конечно, когда мы лично определяем название, модуль не будет добавлять число к нашему названию
====Замки (Locks)====
Модуль **multiprocessing** поддерживает замки так же, как и модуль threading. Все что вам нужно, это импортировать Lock, повесить его, сделать что-нибудь и снять его. Давайте посмотрим:
from multiprocessing import Process, Lock
def printer(item, lock):
"""
Выводим то что передали
"""
lock.acquire()
try:
print(item)
finally:
lock.release()
if __name__ == '__main__':
lock = Lock()
items = ['tango', 'foxtrot', 10]
for item in items:
p = Process(target=printer, args=(item, lock))
p.start()
Здесь мы создали простую функцию вывода, которая выводит все, что вы ей передаете. Чтобы не дать процессам **конфликтовать друг с другом**, мы используем объект **Lock**. Этот код зациклится над нашим списком трех объектов и создаст процесс для каждого из них. Каждый процесс будет вызывать нашу функцию, и передавать её одному из объектов. Так как мы используем замки, следующий процесс в строке **будет ждать, пока замок не снимается**, после чего он сможет продолжить.
====Логирование (Logging)====
Логирование процессов немного отличается от **логирования потоков**. Причина в том, что пакет logging не использует замки, предназначенные для процессов, так что в итоге вы можете получить результат, который состоит из кучи перемешанных между собой процессов. Давайте попробуем добавить базовый логгинг к предыдущему примеру. Вот код:
import logging
import multiprocessing
from multiprocessing import Process, Lock
def printer(item, lock):
"""
Выводим то что передали
"""
lock.acquire()
try:
print(item)
finally:
lock.release()
if __name__ == '__main__':
lock = Lock()
items = ['tango', 'foxtrot', 10]
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
for item in items:
p = Process(target=printer, args=(item, lock))
p.start()
Простейший способ вести журнал, это отправить все на **stderr**. Мы можем сделать это, вызвав функцию **log_to_stderr()**. Далее мы вызываем функцию **get_logger** для получения доступа к логгеру и настраиваем его уровень логгинга на INFO. Остальная часть кода остается такой же, какой и была. Обратите внимание на то, что я не вызываю метод **join()** здесь. Вместо этого, поток parent (другими словами, ваш скрипт) вызовет join() лично. Когда вы сделаете это, вы получите что-то на подобие:
[INFO/Process-1] child process calling self.run()
tango
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/MainProcess] process shutting down
foxtrot
[INFO/Process-2] process shutting down
[INFO/Process-3] child process calling self.run()
[INFO/Process-2] process exiting with exitcode 0
10
[INFO/MainProcess] calling join() for process Process-3
[INFO/Process-3] process shutting down
[INFO/Process-3] process exiting with exitcode 0
[INFO/MainProcess] calling join() for process Process-2
Давайте пойдем дальше, и рассмотрим **класс Pool** поближе
====Класс Pool====
Класс Pool используется для показа пула рабочих процессов. Он включает в себя методы, которые позволяют вам разгружать задачи к рабочим процессам. Давайте посмотрим на простейший пример:
from multiprocessing import Pool
def doubler(number):
return number * 2
if __name__ == '__main__':
numbers = [5, 10, 20]
pool = Pool(processes=3)
print(pool.map(doubler, numbers))
Здесь мы создали экземпляр **Pool** и указали ему создать три рабочих процесса. Далее мы используем метод **map** для отображения функции для каждого процесса. Наконец мы выводим результат, что в нашем случае является списком: [10, 20, 40]. Вы также можете получить результат вашего процесса в пуле, используя метод **apply_async**:
from multiprocessing import Pool
def doubler(number):
return number * 2
if __name__ == '__main__':
pool = Pool(processes=3)
result = pool.apply_async(doubler, (25,))
print(result.get(timeout=1))
Так мы можем запросить результат процесса. В этом суть работы функции **get**. Она пытается получить наши результаты. Обратите внимание на то, что мы также настроили обратный отсчет, на тот случай, если что-нибудь произойдет с вызываемой нами функцией. Мы не хотим, чтобы она была заблокирована.
====Связь между процессами====
Когда речь заходит о **связи между процессами**, модули нашего multiprocessing включают в себя два главных метода: Queue и Pipe. Работа Queue защищена как от процессов, так и от потоков. Давайте взглянем на достаточно простой пример:
from multiprocessing import Process, Queue
sentinel = -1
def creator(data, q):
"""
Creates data to be consumed and waits for the consumer
to finish processing
"""
print('Creating data and putting it on the queue')
for item in data:
q.put(item)
def my_consumer(q):
"""
Consumes some data and works on it
In this case, all it does is double the input
"""
while True:
data = q.get()
print('data found to be processed: {}'.format(data))
processed = data * 2
print(processed)
if data is sentinel:
break
if __name__ == '__main__':
q = Queue()
data = [5, 10, 13, -1]
process_one = Process(target=creator, args=(data, q))
process_two = Process(target=my_consumer, args=(q,))
process_one.start()
process_two.start()
q.close()
q.join_thread()
process_one.join()
process_two.join()
Здесь нам только и нужно, что импортировать **Process и Queue**. Далее мы создаем две функции, одна для создания данных и добавления их в очередь, и вторая для использования данных и обработки их. Добавление данных в Queue выполняется при помощи метода put(), в то время как получение данных из Queue выполняется через метод get. Последний кусок кода только создает объект Queue и несколько экземпляров Process, после чего возвращает их. Обратите внимание на то, что мы вызываем join() в наших объектах process больше, чем Queue.
====Подведем итоги====
Здесь мы прошли через достаточно большое количество материала. Вы узнали много чего нового о модуле **multiprocessing** для направления обычных функций, связи между процессами при помощи Queue, наименований потоков и многого другого. Разумеется, в документации Python предоставлено намного больше развернутой информации, которую я даже не начинал затрагивать в данной статье, так что настоятельно рекомендую с ней ознакомиться. Тем не менее, вы все-таки узнали много чего о том, как усилить мощность обработки вашего компьютера при помощи Python!