Инструменты пользователя

Инструменты сайта


php:rabbitmq:amqplib

Различия

Здесь показаны различия между двумя версиями данной страницы.

Ссылка на это сравнение

Следующая версия
Предыдущая версия
php:rabbitmq:amqplib [2023/08/31 16:45]
werwolf создано
php:rabbitmq:amqplib [2023/08/31 16:50] (текущий)
werwolf
Строка 1: Строка 1:
-20 Декабря 2014 +===== сервер очередей RabbitMQ ​=====
- +
-[[:​categories:​php:​|php]] +
- +
-сервер очередей\\ RabbitMQ\\ +
  
 Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи,​ которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей -- избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь,​ а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед,​ скажу, что возможен режим работы очередей,​ когда при наличии нескольких копий обработчика,​ следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач. Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи,​ которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей -- избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь,​ а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед,​ скажу, что возможен режим работы очередей,​ когда при наличии нескольких копий обработчика,​ следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач.
Строка 27: Строка 23:
 Добавим следующию строку в файл /​etc/​apt/​sources.list Добавим следующию строку в файл /​etc/​apt/​sources.list
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 deb http://​www.rabbitmq.com/​debian/​ testing main deb http://​www.rabbitmq.com/​debian/​ testing main
-</​code>​|+</​code>​
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 wget http://​www.rabbitmq.com/​rabbitmq-signing-key-public.asc wget http://​www.rabbitmq.com/​rabbitmq-signing-key-public.asc
 sudo apt-key add rabbitmq-signing-key-public.asc sudo apt-key add rabbitmq-signing-key-public.asc
 sudo apt-get update sudo apt-get update
 sudo apt-get install rabbitmq-server sudo apt-get install rabbitmq-server
-</​code>​|+</​code>​
  
 === Установка клиента === === Установка клиента ===
Строка 63: Строка 52:
 Т.е. схема работы следующая:​ Первое,​ что надо сделать,​ это установить соединение с сервером RabbitMQ. Соединение устанавливается командами Т.е. схема работы следующая:​ Первое,​ что надо сделать,​ это установить соединение с сервером RabbitMQ. Соединение устанавливается командами
  
-|<​code>​ +<​code ​php>
-+
-+
-</​code>​|<​code>​+
 $connection = new AMQPConnection($connection_params);​ $connection = new AMQPConnection($connection_params);​
 $connection->​connect();​ $connection->​connect();​
-</​code>​|+</​code>​
  
 где где
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 $connection_params = array( $connection_params = array(
   '​host'​ => '​localhost',​   '​host'​ => '​localhost',​
Строка 89: Строка 67:
   '​password'​ => '​guest'​   '​password'​ => '​guest'​
 ); );
-</​code>​|+</​code>​
  
 это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров,​ то его можно опустить. И, напротив,​ если, к примеру,​ нужно подключиться к другой машине,​ в параметре host необходимо указать ее имя или ip адрес. это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров,​ то его можно опустить. И, напротив,​ если, к примеру,​ нужно подключиться к другой машине,​ в параметре host необходимо указать ее имя или ip адрес.
Строка 95: Строка 73:
 Используя коннект можно получить объект для канала Используя коннект можно получить объект для канала
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $channel = new AMQPChannel($connection);​ $channel = new AMQPChannel($connection);​
-</​code>​|+</​code>​
  
 На основе полученного канала создаем обменник На основе полученного канала создаем обменник
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 $exchange = new AMQPExchange($channel);​ $exchange = new AMQPExchange($channel);​
 $exchange->​setName('​ex_hello'​);​ $exchange->​setName('​ex_hello'​);​
Строка 115: Строка 85:
 $exchange->​setFlags(AMQP_DURABLE);​ $exchange->​setFlags(AMQP_DURABLE);​
 $exchange->​declare();​ $exchange->​declare();​
-</​code>​|+</​code>​
  
 и, собственно,​ саму очередь и, собственно,​ саму очередь
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​setName('​hello'​);​ $queue->​setName('​hello'​);​
 $queue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);​ $queue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);​
 $queue->​declare();​ $queue->​declare();​
-</​code>​|+</​code>​
  
 Когда обменник и очередь готовы,​ их можно связать по ключу Когда обменник и очередь готовы,​ их можно связать по ключу
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $queue->​bind($exchange->​getName(),​ '​foo_key'​);​ $queue->​bind($exchange->​getName(),​ '​foo_key'​);​
-</​code>​|+</​code>​
  
 Объявлять очередь и связывать ее с обменником можно как на продюсере,​ так и на консьюмере. Все зависит от того, что первым будет запускаться. Если неизвестно,​ то, возможно следует объявить и там и там. При этом имена очередей должны совпадать. Если имена очередей совпадают,​ то количество объявлений не имеет значения. Очередь с определенным именем может быть только одна. Стоит отметить,​ что сообщение не может быть опубликовано напрямую в очередь,​ оно должно проходить через обменник. Собственно посредством обменника оно и публикуется $result = $exchange->​publish(json_encode(“Hello world!”), “foo_key”);​ Объявлять очередь и связывать ее с обменником можно как на продюсере,​ так и на консьюмере. Все зависит от того, что первым будет запускаться. Если неизвестно,​ то, возможно следует объявить и там и там. При этом имена очередей должны совпадать. Если имена очередей совпадают,​ то количество объявлений не имеет значения. Очередь с определенным именем может быть только одна. Стоит отметить,​ что сообщение не может быть опубликовано напрямую в очередь,​ оно должно проходить через обменник. Собственно посредством обменника оно и публикуется $result = $exchange->​publish(json_encode(“Hello world!”), “foo_key”);​
Строка 143: Строка 106:
 После того как сообщение отослано,​ коннект можно разорвать. После того как сообщение отослано,​ коннект можно разорвать.
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 Получатель также должен выполнить ту же последовательность -- приконнектиться к серверу сообщений;​ -- создать канал; -- объявить обменник;​ -- объявить очередь;​ -- связать очередь с обменником по ключу Последние два действия,​ как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь Получатель также должен выполнить ту же последовательность -- приконнектиться к серверу сообщений;​ -- создать канал; -- объявить обменник;​ -- объявить очередь;​ -- связать очередь с обменником по ключу Последние два действия,​ как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 while (true) { while (true) {
     if ($envelope = $queue->​get(AMQP_AUTOACK)) {     if ($envelope = $queue->​get(AMQP_AUTOACK)) {
Строка 165: Строка 119:
     }     }
 } }
-</​code>​|+</​code>​
  
 Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK,​ которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения,​ вернуть повторно его в очередь нельзя. Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK,​ которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения,​ вернуть повторно его в очередь нельзя.
Строка 173: Строка 127:
 send.php send.php
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-</​code>​|<​code>​+
 $connection = new AMQPConnection($connection_params);​ $connection = new AMQPConnection($connection_params);​
 $connection->​connect();​ $connection->​connect();​
Строка 201: Строка 141:
 $result = $exchange->​publish(json_encode("​Hello world!"​),​ "​foo_key"​);​ $result = $exchange->​publish(json_encode("​Hello world!"​),​ "​foo_key"​);​
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 receiver.php receiver.php
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-</​code>​|<​code>​+
 $connection = new AMQPConnection($connection_params);​ $connection = new AMQPConnection($connection_params);​
 $connection->​connect();​ $connection->​connect();​
Строка 245: Строка 165:
 } }
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 ==== Распределенные очереди ==== ==== Распределенные очереди ====
Строка 255: Строка 175:
 Некоторые задачи могут выполняться довольно долго. И неизвестно,​ что может произойти с сервером в этот момент:​ сервер может перезагрузиться,​ либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки,​ случившейся во время обработки,​ не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает,​ что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений,​ то необходимо вызвать метод nack() с флагом AMQP_REQUEUE,​ который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана. Некоторые задачи могут выполняться довольно долго. И неизвестно,​ что может произойти с сервером в этот момент:​ сервер может перезагрузиться,​ либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки,​ случившейся во время обработки,​ не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает,​ что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений,​ то необходимо вызвать метод nack() с флагом AMQP_REQUEUE,​ который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана.
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-</​code>​|<​code>​+
 while (true) { while (true) {
     if ($evnelope = $queue->​get()) {     if ($evnelope = $queue->​get()) {
Строка 281: Строка 188:
     }     }
 } }
-</​code>​|+</​code>​
  
 Распростаненная ошибка -- при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте,​ все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений,​ что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl Распростаненная ошибка -- при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте,​ все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений,​ что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
 Listing queues ... Listing queues ...
 hello 0 0 hello 0 0
 ...done. ...done.
-</​code>​|+</​code>​
  
 === Жизнеспособность сообщений (durability) === === Жизнеспособность сообщений (durability) ===
Строка 301: Строка 203:
 В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать,​ очередь должна быть создана с флагом AMQP_DURABLE. В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать,​ очередь должна быть создана с флагом AMQP_DURABLE.
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​setName('​hello'​);​ $queue->​setName('​hello'​);​
 $queue->​setFlags(AMQP_DURABLE);​ $queue->​setFlags(AMQP_DURABLE);​
 $queue->​declare();​ $queue->​declare();​
-</​code>​|+</​code>​
  
 Если очередь ‘hello’ уже была объявлена,​ то данный код вызовет ошибку,​ поскольку один раз объявленную очередь нельзя объявить повторно с другими параметрами. Из этой ситуации есть два выхода,​ либо обнулить все очереди как сказано здесь, либо создать новую очередь с неиспользуемым именем. Посмотреть список очередей можно спопособом упомянутым в предыдущем параграфе. Установка флага AMQP_DURABLE не гарантирует стопроцентную сохранность сообщений в очереди. Несмотря на то, что таким спопосбом мы указываем RabbitMQ сохранять сообщения на диске, существует мертвая зона после получения соощения,​ когда оно уже в памяти,​ но еще не сохранено на диске. В этот момент,​ в случае не предвиденной ситуации,​ оно может быть утеряно из памяти. Для нашего простого примера таких гарантий достаточно,​ но если необходимо добиться высоких гарантий получения сообщения,​ то следует использовать транзакции. Если очередь ‘hello’ уже была объявлена,​ то данный код вызовет ошибку,​ поскольку один раз объявленную очередь нельзя объявить повторно с другими параметрами. Из этой ситуации есть два выхода,​ либо обнулить все очереди как сказано здесь, либо создать новую очередь с неиспользуемым именем. Посмотреть список очередей можно спопособом упомянутым в предыдущем параграфе. Установка флага AMQP_DURABLE не гарантирует стопроцентную сохранность сообщений в очереди. Несмотря на то, что таким спопосбом мы указываем RabbitMQ сохранять сообщения на диске, существует мертвая зона после получения соощения,​ когда оно уже в памяти,​ но еще не сохранено на диске. В этот момент,​ в случае не предвиденной ситуации,​ оно может быть утеряно из памяти. Для нашего простого примера таких гарантий достаточно,​ но если необходимо добиться высоких гарантий получения сообщения,​ то следует использовать транзакции.
Строка 319: Строка 216:
 Для примера распределения сообщений между очередями нам понадобится функция,​ имитирующая загруженность системы. Для этого мы используем обычный таймер Для примера распределения сообщений между очередями нам понадобится функция,​ имитирующая загруженность системы. Для этого мы используем обычный таймер
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 function doWork($message) { function doWork($message) {
     $sleep_interval = rand(1, 5);     $sleep_interval = rand(1, 5);
Строка 333: Строка 223:
     return true;     return true;
 } }
-</​code>​|+</​code>​
  
 Полный код продюсера (send.php) будет выглядеть так Полный код продюсера (send.php) будет выглядеть так
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-20 +
-21 +
-22 +
-23 +
-24 +
-25 +
-26 +
-27 +
-28 +
-29 +
-30 +
-31 +
-32 +
-33 +
-</​code>​|<​code>​+
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 405: Строка 261:
  
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 Консьюмер (receive.php) Консьюмер (receive.php)
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-20 +
-21 +
-22 +
-23 +
-24 +
-25 +
-26 +
-27 +
-28 +
-29 +
-30 +
-31 +
-32 +
-33 +
-34 +
-35 +
-36 +
-37 +
-</​code>​|<​code>​+
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 485: Строка 303:
  
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 Обратите внимание,​ что очереди создаются с абсолютно идентичными параметрами. Как уже было сказано,​ повторное создание очереди с иными параметрами вызовет исключение. Также стоит отметить,​ что если вы уверены,​ что консьюмер будет запускаться первым,​ то создание очереди в продюсере не обязательно. Обратите внимание,​ что очереди создаются с абсолютно идентичными параметрами. Как уже было сказано,​ повторное создание очереди с иными параметрами вызовет исключение. Также стоит отметить,​ что если вы уверены,​ что консьюмер будет запускаться первым,​ то создание очереди в продюсере не обязательно.
Строка 493: Строка 311:
 Вывод в первом терминале Вывод в первом терминале
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 $ php receive.php $ php receive.php
 delivery tag: 1 delivery tag: 1
Строка 509: Строка 319:
 delivery tag: 3 delivery tag: 3
 Hello world!.... Hello world!....
-</​code>​|+</​code>​
  
 Вывод во втором терминале Вывод во втором терминале
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-</​code>​|<​code>​+
 $ php receive.php $ php receive.php
 delivery tag: 1 delivery tag: 1
Строка 537: Строка 335:
 delivery tag: 5 delivery tag: 5
 Hello world!. Hello world!.
-</​code>​|+</​code>​
  
 Как видно сообщения распределились по мере нагрузки консьюмера. Как видно сообщения распределились по мере нагрузки консьюмера.
Строка 554: Строка 352:
 В нашем примере будем использовть тип обменника fanout. В нашем примере будем использовть тип обменника fanout.
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-</​code>​|<​code>​+
 $exchange->​setName('​ex_hello'​);​ $exchange->​setName('​ex_hello'​);​
 $exchange->​setType(AMQP_EX_TYPE_FANOUT);​ $exchange->​setType(AMQP_EX_TYPE_FANOUT);​
 $exchange->​declare();​ $exchange->​declare();​
-</​code>​|+</​code>​
  
 [[http://​ajaxblog.ru/​images/​post/​rabbitmq-tutorial/​tut_03.png|{{:​images:​post:​rabbitmq-tutorial:​tut_03.png}}]]Для этой цели продюсер не создает именованную очередь. Консьюмер же, в свою очередь,​ создает анонимную очередь,​ в которую принимает сообщения продюсера. При таком подходе каждый консьюмер будет принимать все сообщения продюсера. [[http://​ajaxblog.ru/​images/​post/​rabbitmq-tutorial/​tut_03.png|{{:​images:​post:​rabbitmq-tutorial:​tut_03.png}}]]Для этой цели продюсер не создает именованную очередь. Консьюмер же, в свою очередь,​ создает анонимную очередь,​ в которую принимает сообщения продюсера. При таком подходе каждый консьюмер будет принимать все сообщения продюсера.
Строка 570: Строка 364:
 В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых,​ нам нужны очереди с различными именами. Во-вторых,​ созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди,​ при ее создании нужно задать флаги AMQP_IFUNUSED,​ AMQP_AUTODELETE. В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых,​ нам нужны очереди с различными именами. Во-вторых,​ созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди,​ при ее создании нужно задать флаги AMQP_IFUNUSED,​ AMQP_AUTODELETE.
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $queue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);​ $queue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);​
-</​code>​|+</​code>​
  
 === Связывание (bindings) === === Связывание (bindings) ===
Строка 580: Строка 372:
 Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding) Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding)
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $queue->​bind($exchange->​getName(),​ ''​);​ $queue->​bind($exchange->​getName(),​ ''​);​
-</​code>​|+</​code>​
  
 Здесь второй параметр -- ключ, по которому связывается обменник и очередь. В данном случае он может быть любой строкой,​ поскольку его значение игнорируется в случае,​ если обменник имеет тип fanout. Здесь второй параметр -- ключ, по которому связывается обменник и очередь. В данном случае он может быть любой строкой,​ поскольку его значение игнорируется в случае,​ если обменник имеет тип fanout.
Строка 594: Строка 384:
 send.php send.php
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-20 +
-21 +
-22 +
-23 +
-24 +
-25 +
-26 +
-27 +
-28 +
-29 +
-30 +
-31 +
-32 +
-33 +
-34 +
-35 +
-36 +
-37 +
-38 +
-39 +
-40 +
-41 +
-42 +
-43 +
-44 +
-45 +
-46 +
-47 +
-48 +
-49 +
-50 +
-51 +
-52 +
-53 +
-54 +
-55 +
-56 +
-57 +
-58 +
-59 +
-60 +
-61 +
-62 +
-63 +
-64 +
-65 +
-66 +
-67 +
-68 +
-69 +
-70 +
-</​code>​|<​code>​+
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 736: Строка 455:
  
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 ==== Селективная рассылка ==== ==== Селективная рассылка ====
Строка 746: Строка 465:
 Связываение уже упоминалось в предыдущем уроке Связываение уже упоминалось в предыдущем уроке
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $queue->​bind($exchange->​getName(),​ ''​);​ $queue->​bind($exchange->​getName(),​ ''​);​
-</​code>​|+</​code>​
  
 Повторимся,​ оно нужно, чтобы сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр -- ключ(routingKey),​ по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить,​ что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру,​ если нужно связать обменник и очередь по ключу ‘failure_messages’ Повторимся,​ оно нужно, чтобы сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр -- ключ(routingKey),​ по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить,​ что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру,​ если нужно связать обменник и очередь по ключу ‘failure_messages’
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $queue->​bind($exchange->​getName(),​ '​failure_messages'​);​ $queue->​bind($exchange->​getName(),​ '​failure_messages'​);​
-</​code>​|+</​code>​
  
 === Прямое связывание (точка-точка) === === Прямое связывание (точка-точка) ===
Строка 774: Строка 489:
 Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT. Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 $exchange = new AMQPExchange($channel);​ $exchange = new AMQPExchange($channel);​
 $exchange->​setName('​logs'​);​ $exchange->​setName('​logs'​);​
 $exchange->​setType(AMQP_EX_TYPE_DIRECT);​ $exchange->​setType(AMQP_EX_TYPE_DIRECT);​
 $exchange->​declare();​ $exchange->​declare();​
-</​code>​|+</​code>​
  
 После чего возможна публикация сообщений по ключу После чего возможна публикация сообщений по ключу
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $exchange->​publish($message,​ '​notice'​);​ $exchange->​publish($message,​ '​notice'​);​
-</​code>​|+</​code>​
  
 Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​declare();​ $queue->​declare();​
Строка 810: Строка 511:
     $queue->​bind($exchange->​getName(),​ $routingKey'​);​     $queue->​bind($exchange->​getName(),​ $routingKey'​);​
 } }
-</​code>​|+</​code>​
  
 === Все вместе === === Все вместе ===
Строка 818: Строка 519:
 Продюсер (send.php) Продюсер (send.php)
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-20 +
-21 +
-22 +
-23 +
-24 +
-25 +
-26 +
-27 +
-28 +
-29 +
-30 +
-31 +
-32 +
-33 +
-34 +
-35 +
-36 +
-37 +
-38 +
-39 +
-40 +
-41 +
-42 +
-43 +
-44 +
-45 +
-46 +
-47 +
-48 +
-49 +
-50 +
-51 +
-52 +
-53 +
-54 +
-55 +
-56 +
-57 +
-58 +
-59 +
-60 +
-61 +
-62 +
-63 +
-64 +
-65 +
-66 +
-67 +
-68 +
-69 +
-70 +
-71 +
-72 +
-73 +
-74 +
-75 +
-</​code>​|<​code>​+
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 970: Строка 595:
  
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 ==== Рассылка по шаблону ==== ==== Рассылка по шаблону ====
Строка 1001: Строка 626:
 Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC. Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 $exchange = new AMQPExchange($channel);​ $exchange = new AMQPExchange($channel);​
 $exchange->​setName('​logs'​);​ $exchange->​setName('​logs'​);​
Строка 1015: Строка 635:
 После чего возможна публикация сообщений по ключу После чего возможна публикация сообщений по ключу
  
-|<​code>​ +<​code ​php>
-+
-</​code>​|<​code>​+
 $exchange->​publish($message,​ '​kern.notice'​);​ $exchange->​publish($message,​ '​kern.notice'​);​
-</​code>​|+</​code>​
  
 === Получение сообщений === === Получение сообщений ===
Строка 1025: Строка 643:
 Получение сообщений ничем не отличается от предыдущего урока Получение сообщений ничем не отличается от предыдущего урока
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-</​code>​|<​code>​+
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​declare();​ $queue->​declare();​
Строка 1039: Строка 650:
     $queue->​bind($exchange->​getName(),​ $routingKey'​);​     $queue->​bind($exchange->​getName(),​ $routingKey'​);​
 } }
-</​code>​|+</​code>​
  
 === Все вместе === === Все вместе ===
Строка 1047: Строка 658:
 Продюсер(send.php) Продюсер(send.php)
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-20 +
-21 +
-22 +
-23 +
-24 +
-25 +
-26 +
-27 +
-28 +
-29 +
-30 +
-31 +
-32 +
-33 +
-34 +
-35 +
-36 +
-37 +
-38 +
-39 +
-40 +
-41 +
-42 +
-43 +
-44 +
-45 +
-46 +
-47 +
-48 +
-49 +
-50 +
-51 +
-52 +
-53 +
-54 +
-55 +
-56 +
-57 +
-58 +
-59 +
-60 +
-61 +
-62 +
-63 +
-64 +
-65 +
-66 +
-67 +
-68 +
-69 +
-70 +
-71 +
-72 +
-73 +
-</​code>​|<​code>​+
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 1195: Строка 732:
  
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
  
 ==== Реализация RPC шаблона ==== ==== Реализация RPC шаблона ====
Строка 1203: Строка 740:
 В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение,​ а сервере отвечает. Для обработки ответа сервера,​ необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа,​ мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение,​ а сервере отвечает. Для обработки ответа сервера,​ необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа,​ мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-</​code>​|<​code>​+
 $replyQueue = new AMQPQueue($channel);​ $replyQueue = new AMQPQueue($channel);​
 $replyQueue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_EXCLUSIVE);​ $replyQueue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_EXCLUSIVE);​
Строка 1233: Строка 755:
  
 // ... then code to read a response message from the callback_queue ... // ... then code to read a response message from the callback_queue ...
-</​code>​|+</​code>​
  
 Обратите внимание,​ что callback очередь создается с флагом AMQP_EXCLUSIVE,​ что означает,​ что только один консьюмер может слушать эту очередь. Обратите внимание,​ что callback очередь создается с флагом AMQP_EXCLUSIVE,​ что означает,​ что только один консьюмер может слушать эту очередь.
Строка 1253: Строка 775:
 Функция обработки сообщения на стороне сервера выглядит следующим образом Функция обработки сообщения на стороне сервера выглядит следующим образом
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-</​code>​|<​code>​+
 function doWork($message) function doWork($message)
 { {
Строка 1281: Строка 789:
     return $message;     return $message;
 } }
-</​code>​|+</​code>​
  
 Функция обработки сообщения на стороне клиента Функция обработки сообщения на стороне клиента
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-</​code>​|<​code>​+
 function getWork($message) function getWork($message)
 { {
     print_r($message);​     print_r($message);​
 } }
-</​code>​|+</​code>​
  
 Продюсер(send.php) Продюсер(send.php)
  
-|<​code>​ +<​code ​php>
-+
-+
-+
-+
-+
-+
-+
-+
-+
-10 +
-11 +
-12 +
-13 +
-14 +
-15 +
-16 +
-17 +
-18 +
-19 +
-20 +
-21 +
-22 +
-23 +
-24 +
-25 +
-26 +
-27 +
-28 +
-29 +
-30 +
-31 +
-32 +
-33 +
-34 +
-35 +
-36 +
-37 +
-38 +
-39 +
-40 +
-41 +
-42 +
-43 +
-44 +
-45 +
-46 +
-47 +
-48 +
-49 +
-50 +
-51 +
-52 +
-53 +
-54 +
-55 +
-56 +
-57 +
-58 +
-59 +
-60 +
-61 +
-62 +
-63 +
-64 +
-65 +
-66 +
-67 +
-68 +
-69 +
-70 +
-71 +
-72 +
-73 +
-74 +
-75 +
-76 +
-77 +
-78 +
-79 +
-80 +
-81 +
-82 +
-83 +
-84 +
-85 +
-86 +
-87 +
-88 +
-89 +
-90 +
-91 +
-92 +
-93 +
-94 +
-95 +
-96 +
-97 +
-98 +
-</​code>​|<​code>​+
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 1497: Строка 901:
  
 $connection->​disconnect();​ $connection->​disconnect();​
-</​code>​|+</​code>​
php/rabbitmq/amqplib.1693489501.txt.gz · Последние изменения: 2023/08/31 16:45 — werwolf