=====Multiprocessing===== Модуль **multiprocessing** был добавлен в Python версии 2.6. Изначально он был определен в PEP 371 Джесси Ноллером и Ричардом Одкерком. Модуль multiprocessing позволяет вам создавать процессы таким же образом, как при создании потоков при помощи модуля threading. Суть в том, что, в связи с тем, что мы теперь создаем процессы, вы можете обойти GIL (Global Interpreter Lock) и воспользоваться возможностью использования нескольких процессоров на компьютере. Пакет multiprocessing также включает ряд API, которых вообще нет в модуле threading. Например, есть очень удобный **класс Pool**, который вы можете использовать для параллельного выполнения функции между несколькими входами. Мы рассмотрим Pool немного позже. Мы начнем с класса **Process** модуля multiprocessing. ====Класс Process==== Класс Process очень похож на класс Thread модуля threading. Давайте попробуем создать несколько процессов, которые вызывают одну и ту же функцию, и посмотрим, как это сработает import os from multiprocessing import Process def doubler(number): """ Функция умножитель на два """ result = number * 2 proc = os.getpid() print('{0} doubled to {1} by process id: {2}'.format( number, result, proc)) if __name__ == '__main__': numbers = [5, 10, 15, 20, 25] procs = [] for index, number in enumerate(numbers): proc = Process(target=doubler, args=(number,)) procs.append(proc) proc.start() for proc in procs: proc.join() Для этого примера мы импортируем **Process** и создаем функцию **doubler**. Внутри функции, мы дублируем число, которое мы ей передали. Мы также используем модуль os, чтобы получить **ID нынешнего процесса**. Это скажет нам, какой именно процесс вызывает функцию. Далее, в нижнем блоке кода, мы создаем несколько Процессов и начинаем их. Самый последний цикл только вызывает метод **join()** для каждого из процессов, что говорит Python подождать, пока процесс завершится. Если вам нужно остановить процесс, вы можете вызвать метод **terminate()**. Когда вы запустите этот код, вы получите выдачу, на подобие этой: 5 doubled to 10 by process id: 10468 10 doubled to 20 by process id: 10469 15 doubled to 30 by process id: 10470 20 doubled to 40 by process id: 10471 25 doubled to 50 by process id: 10472 Все же иногда приятно иметь читабельное название процессов. К счастью, класс **Process** дает возможность вам получить доступ к названию вашего процесса. Давайте посмотрим: import os from multiprocessing import Process, current_process def doubler(number): result = number * 2 proc_name = current_process().name print('{0} doubled to {1} by: {2}'.format( number, result, proc_name)) if __name__ == '__main__': numbers = [5, 10, 15, 20, 25] procs = [] proc = Process(target=doubler, args=(5,)) for index, number in enumerate(numbers): proc = Process(target=doubler, args=(number,)) procs.append(proc) proc.start() proc = Process(target=doubler, name='Test', args=(2,)) proc.start() procs.append(proc) for proc in procs: proc.join() На этот раз мы импортируем кое-что дополнительно: **current_process**. Это примерно то же самое, что и **current_thread** модуля threading. Мы используем его для того, чтобы получить имя потока, который вызывает нашу функцию. Обратите внимание на то, что мы не указывали название первых пяти процессов. И только шестой мы назвали Test. Давайте посмотрим, какую выдачу мы получим: 5 doubled to 10 by: Process-2 10 doubled to 20 by: Process-3 15 doubled to 30 by: Process-4 20 doubled to 40 by: Process-5 25 doubled to 50 by: Process-6 2 doubled to 4 by: Test Выдача показывает, что модуль multiprocessing назначает номер каждому процессу, как часть его названия по умолчанию. Конечно, когда мы лично определяем название, модуль не будет добавлять число к нашему названию ====Замки (Locks)==== Модуль **multiprocessing** поддерживает замки так же, как и модуль threading. Все что вам нужно, это импортировать Lock, повесить его, сделать что-нибудь и снять его. Давайте посмотрим: from multiprocessing import Process, Lock def printer(item, lock): """ Выводим то что передали """ lock.acquire() try: print(item) finally: lock.release() if __name__ == '__main__': lock = Lock() items = ['tango', 'foxtrot', 10] for item in items: p = Process(target=printer, args=(item, lock)) p.start() Здесь мы создали простую функцию вывода, которая выводит все, что вы ей передаете. Чтобы не дать процессам **конфликтовать друг с другом**, мы используем объект **Lock**. Этот код зациклится над нашим списком трех объектов и создаст процесс для каждого из них. Каждый процесс будет вызывать нашу функцию, и передавать её одному из объектов. Так как мы используем замки, следующий процесс в строке **будет ждать, пока замок не снимается**, после чего он сможет продолжить. ====Логирование (Logging)==== Логирование процессов немного отличается от **логирования потоков**. Причина в том, что пакет logging не использует замки, предназначенные для процессов, так что в итоге вы можете получить результат, который состоит из кучи перемешанных между собой процессов. Давайте попробуем добавить базовый логгинг к предыдущему примеру. Вот код: import logging import multiprocessing from multiprocessing import Process, Lock def printer(item, lock): """ Выводим то что передали """ lock.acquire() try: print(item) finally: lock.release() if __name__ == '__main__': lock = Lock() items = ['tango', 'foxtrot', 10] multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) for item in items: p = Process(target=printer, args=(item, lock)) p.start() Простейший способ вести журнал, это отправить все на **stderr**. Мы можем сделать это, вызвав функцию **log_to_stderr()**. Далее мы вызываем функцию **get_logger** для получения доступа к логгеру и настраиваем его уровень логгинга на INFO. Остальная часть кода остается такой же, какой и была. Обратите внимание на то, что я не вызываю метод **join()** здесь. Вместо этого, поток parent (другими словами, ваш скрипт) вызовет join() лично. Когда вы сделаете это, вы получите что-то на подобие: [INFO/Process-1] child process calling self.run() tango [INFO/Process-1] process shutting down [INFO/Process-1] process exiting with exitcode 0 [INFO/Process-2] child process calling self.run() [INFO/MainProcess] process shutting down foxtrot [INFO/Process-2] process shutting down [INFO/Process-3] child process calling self.run() [INFO/Process-2] process exiting with exitcode 0 10 [INFO/MainProcess] calling join() for process Process-3 [INFO/Process-3] process shutting down [INFO/Process-3] process exiting with exitcode 0 [INFO/MainProcess] calling join() for process Process-2 Давайте пойдем дальше, и рассмотрим **класс Pool** поближе ====Класс Pool==== Класс Pool используется для показа пула рабочих процессов. Он включает в себя методы, которые позволяют вам разгружать задачи к рабочим процессам. Давайте посмотрим на простейший пример: from multiprocessing import Pool def doubler(number): return number * 2 if __name__ == '__main__': numbers = [5, 10, 20] pool = Pool(processes=3) print(pool.map(doubler, numbers)) Здесь мы создали экземпляр **Pool** и указали ему создать три рабочих процесса. Далее мы используем метод **map** для отображения функции для каждого процесса. Наконец мы выводим результат, что в нашем случае является списком: [10, 20, 40]. Вы также можете получить результат вашего процесса в пуле, используя метод **apply_async**: from multiprocessing import Pool def doubler(number): return number * 2 if __name__ == '__main__': pool = Pool(processes=3) result = pool.apply_async(doubler, (25,)) print(result.get(timeout=1)) Так мы можем запросить результат процесса. В этом суть работы функции **get**. Она пытается получить наши результаты. Обратите внимание на то, что мы также настроили обратный отсчет, на тот случай, если что-нибудь произойдет с вызываемой нами функцией. Мы не хотим, чтобы она была заблокирована. ====Связь между процессами==== Когда речь заходит о **связи между процессами**, модули нашего multiprocessing включают в себя два главных метода: Queue и Pipe. Работа Queue защищена как от процессов, так и от потоков. Давайте взглянем на достаточно простой пример: from multiprocessing import Process, Queue sentinel = -1 def creator(data, q): """ Creates data to be consumed and waits for the consumer to finish processing """ print('Creating data and putting it on the queue') for item in data: q.put(item) def my_consumer(q): """ Consumes some data and works on it In this case, all it does is double the input """ while True: data = q.get() print('data found to be processed: {}'.format(data)) processed = data * 2 print(processed) if data is sentinel: break if __name__ == '__main__': q = Queue() data = [5, 10, 13, -1] process_one = Process(target=creator, args=(data, q)) process_two = Process(target=my_consumer, args=(q,)) process_one.start() process_two.start() q.close() q.join_thread() process_one.join() process_two.join() Здесь нам только и нужно, что импортировать **Process и Queue**. Далее мы создаем две функции, одна для создания данных и добавления их в очередь, и вторая для использования данных и обработки их. Добавление данных в Queue выполняется при помощи метода put(), в то время как получение данных из Queue выполняется через метод get. Последний кусок кода только создает объект Queue и несколько экземпляров Process, после чего возвращает их. Обратите внимание на то, что мы вызываем join() в наших объектах process больше, чем Queue. ====Подведем итоги==== Здесь мы прошли через достаточно большое количество материала. Вы узнали много чего нового о модуле **multiprocessing** для направления обычных функций, связи между процессами при помощи Queue, наименований потоков и многого другого. Разумеется, в документации Python предоставлено намного больше развернутой информации, которую я даже не начинал затрагивать в данной статье, так что настоятельно рекомендую с ней ознакомиться. Тем не менее, вы все-таки узнали много чего о том, как усилить мощность обработки вашего компьютера при помощи Python!