Инструменты пользователя

Инструменты сайта


php:rabbitmq:amqplib

Различия

Здесь показаны различия между двумя версиями данной страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
Следующая версия
Предыдущая версия
php:rabbitmq:amqplib [2023/08/31 16:49]
werwolf
php:rabbitmq:amqplib [2023/08/31 16:50] (текущий)
werwolf
Строка 1: Строка 1:
-20 Декабря 2014 +===== сервер очередей RabbitMQ ​=====
- +
-[[:​categories:​php:​|php]] +
- +
-сервер очередей\\ RabbitMQ\\ +
  
 Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи,​ которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей -- избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь,​ а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед,​ скажу, что возможен режим работы очередей,​ когда при наличии нескольких копий обработчика,​ следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач. Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи,​ которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей -- избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь,​ а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед,​ скажу, что возможен режим работы очередей,​ когда при наличии нескольких копий обработчика,​ следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач.
Строка 27: Строка 23:
 Добавим следующию строку в файл /​etc/​apt/​sources.list Добавим следующию строку в файл /​etc/​apt/​sources.list
  
-<​code>​+<​code ​php>
 deb http://​www.rabbitmq.com/​debian/​ testing main deb http://​www.rabbitmq.com/​debian/​ testing main
 </​code>​ </​code>​
  
-<​code>​+<​code ​php>
 wget http://​www.rabbitmq.com/​rabbitmq-signing-key-public.asc wget http://​www.rabbitmq.com/​rabbitmq-signing-key-public.asc
 sudo apt-key add rabbitmq-signing-key-public.asc sudo apt-key add rabbitmq-signing-key-public.asc
Строка 56: Строка 52:
 Т.е. схема работы следующая:​ Первое,​ что надо сделать,​ это установить соединение с сервером RabbitMQ. Соединение устанавливается командами Т.е. схема работы следующая:​ Первое,​ что надо сделать,​ это установить соединение с сервером RabbitMQ. Соединение устанавливается командами
  
-<​code>​+<​code ​php>
 $connection = new AMQPConnection($connection_params);​ $connection = new AMQPConnection($connection_params);​
 $connection->​connect();​ $connection->​connect();​
Строка 63: Строка 59:
 где где
  
-<​code>​+<​code ​php>
 $connection_params = array( $connection_params = array(
   '​host'​ => '​localhost',​   '​host'​ => '​localhost',​
Строка 77: Строка 73:
 Используя коннект можно получить объект для канала Используя коннект можно получить объект для канала
  
-<​code>​+<​code ​php>
 $channel = new AMQPChannel($connection);​ $channel = new AMQPChannel($connection);​
 </​code>​ </​code>​
Строка 83: Строка 79:
 На основе полученного канала создаем обменник На основе полученного канала создаем обменник
  
-<​code>​+<​code ​php>
 $exchange = new AMQPExchange($channel);​ $exchange = new AMQPExchange($channel);​
 $exchange->​setName('​ex_hello'​);​ $exchange->​setName('​ex_hello'​);​
Строка 93: Строка 89:
 и, собственно,​ саму очередь и, собственно,​ саму очередь
  
-<​code>​+<​code ​php>
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​setName('​hello'​);​ $queue->​setName('​hello'​);​
Строка 102: Строка 98:
 Когда обменник и очередь готовы,​ их можно связать по ключу Когда обменник и очередь готовы,​ их можно связать по ключу
  
-<​code>​+<​code ​php>
 $queue->​bind($exchange->​getName(),​ '​foo_key'​);​ $queue->​bind($exchange->​getName(),​ '​foo_key'​);​
 </​code>​ </​code>​
Строка 110: Строка 106:
 После того как сообщение отослано,​ коннект можно разорвать. После того как сообщение отослано,​ коннект можно разорвать.
  
-<​code>​+<​code ​php>
 $connection->​disconnect();​ $connection->​disconnect();​
 </​code>​ </​code>​
Строка 116: Строка 112:
 Получатель также должен выполнить ту же последовательность -- приконнектиться к серверу сообщений;​ -- создать канал; -- объявить обменник;​ -- объявить очередь;​ -- связать очередь с обменником по ключу Последние два действия,​ как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь Получатель также должен выполнить ту же последовательность -- приконнектиться к серверу сообщений;​ -- создать канал; -- объявить обменник;​ -- объявить очередь;​ -- связать очередь с обменником по ключу Последние два действия,​ как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь
  
-<​code>​+<​code ​php>
 while (true) { while (true) {
     if ($envelope = $queue->​get(AMQP_AUTOACK)) {     if ($envelope = $queue->​get(AMQP_AUTOACK)) {
Строка 131: Строка 127:
 send.php send.php
  
-<​code>​+<​code ​php>
 $connection = new AMQPConnection($connection_params);​ $connection = new AMQPConnection($connection_params);​
 $connection->​connect();​ $connection->​connect();​
Строка 149: Строка 145:
 receiver.php receiver.php
  
-<​code>​+<​code ​php>
 $connection = new AMQPConnection($connection_params);​ $connection = new AMQPConnection($connection_params);​
 $connection->​connect();​ $connection->​connect();​
Строка 179: Строка 175:
 Некоторые задачи могут выполняться довольно долго. И неизвестно,​ что может произойти с сервером в этот момент:​ сервер может перезагрузиться,​ либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки,​ случившейся во время обработки,​ не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает,​ что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений,​ то необходимо вызвать метод nack() с флагом AMQP_REQUEUE,​ который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана. Некоторые задачи могут выполняться довольно долго. И неизвестно,​ что может произойти с сервером в этот момент:​ сервер может перезагрузиться,​ либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки,​ случившейся во время обработки,​ не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает,​ что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений,​ то необходимо вызвать метод nack() с флагом AMQP_REQUEUE,​ который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана.
  
-<​code>​+<​code ​php>
 while (true) { while (true) {
     if ($evnelope = $queue->​get()) {     if ($evnelope = $queue->​get()) {
Строка 196: Строка 192:
 Распростаненная ошибка -- при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте,​ все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений,​ что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl Распростаненная ошибка -- при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте,​ все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений,​ что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl
  
-<​code>​+<​code ​php>
 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
 Listing queues ... Listing queues ...
Строка 207: Строка 203:
 В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать,​ очередь должна быть создана с флагом AMQP_DURABLE. В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать,​ очередь должна быть создана с флагом AMQP_DURABLE.
  
-<​code>​+<​code ​php>
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​setName('​hello'​);​ $queue->​setName('​hello'​);​
Строка 220: Строка 216:
 Для примера распределения сообщений между очередями нам понадобится функция,​ имитирующая загруженность системы. Для этого мы используем обычный таймер Для примера распределения сообщений между очередями нам понадобится функция,​ имитирующая загруженность системы. Для этого мы используем обычный таймер
  
-<​code>​+<​code ​php>
 function doWork($message) { function doWork($message) {
     $sleep_interval = rand(1, 5);     $sleep_interval = rand(1, 5);
Строка 231: Строка 227:
 Полный код продюсера (send.php) будет выглядеть так Полный код продюсера (send.php) будет выглядеть так
  
-<​code>​+<​code ​php>
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 269: Строка 265:
 Консьюмер (receive.php) Консьюмер (receive.php)
  
-<​code>​+<​code ​php>
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 315: Строка 311:
 Вывод в первом терминале Вывод в первом терминале
  
-<​code>​+<​code ​php>
 $ php receive.php $ php receive.php
 delivery tag: 1 delivery tag: 1
Строка 327: Строка 323:
 Вывод во втором терминале Вывод во втором терминале
  
-<​code>​+<​code ​php>
 $ php receive.php $ php receive.php
 delivery tag: 1 delivery tag: 1
Строка 356: Строка 352:
 В нашем примере будем использовть тип обменника fanout. В нашем примере будем использовть тип обменника fanout.
  
-<​code>​+<​code ​php>
 $exchange->​setName('​ex_hello'​);​ $exchange->​setName('​ex_hello'​);​
 $exchange->​setType(AMQP_EX_TYPE_FANOUT);​ $exchange->​setType(AMQP_EX_TYPE_FANOUT);​
Строка 368: Строка 364:
 В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых,​ нам нужны очереди с различными именами. Во-вторых,​ созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди,​ при ее создании нужно задать флаги AMQP_IFUNUSED,​ AMQP_AUTODELETE. В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых,​ нам нужны очереди с различными именами. Во-вторых,​ созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди,​ при ее создании нужно задать флаги AMQP_IFUNUSED,​ AMQP_AUTODELETE.
  
-<​code>​+<​code ​php>
 $queue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);​ $queue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);​
 </​code>​ </​code>​
Строка 376: Строка 372:
 Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding) Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding)
  
-<​code>​+<​code ​php>
 $queue->​bind($exchange->​getName(),​ ''​);​ $queue->​bind($exchange->​getName(),​ ''​);​
 </​code>​ </​code>​
Строка 388: Строка 384:
 send.php send.php
  
-<​code>​+<​code ​php>
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 469: Строка 465:
 Связываение уже упоминалось в предыдущем уроке Связываение уже упоминалось в предыдущем уроке
  
-<​code>​+<​code ​php>
 $queue->​bind($exchange->​getName(),​ ''​);​ $queue->​bind($exchange->​getName(),​ ''​);​
 </​code>​ </​code>​
Строка 475: Строка 471:
 Повторимся,​ оно нужно, чтобы сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр -- ключ(routingKey),​ по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить,​ что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру,​ если нужно связать обменник и очередь по ключу ‘failure_messages’ Повторимся,​ оно нужно, чтобы сказать обменнику,​ что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр -- ключ(routingKey),​ по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить,​ что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру,​ если нужно связать обменник и очередь по ключу ‘failure_messages’
  
-<​code>​+<​code ​php>
 $queue->​bind($exchange->​getName(),​ '​failure_messages'​);​ $queue->​bind($exchange->​getName(),​ '​failure_messages'​);​
 </​code>​ </​code>​
Строка 493: Строка 489:
 Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT. Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.
  
-<​code>​+<​code ​php>
 $exchange = new AMQPExchange($channel);​ $exchange = new AMQPExchange($channel);​
 $exchange->​setName('​logs'​);​ $exchange->​setName('​logs'​);​
Строка 502: Строка 498:
 После чего возможна публикация сообщений по ключу После чего возможна публикация сообщений по ключу
  
-<​code>​+<​code ​php>
 $exchange->​publish($message,​ '​notice'​);​ $exchange->​publish($message,​ '​notice'​);​
 </​code>​ </​code>​
Строка 508: Строка 504:
 Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу
  
-<​code>​+<​code ​php>
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​declare();​ $queue->​declare();​
Строка 523: Строка 519:
 Продюсер (send.php) Продюсер (send.php)
  
-<​code>​+<​code ​php>
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 630: Строка 626:
 Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC. Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.
  
-<​code>​+<​code ​php>
 $exchange = new AMQPExchange($channel);​ $exchange = new AMQPExchange($channel);​
 $exchange->​setName('​logs'​);​ $exchange->​setName('​logs'​);​
Строка 639: Строка 635:
 После чего возможна публикация сообщений по ключу После чего возможна публикация сообщений по ключу
  
-<​code>​+<​code ​php>
 $exchange->​publish($message,​ '​kern.notice'​);​ $exchange->​publish($message,​ '​kern.notice'​);​
 </​code>​ </​code>​
Строка 647: Строка 643:
 Получение сообщений ничем не отличается от предыдущего урока Получение сообщений ничем не отличается от предыдущего урока
  
-<​code>​+<​code ​php>
 $queue = new AMQPQueue($channel);​ $queue = new AMQPQueue($channel);​
 $queue->​declare();​ $queue->​declare();​
Строка 662: Строка 658:
 Продюсер(send.php) Продюсер(send.php)
  
-<​code>​+<​code ​php>
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
Строка 744: Строка 740:
 В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение,​ а сервере отвечает. Для обработки ответа сервера,​ необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа,​ мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение,​ а сервере отвечает. Для обработки ответа сервера,​ необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа,​ мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса
  
-<​code>​+<​code ​php>
 $replyQueue = new AMQPQueue($channel);​ $replyQueue = new AMQPQueue($channel);​
 $replyQueue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_EXCLUSIVE);​ $replyQueue->​setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_EXCLUSIVE);​
Строка 779: Строка 775:
 Функция обработки сообщения на стороне сервера выглядит следующим образом Функция обработки сообщения на стороне сервера выглядит следующим образом
  
-<​code>​+<​code ​php>
 function doWork($message) function doWork($message)
 { {
Строка 797: Строка 793:
 Функция обработки сообщения на стороне клиента Функция обработки сообщения на стороне клиента
  
-<​code>​+<​code ​php>
 function getWork($message) function getWork($message)
 { {
Строка 806: Строка 802:
 Продюсер(send.php) Продюсер(send.php)
  
-<​code>​+<​code ​php>
 $params = array( $params = array(
     '​host'​ => '​localhost',​     '​host'​ => '​localhost',​
php/rabbitmq/amqplib.1693489787.txt.gz · Последние изменения: 2023/08/31 16:49 — werwolf