Messenger: работа с синхронизированными сообщениями и сообщениями в очереди

Дата обновления перевода 2024-07-31

Messenger: работа с синхронизированными сообщениями и сообщениями в очереди

Messenger предоставляет автобус сообщений с возможностью отправки сообщений, а затем работы с ними сразу же в вашем приложении, или их отправки через транспорт (например, очередь) для их обработки позже. Чтобы узнать об этом больше, прочтите документацию компонента Messenger.

Установка

В приложениях, использующих Symfony Flex , выполните эту команду, чтобы установить Messenger:

1
$ composer require symfony/messenger

Создание сообщения и обработчика

Messenger строится на двух разных классах, которые вы создадите: (1) классе сообщения, который содержит данные, и (2) классе обработчика(ов), который будет вызван после запуска сообщения. Класс обработчика будет читать класс сообщения и выполнять некоторое действие.

Для класса сообщения нет особых требований, кроме того, что его можно сериализовать:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    public function __construct(
        private string $content,
    ) {
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обработчик сообщений - это PHP-вызываемое, рекомендованный способ его создания - создать класс, реализующий MessageHandlerInterface и имеющий метод __invoke(), который имеет подсказки класса сообщения (или интерфейс сообщения):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ... сделайте что-то - вроде отправки SMS!
    }
}

Tip

Вы можете также использовать атрибут #[AsMessageHandler] в отдельных методах класса. Вы можете использовать атрибут в стольки методах одного класа, скольки хотите, что позволяет вам группировать обработку нескольких связанных типов сообщений.

Благодаря автоконфигурации и подсказке SmsNotification, Symfony значет, что этот отбработчик должен быть вызван, когда запускается сообщение SmsNotification. В большинстве случаев, это все, что вам нужно будет сделать. Но вы можете также сконфигурировать обработчики сообщений вручную . Чтобы увидеть всех сконфигурированных обработчиков, выполните:

1
$ php bin/console debug:messenger

Развёртывание сообщения

Вы готовы! Для запуска сообщения (и вызова обработчика), внедрите сервис messenger.default_bus (через MessageBusInterface), например, в контроллер:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus): Response
    {
        // приведет к вызову SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронные сообщения/сообщения в очереди

По умолчанию, сообщения обрабатываются сразу же после запуска. Если вы хотите обработать сообщение асинхронно, вы можете сконфигурировать транспорт. Транспорт может отправлять сообщения (например, в систему очереди) и затем получать из через работника . Messenger поддерживает несколько транспортов .

Note

Если вы хотите использовать транспорт, который не поддерживается, посмотрите на транспорт Enqueue, который поддерживает штуки вроде Kafka и Google Pub/Sub.

Транспорт регистрируется с использованием "DSN". Благодаря рецепту Flex для Messenger, ваш файл .env уже имеет несколько примеров.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Раскомментируйте тот транспорт, который вы хотите (или установите его в .env.local). См. , чтобы узнать больше деталей.

Далее, в config/packages/messenger.yaml, давайте определим транспорт под название async, использующий эту конфигурацию:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

            # или расширено для конфигурации большего количества опций
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []

Маршрутизация сообщений к транспорту

Теперь, когда у вас есть сконфигурированный транспорт, вместо немедленно обработки сообщений, вы можете сконфигурировать их так, чтобы они были отправлены транспорту:

1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

        routing:
            # async - это то имя, которое вы дали вашему транспорту выше
            'App\Message\SmsNotification': async

Благодаря этому, App\Message\SmsNotification будет отправлен транспорту async, и его обработчик(и) не будут вызваны сразу же. Все сообщения, не совпавшие с routing будут обработаны немедленно.

Note

Вы можете использовать частичное пространство имен PHP, например 'App\Message\*', чтобы сопоставить все сообщения, находящиеся в соответствующем пространстве имен. Единственным требованием является то, что подстановочный знак '*' должен быть помещен в конец пространства имен.

Вы можете использовать '*' как класс сообщения. Он будет вести себя как правило маршрутизации по умолчанию для любого сообщения, не совпадающего с routing. Это полезно, чтобы гарантировать, что ни одно сообщение не обрабатывается синхронно по умолчанию.

Единственный недостаток в том, что '*' также будет применяться к электронным письмам, отправленным с Symfony Mailer (который использует SendEmailMessage, если доступен Messenger). Это может привести к проблемам, если ваши письма не поддаются сериализации (например, если они содержат вложения файлов как PHP источники/потоки).

Вы также можете маршрутизировать классы по их родительскому классу или интерфейсу. Или отправлять сообщения нескольким транспортам:

1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс
            'App\Message\AbstractAsyncMessage': async
            'App\Message\AsyncMessageInterface': async

            'My\Message\ToBeSentToTwoSenders': [async, audit]

Note

Если вы сконфигурируете машрутизацию и для дочернего, и для родительского класса, используются оба правила. Например, если у вас есть объект SmsNotification, расширяющийся из Notification, будут использованы маршрутизации и для Notification, и для SmsNotification.

Tip

Вы можете определять и переопределять транспорт, который использует сообщение, во время прогона, используя TransportNamesStamp в конверте сообщения. Этот штамп берёт массив имён транспорта в качестве единственного аргумента. Чтобы узнать больше о штампах, см. Конверты & штампы.

Сущности Doctrine в сообщениях

Если вам нужно передать сущность в Doctrine в сообщении, лучше всего передать основной ключ сущности (или любую релевантную информацию, необходимую обработчику, вроде email, и др.) вместо объекта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    public function __construct(
        private int $userId,
    ) {
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Затем, в вашем обработчике, вы можете сделать запрос свежего объекта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
    public function __construct(
        private UserRepository $userRepository,
    ) {
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... отправить электронное письмо!
    }
}

Это гарантирует, что сущность содержит свежие данные.

Синхронная обработка сообщений

Если сообщение не совпадает ни с одним правилом маршрутизации , оно не будет отправлено ни одному транспорту, и будет обработано немедленно. В некоторых случаях (например, при связывании обработчиков с разными транспортами), легче и более гибко обработать их ясно: создав транспорт sync и "отправив" сообщения в него для немедленной обработки:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... другие транспорты

            sync: 'sync://'

        routing:
            App\Message\SmsNotification: sync

Создание вашего собственного транспорта

Вы также можете создать собственный транспорт, если вам нужно отправлять или получать сообщения из чего-то, что не поддерживается. См. Как создать ваш собственный транспорт сообщений.

Потребление сообщений (запуск работника)

Когда ваши сообщения были маршрутизированы, в большинстве случаев, вам нужно будет "потребить" их. Вы можете сделать это с помощью команды messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# используйте -vv, чтобы увидеть детали того, что происходит
$ php bin/console messenger:consume async -vv

Первый аргумент - это имя получателя (или id сервиса, если вы маршрутизировали к пользовательскому сервису). По умолчанию, команда будет выполняться бесконечно: искать новые сообщения в вашем транспорте и обрабатывать их. Эта команда называется вашим "работником".

Если вы хотите потреблять сообщения от всех доступных получателей, вы можете использовать команду с опцией --all:

1
$ php bin/console messenger:consume --all

7.1

Опция --all была представлена в Symfony 7.1.

Tip

В окружении разработчиков и при использовании инструмента Symfony CLI, вы можете сконфигурировать работников, которые будут автоматически запускаться вместе с веб-сервером. Более подробную информацию вы можете найти в документации Работники Symfony CLI .

Tip

Чтобы правильно остановить работника, вызовите экземпляр StopWorkerException.

Развёртывание в производство

В производстве, есть несколько важных вещей, о которых стоит подумать:

Используйте Супервизора, чтобы ваш(и) работник(и) работали
Вы хотите, чтобы один или более "работников" работали все время. Чтобы сделать это, используйте систему контроля процесса вроде Супервизора .
Не позволяйте работникам работать бесконечно
Некоторые сервисы (вроде EntityManager Doctrine) будут потреблять все больше памяти со временем. Поэтому, вместо того, чтобы позволять вашему работнику работать всегда, используйте флажок вроде messenger:consume --limit=10, чтобы указать работнику, что он должен обработать только 10 сообщений до прекращения работы (затем Супервизор создаст новый процесс). Также есть другие опции вроде --memory-limit=128M и --time-limit=3600.
Остановка работников, которые столкнулись с ошибками
Если зависимость работника, вроде вашего сервера БД не отвечает, или достигнут тайм-аут, вы можете попробовать добавить логику повторного соединения , или просто остановить работника, если он получает слишком много ошибок, с помощью опции --failure-limit команды messenger:consume.
Перезагружайте работников при развёртывании
Каждый раз при развёртывании вам нужно будет перезагрузить все процессы ваших работников, чтобы они видели новозапущенный код. Чтобы сделать это, выполните messenger:stop-workers при запуске. Это сигнализирует каждому работнику, что он должен закончить обработку текущего сообщения и грациозно завершить работу. Затем, Супервизор создаст новые процессы работников. Команда использует кеш app внутренне - поэтому убедитесь в том, что он сконфигурирован для использования адаптера по вашему вкусу.
Используйте один кеш между запусками
Если ваша стратегия развёртывания заключается в создании новых целевых каталогов, вам нужно установить значение опции конфигурации cache.prefix.seed , чтобы использовать одно и то же пространство имен кеша между запусками. Иначе, пул cache.app будет использовать значение параметра kernel.project_dir в качестве базы для пространства имен, что приведет к разным пространствам имен каждый раз при запуске.

Приоритезированный транспорт

Иногда определенные типы сообщений должны иметь более высокий приоритет и быть обработаны до других. Чтобы сделать это возможным, вы можете создать несколько транспортов и маршрутизировать разные сообщения к ним. Например:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name соответствует транспорту doctrine
                    queue_name: high

                    # для AMQP отправьте отдельный обмен, а затем поставьте в очередь
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # or redis try "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low

        routing:
            'App\Message\SmsNotification':  async_priority_low
            'App\Message\NewUserWelcomeEmail':  async_priority_high

Затем вы можете запускать отдельных работников для каждого транспорта, или инструктировать одного работника, чтобы он обрабатывал сообщения в порядке приоритетности:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Работник будет всегда вначале искать сообщения, ожидающие async_priority_high. Если таких нет, затем он будет потреблять сообщения из async_priority_low.

Ограничьте потребление для конкретных очередей

Некоторый транспорт (особенно AMQP) имеет концепцию обмена и очередей. Транспорт Symfony всегда связан с обменом. По умолчанию, работник потребляет из всех очередей, соединенных с обменом указанного транспорта. Однако, есть примеры использования, когда вам нужно, чтобы работник потреблял только из конкретной очереди.

You can limit the worker to only process messages from specific queues:

1
2
3
4
$ php bin/console messenger:consume my_transport --queues=fasttrack

# вы можете передать опцию --queues больше одного раза, чтобы обработать несколько очередей
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2

Note

Чтобы позволить использование опции queues, получатель должен реализовывать QueueReceiverInterface.

Проверка количества сообщений в очереди для транспорта

Выполните команду messenger:stats, чтобы узнать, сколько сообщений находятся в "очередяз" некоторого или всех транспортов:

1
2
3
4
5
# отображает количество сообщений в очереди во всех транспортах
$ php bin/console messenger:stats

# отображает статистику только для некоторых транспортов
$ php bin/console messenger:stats my_transport_name other_transport_name

Note

Для того, чтобы эта команда работала, сконфигурированный получатель транспорта должен реализовывать
MessageCountAwareInterface.

Конфигурация супервизора

Супервизор - это отличный инструмент для гарантии того, что процесс(ы) ваших работников всегда производятся (даже если он закрывается в связи с ошибкой, достижением лимита сообщений или благодаря messenger:stop-workers). Вы можете установить его на Ubuntu, к примеру, через:

1
$ sudo apt-get install supervisor

Файлы конфигурации Супервизора обычно живут в каталоге /etc/supervisor/conf.d. Например, вы можете создать там новый файл messenger-worker.conf, чтобы убедиться, что 2 экземпляра messenger:consume работают всегда:

1
2
3
4
5
6
7
8
9
10
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
startretries=10
process_name=%(program_name)s_%(process_num)02d

Измените аргумент async, чтобы он использовал название вашего транспорта (или транспортов) и user на Unix-пользователя на вашем сервере.

Caution

Во время развёртывания, что-то может быть не доступно (например, БД), что приведет к ошибке запуска потребителя. В такой ситуации, Supervisor попробует startretries несколько раз, чтобы перезапустить команду. Убедитесь в том, что изменили эту настройку, чтобы избежать получения команды в ФАТАЛЬНОМ состоянии, после которого её невозможно будет перезапустить.

Каждый перезапуск, Supervisor увеличивает задержку на 1 секунду. Например, если значение - 10, он подождёт 1 сек, 2 сек, 3 сек и т.д. Это даёт сервису в общей сложности 55 секунд, чтобы снова стать доступным. Увеличьте настройку startretries, чтобы покрыть максимум ожидаемого времени простоя.

Если вы используете транспорт Redis, заметьте, что каждому работнику нужно уникальное имя потребителя, чтобы избежать обработки одного сообщения несколькими работникам. Один из способов достичь этого - установить переменную окружения в файле конфигурации Супервизора, на которую вы потом можете сослаться в messenger.yaml (см. раздел Redis выше):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Далее, укажите Supervisor прочитать вашу конфигурацию и запустить ваших работников:

1
2
3
4
5
6
7
8
9
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

# Если вы развертываете обновление вашего кода, не забудьте перезапустить ваших работников,
# чтобы они выполнили новый код
$ sudo supervisorctl restart messenger-consume:*

См. документацию Supervisor, чтобы узнать больше деталей.

Плавное завершение работы

Если вы установили в своем проекте PHP-расширение PCNTL, работники будут обрабатывать POSIX сигнал SIGTERM для окончания обработки текущего сообщения перед выходом.

Однако вы можете предпочесть использовать другие POSIX-сигналы для плавного завершения работы. Вы можете переопределить сигналы по умолчанию, установив опцию конфигурации framework.messenger.stop_worker_on_signals.

В некоторых случаях, сигнал SIGTERM отправляется самим Супервизором (например, остановка контейнера Docker в котором Супервизор - точка входа). В таких случаях, вам нужно добавить ключ stopwaitsecs к конфигурации программы (со значением желаемого периода острочки в секундах) для того, чтобы выполнить грациозное выключение:

1
2
[program:x]
stopwaitsecs=20

Конфигурация Systemd

Хотя Supervisor - это отличный инструмент, он имеет недостаток в том, что вам нужен доступ к системе, чтобы его запустить. Systemd стала стандартом в большинстве дистрибуций Linux, и имеет хорошую альтернативу под названием сервисы пользователя.

Файлы конфигурации сервисов пользователя Systemd обычно живут в каталоге ~/.config/systemd/user. Например, вы можете создать новый файл messenger-worker.service. Или файл messenger-worker@.service, если вы хотите, чтобы больше экземпляров работали одновременно:

1
2
3
4
5
6
7
8
9
10
[Unit]
Description=Symfony messenger-consume %i

[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30

[Install]
WantedBy=default.target

Теперь, укажите systemd включить и запустить одного работника:

1
2
3
4
5
6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service

# чтобы включить и запустить 20 работников
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service

Если вы измените ваш файл конфигурации сервиса, вам нужно перезагрузить daemon:

1
$ systemctl --user daemon-reload

Чтобы перезапустить всех ваших потребителей:

1
$ systemctl --user restart messenger-consume@*.service

Экземпляр пользователя systemd запускается только после первого входа в систему конкретным пользователем. Потребителю часто нужно запускаться при начальной загрузке системы. Включите lingering в пользователе, чтобы активировать это поведение:

1
$ loginctl enable-linger <your-username>

Логами управляет journald и с ними можно работать, используя команду journalctl:

1
2
3
4
5
6
7
8
# следить за логами потребителя номер 11
$ journalctl -f --user-unit messenger-consume@11.service

# следить за логами всех потребителей
$ journalctl -f --user-unit messenger-consume@*

# следить за всеми логами из ваших сервисов пользователя
$ journalctl -f _UID=$UID

См. документацию systemd, чтобы узнать больше.

Note

Вам нужно либо иметь повышенные привелегии для команды journalctl, либо добавить вашего пользователя в группу systemd-journal:

1
$ sudo usermod -a -G systemd-journal <your-username>

Работник без состояния

PHP создан быть без состояния, разные запросы не имеют общих источников. В HTTP контексте PHP очищает все перед отправкой ответа, поэтому вы можете решить не заботиться о сервисах, которые могут допускать утечку памяти.

С другой стороны, работники обычно работают в долгосрочных CLI-процессах, которые не заканчиваются после обработки сообщения. Поэтому вам нужно быть осторожными с состояниями сервисов, чтобы они не давали утечку информации и/или памяти из одного сообщения в другое.

Однако, некоторые сервисы Symfony, вроде обработчика fingers crossed Monolog, допускают утечку по своей задумке. Symfony предоставляет функцию перезапуска сервиса, чтобы решить эту проблему. При перезапуске контейнера автоматически между двумя сообщениями, Symfony ищет любые сервисы, реализующие ResetInterface (включая ваши собственные сервисы) и вызывает их метод reset(), чтобы они могли очистить своё внутренее состояние.

Если сервис не без состояния, и вы хотите перезапускать его свойства после каждого сообщения, то сервис должен реализовывать ResetInterface, где вы можете перезапустить свойства в методе reset().

Если вы не хотите перезапускать контейнер, добавьте опцию --no-reset при выполнении команды messenger:consume.

Транспорт, ограниченный в скорости

Бывает, что вам нужно ограничить вашего работника сообщений. Вы можете сконфигурировать ограничитель скорости а транспорте (для этого требуется компонент RateLimiter), установив его опцию rate_limiter:

1
2
3
4
5
6
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                rate_limiter: your_rate_limiter_name

Caution

При конфигурации ограничителя скорости в транспорте он будет блокировать всего работника при превышении лимита. Вы должны убедиться, что сконфгурировали выделенного работника для транспорта с ограничением скорости, чтобы избежать блокировки других транспортов.

Повторные попытки и ошибки

Если во время потребления сообщения из транспорта будет вызвано исключение, оно будет автоматически повторно отправлено транспорту, чтобы попробовать снова. По умолчанию, сообщение имеет 3 попытки перед сбросом или отправкой транспорту ошибок . Каждая повторная попытка будет тажке отложена во времени, на случай, если она была вызвана временной проблемой. Все это можно сконфигурировать для каждого транспорта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # конфигурация по умолчанию
                retry_strategy:
                    max_retries: 3
                    # задержка в милисекундах
                    delay: 1000
                    # делает так, чтобы задержка была дольше перед каждой повторной попыткой
                    # например, задержка в 1 секунду, 2 секунду, 4 секунду
                    multiplier: 2
                    max_delay: 0
                    # переопределить это все сервисом, который
                    # реализует Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null

7.1

Опция jitter была представлена в Symfony 7.1.

Tip

Symfony запускает WorkerMessageRetriedEvent, когда сообщение имеет повторные попытки, поэтому вы можете выполнить собственную логику.

Note

Благодаря SerializedMessageStamp, сериализованная форма сообщения сохраняется, что предупреждает его повторную сериализацию, если позже будет ещё одна попытка сообщения.

Избегание повторных попыток

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это перманентно и повторных попыток не нужно. Если вы вызовете UnrecoverableMessageHandlingException, сообщение не будет иметь повторных попыток.

Note

Сообщения, которые не будут повторно обработаны, все равно появятся в сконфигурированном транспорте отказов. Если вы хотите избежать этого, подумайте о том, чтобы обработать ошибку самостоятельно и позволить обработчику успешно завершиться.

Форсирование повторных попыток

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это временно и необходима повторная попытка. Если вы вызовете RecoverableMessageHandlingException, сообщение всегда будет иметь повторную попытку.

Сохранения и повторные попытки неудачных сообщений

Если сообщение терпит неудачу и имеет несколько повторных попыток (max_retries), оно затем будет отбраковано. Чтобы избежать этого, вы можете сконфигурировать failure_transport:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        # после повторных попыток, сообщения будут отправлены транспорту "failed"
        failure_transport: failed

        transports:
            # ... другой транспорт

            failed: 'doctrine://default?queue_name=failed'

В этом примере, если обработка сообщения терпит неудачу 3 раза (значение по умолчанию max_retries), оно затем будет отправлено транспорту failed. Хотя вы можете использовать messenger:consume failed для потребления его, как обычного транспорта, вам скорее захочется вручную просмотреть сообщения в транспорте ошибок и решить об их повторных попытках:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# увидеть все сообщения в транспорте ошибок
$ php bin/console messenger:failed:show

# увидеть первых 10 сообщений
$ php bin/console messenger:failed:show --max=10

# увидеть только сообщения MyClass
$ php bin/console messenger:failed:show --class-filter='MyClass'

# увидеть количество сообщений по классу сообщения
$ php bin/console messenger:failed:show --stats

# увидеть детали конкретной ошибки
$ php bin/console messenger:failed:show 20 -vv

# увидеть и повторно попробовать каждое сообщение по-отдельности
$ php bin/console messenger:failed:retry -vv

# повторная попытка конкретных сообщений
$ php bin/console messenger:failed:retry 20 30 --force

# удалить сообщение без повторной попытки
$ php bin/console messenger:failed:remove 20

# удалить сообщения без повторных попыток и показать каждое сообщение перед удалением
$ php bin/console messenger:failed:remove 20 30 --show-messages

# удалить все сообщения в транспорте неудач
$ php bin/console messenger:failed:remove --all

Если сообщение опять потерпит неудачу, оно будет отправлено обратно в транспорт ошибок в соответствии с обычными правилами повторных попыток . Как только будет достигнут максимум повторных попыток, сообщение будет сброшено перманентно.

Несколько ошибочных транспортов

Иногда недостаточно иметь один глобальный сконфигурированный failed transport, котому что некоторые сообщения важнее, чем другие. В таких случаях, вы можете переопределить транспорт ошибок только для определенных транспортов:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        # после повторных попыток, сообщения будут отправлены транспорту "failed"
        # по умолчанию, если внутри транспорта не сконфигурирован "failed_transport"
        failure_transport: failed_default

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                failure_transport: failed_high_priority

            # так как транспорт ошибок не сконфигурирован, будет использоваться установленный
            # глобальный набор "failure_transport"
            async_priority_low:
                dsn: 'doctrine://default?queue_name=async_priority_low'

            failed_default: 'doctrine://default?queue_name=failed_default'
            failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

Если нету определенного failure_transport глобально или на уровне транспорта, сообщение будет сброшено после определенного количества попыток.

Неудачные команды имеют необязательную опцию --transport, чтобы указать failure_transport, сконфигурированный на уровне транспорта.

1
2
3
4
5
6
7
8
# увидеть все сообщения в транспорте "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторная попытка конкретных сообщений из "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# удалить сообщение без повторной попытки из "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфигурация транспорта

Messenger поддерживает несколько разных типов транспорта, каждый со своими опциями. Опции могут быть переданы транспорту через строку DSN или конфигурацию.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: false

Опции, определенные под options, главенствуют над определенными в DSN.

Транспорт AMQP

Транспорт AMQP использует PHP-расширение AMQP для отправки сообщений в очередь вроде RabbitMQ. Установите его, выполнив:

1
$ composer require symfony/amqp-messenger

DSN транспорта AMQP может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

# или используйте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

Если вы хотите использовать AMQP, зашифрованный с помощью TLS/SSL, вы должны также предоставить CA-сертификат. Определите путь сертификата в настройке PHP.ini amqp.cacert (например, amqp.cacert = /etc/ssl/certs) или в параметре DSN cacert (например, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, по умолчанию используемый AMQP, зашифрованным с помощью TLS/SSL, - 5671, но вы можете переопределить его в параметре DSN port (например, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

По умолчанию, транспорт будет автоматически создавать любые необходимые обмены, очереди и связующие ключи. Это можно отключить, но некоторые функции могут работать некорректно (вроде отложенных очередей). Чтобы не создавать никаких очередей автоматически, вы можете сконфигурировать транспорт с queues: [].

Note

Вы можете ограничить потребителей транспорта AMQP, чтобы они обрабатывали только сообщения из некоторых очередей какого-то обмена. См. .

Транспорт имеет ряд других опций, включая способы конфигурации обмена, очереди, связывающие ключи, и многое другое. См. документацию по Connection.

Транспорт имеет ряд опций:

Вы можете также сконфигурирвать настройки специально для AMQP в вашем оообщении, добавив AmqpStamp к вашему Конверту:

1
2
3
4
5
6
7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Потребители не отображаются в панели админа, так как этот транспорт не полагается на \AmqpQueue::consume(), который блокирует. Наличие блокирующего получателя делает опции --time-limit/--memory-limit команды messenger:consume. а также команды messenger:stop-workers бесполезными, так как они полагаются на тот факт, что получатель возвращается незамедлительно, независимо от того, находит он сообщение или нет. Работник потребления отвечает за итерацию до получения сообщения для обработки и/или до того, как будет достигнуто одно из условий остановки. Таким образом, логика остановки работника может быть достигнута, если он застрял на блокирующем вызове.

Tip

Если ваше приложение сталкивается с исключениями сокетов или высоким оборотом соединений (что проявляется в быстром создании и удалении соединений), рассмотрите возможность использования AMQProxy. Этот инструмент работает как шлюз между Symfony Messenger и AMQP-сервером, поддерживая стабильные соединения и минимизируя издержки (что также улучшает общую производительность).

Транспорт Doctrine

Транспорт Doctrine может быть использован для хранения сообщений в таблице базы данных. Установите его, запустив:

1
$ composer require symfony/doctrine-messenger

DSN транспорта Doctrine может выглядеть так:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, в случае если у вас есть несколько соединений и вы хотите использовать какое-либо другое, чем "default". Транспорт будет автоматически создавать таблицу под названием messenger_messages.

Если вы хотите изменить имя таблицы по умолчанию, передайте пользовательское имя таблицы в DSN с помощью опции table_name:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name

Или, для создания таблицы самостоятельно, установите опцию auto_setup как false, и сгенерируйте миграцию .

Caution

Свойство datetime сообщений, хранящееся в базе данных, использует временную зону текущей системы. Это может вызвать проблемы, если несколько машин с разными конфигурациями временных зон используют одно хранилище.

Транспорт имеет такие опции:

????? ???????? ?? ?????????
table_name ???????? ??????? messenger_messages
queue_name ???????? ??????? (??????? ? ???????, ????? ???????????? ???? ??????? ??? ?????????? ???????????) default
redeliver_timeout ????-??? ????? ????????? ???????? ????????? ? ???????, ?? ?? ? ????????? "?????????" (???? ???????? ?? ?????-?? ??????? ???????????, ?????????? ???, ? ????? ?? ?????? ????? ??????????? ?????????) - ? ????????. 3600
auto_setup ?????? ?? ??????? ???? ??????? ????????????? ?? ????? ????????/?????????. true

Note

Установите redeliver_timeout в большее значение, чем длительность вашего самого медленного сообщения. Иначе, некоторые сообщения будут запускаться вторрой раз, пока первый всё ещё обрабатывается.

При использовании PostgreSQL, у вас есть доступ к следующим опциям для получения преимуществ функции LISTEN/NOTIFY. Это позволяет более производительный подход, чем поведение голосования транспорта Doctrine по умолчанию, так как PostgreSQL будет напрямую уведомлять работников, когда новое сообщение будет появляться в таблице.

????? ???????? ?? ?????????
use_notify ???????????? ?? LISTEN/NOTIFY. true
check_delayed_interval ???????? ???????? ?????????? ?????????, ? ????????????. ?????????? ??? 0, ????? ????????? ????????. 1000
get_notify_timeout ????????????????? ??????? ???????? ?????? ??? ?????? PDO::pgsqlGetNotify`, ? ????????????. 0

Транспорт Beanstalkd

Транспорт Beanstalkd отправляет сообщения прямо с рабочую очередь Beanstalkd. Установите его, выполнив:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорта Beanstalkd может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Если порта нет, он по умолчанию будет 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
tube_name ???????? ??????? default
timeout ????-??? ??????? ????????? - ? ????????. 0 (???????? ?????? ????? ?? ???? ??????? ?????, ???? ??????? TransportException)
ttr ????? ?????????? ????????? ????? ??? ??? ????????? ??? ??????? ? ??????? ?????????? - ? ????????. 90

Транспорт Redis

Транспорт Redis использует потоки для создания очереди сообщений. Этот транспорт требует PHP-расширения Redis (>=4.3) и работающего сервера Redis (^5.0). Установите его, выполнив:

$ composer require symfony/redis-messenger

DSN транспорта Redis может выглядеть так:

1
2
3
4
5
6
7
8
9
10
11
12
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Полный пример DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Пример кластера Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Пример Unix-сокета
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
# Пример TLS
MESSENGER_TRANSPORT_DSN=rediss://localhost:6379/messages
# Пример множества хостов Redis Sentinel
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db

Некоторые опции могут быть сконфигурированы через DSN или ключ options под транспортом в messenger.yaml:

7.1

Опция `redis_sentinel`, которая является псевдонимом для `sentinel_master`, была представлна в Symfony 7.1.

Caution

Никогда не должно быть более одной команды messenger:consume выполняемой с одинаковой комбинацией stream, group и consumer, иначе сообщения могут быть обработаны более, чем один раз. Если вы запускаете несколько работников очерели, consumer может быть установлен как переменная окружения (вроде %env(MESSENGER_CONSUMER_NAME)%), установленная Супервизором (пример ниже) или любым другим сервисом, используемым для управления процессами работников. В окружении контейнера, HOSTNAME может быть использовано как имя потребителя, так как там только один работник на контейнер/хост. Если вы используете Kubernetes для управления контейнерами, рассмотрите использование StatefulSet для стабилизации имен.

Tip

Установите delete_after_ack как true (если у вас одна группа) или определите stream_max_entries (если вы можете предположить, какое максимальное количество записей допустимо в вашем случае), чтобы избежать утечек памяти. В другом случае. все сообщения навсегда останутся в Redis.

Транспорт в памяти

Транспорт in-memory на самом деле не доставляет сообщения. Вместо этого, он дедржит их в памяти во время запроса, что может быть полезным для тестирования. Например, если у вас есть транспорт async_priority_normal, вы можете переопределить его в окружении test, чтобы использовать этот транспорт:

1
2
3
4
5
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'

Тогда, во время тестирования, сообщения не будут отправлены реальному транспорту. Даже лучше, в тесте, вы можете проверить, чтобы только одно сообщение было отправлено во время запроса:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething(): void
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт имеет ряд опций:

serialize (булево, по умолчанию: false)
Сериализовать сообщения или нет. Это полезно для тестирования дополнительного слоя, особенно когда вы используете собственный сериализатор сообщений.

Note

Все транспорты in-memory будут автоматически сброшены после каждого теста в классах тестов, расширяющих KernelTestCase или WebTestCase.

Amazon SQS

Транспорт Amazon SQS прекрасно подходит для приложения на AWS. Установите его, выполнив:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорта выглядит так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматически создаст необходимые очерди. Это можно отключить, установив опцию auto_setup как false.

Tip

До отправки или получения сообщения, Symfony необходимо конвертировать название очереди в URL очереди AWS вызвав API GetQueueUrl в AWS. Этого дополнительного API-вызова можно избежать, предоставив DSN, которая является URL очереди.

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
access_key ???? ??????? AWS  
account ????????????? AWS-???????? ???????? ??????? ??????
auto_setup ????? ?? ????????????? ???????? ??????? ?? ????? ????????/?????????. true
buffer_size ?????????? ????????? ??? ???????????????? ?????????? 9
debug ???? true. ????? ??? ???? HTTP ???????? ? ??????? (??? ?????? ?? ??????????????????) false
endpoint ?????????? URL ? SQS-??????? https://sqs.eu-west-1.amazonaws.com
poll_timeout ????? ???????? ?????? ????????? ? ???????? 0.1
queue_name ???????? ??????? messages
region ???????? AWS-??????? eu-west-1
secret_key ????????? ???? AWS  
session_token ????? ?????? AWS  
visibility_timeout ?????????? ??????, ?? ?????????? ??????? ????????? ?? ????? ??????? (Visibility Timeout) ???????????? ???????
wait_time ???????????? Long polling ? ???????? 20

Note

Параметр wait_time определяет максимальное время ожидания для Amazon SQS до того, как сообщение будет доступно в очереди, перед отправкой ответа. Это помогает снизить стоимость использования Amazon SQS, устранив некоторое количество пустых ответов.

Параметр poll_timeout определяет время ожидания получателя до возвращения null. Он избегает блокировки других получателей от вызова.

Note

Если название очереди имеет суффикс .fifo, AWS создаст очередь FIFO. Используйте марку AmazonSqsFifoStamp, чтобы определить Message group ID и Message deduplication ID.

Другая возможность - включить AddFifoStampMiddleware. Если ваше сообщение реализует MessageDeduplicationAwareInterface, промежуточное ПО автоматически добавит AmazonSqsFifoStamp и установитMessage deduplication ID. Кроме того, если ваше сообщение реализует MessageGroupAwareInterface, промежуточное ПО автоматически установит Message group ID штампа.

Подробнее о промежуточном ПО вы можете узнать в в специальном разделе .

Очереди FIFO не поддерживают установки задержки отдельных сообдений, значение delay: 0 требуется в настройках стратегии повторных попыток.

Сериализация сообщений

Когда сообщения отправляются (и получаются) в транспорт, они сериализуются с использование нативных функций PHP serialize() и unserialize(). Вы можете изменить это глобально (или для каждого транспорта) на сервис, реализующий SerializerInterface:

1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer

messenger.transport.symfony_serializer - это встроенный сервис, который использует компонент Serializer и может быть сконфигурирован несколькими способами. Если вы выберете использовать сериализатор Symfony, вы сможете контролировать контекст для каждого случая отдельно через SerializerStamp (см. Конверты и марки).

Tip

При отправке/получении сообщений в/из другого транспорта, вам может понадобиться больше контроля над процессом сериализации. Использование пользовательского сериализатора предоставляет такой контроль. См. Туториал по сериализации сообщений SymfonyCasts, чтобы узнать больше.

Выполнения команд и внешних процессов

Вызов команды

Запустить любую команду можно, развернув RunCommandMessage. Symfony позаботится об обработке этого сообщения и выполнит команду, переданную в параметр сообщения:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Console\Messenger\RunCommandMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class CleanUpService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function cleanUp(): void
    {
        // Долгое задание с некоторым кешированием...

        // После окончания, разверните какие-то команды очистки
        $this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
        $this->bus->dispatch(new RunCommandMessage('cache:clear'));
    }
}

Вы можете сконфигурировать поведения в случае, если во время выполнения команды что-то пойдет не так. Для этого вы можете использовать параметры throwOnFailure и catchExceptions при создании экземпляра RunCommandMessage.

После обработки обработчик вернет RunCommandContext который содержит много полезной информации, такой как код выхода или вывод процесса. Вы можете обратиться к странице, посвященной результатам обработчика для получения дополнительной информации.

Вызов внещнего процесса

Messenger поставляется с удобным помощником для запуска внешних процессов путем развертывания сообщения. Для этого используется
компонент Process. Развертывая RunProcessMessage, Messenger позаботится о создании нового процесса с переданными вами параметрами:

use SymfonyComponentMessengerMessageBusInterface; use SymfonyComponentProcessMessengerRunProcessMessage;

class CleanUpService { public function __construct(private readonly MessageBusInterface $bus) { }

public function cleanUp(): void { $this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));

// ...

}

}

После обработки обработчик вернет RunProcessContext, который содержит много полезной информации, такой как код выхода или вывод процесса. Вы можете обратиться к странице, посвященной результатам обработчика для получения дополнительной информации.

Обращение к веб-сервису

Бывает, что вам нужно регулярно пинговать веб-сервис, чтобы узнать его состояние, например, работает он или нет. Это можно сделать, развернув PingWebhookMessage:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class LivenessService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function ping(): void
    {
        // HttpExceptionInterface вызывается в 3xx/4xx/5xx
        $this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));

        // Пинг, но не вызывается в 3xx/4xx/5xx
        $this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));

        // Может быть использована любая валидная опция HttpClientInterface
        $this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
            'headers' => [
                'Authorization' => 'Bearer ...'
            ],
            'json' => [
                'data' => 'some-data',
            ],
        ]));
    }
}

Обработчик вернет ResponseInterface, позволяя вам собирать и обрабатывать информацию, возвращаемую HTTP-запросом.

Получение результатов из ваших обработчиков

Когда сообщение обработано, HandleMessageMiddleware добавляет HandledStamp для каждого объекта, который обработал сообщение. Вы можете использовать его для получения значения, возвращенного обработчиком(ами):

1
2
3
4
5
6
7
8
9
10
11
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

$envelope = $messageBus->dispatch(new SomeMessage());

// получить значение, которое было возвращено последним обработчиком сообщений
$handledStamp = $envelope->last(HandledStamp::class);
$handledStamp->getResult();

// или получить информацию о всех обработчиках
$handledStamps = $envelope->all(HandledStamp::class);

Получение результатов при работе с командой и автобусами запросов

Компонент Messenger может использоваться в архитектурах CQRS, где автобусы команд и запросов являются центральными частями приложения. Прочитайте статью Мартина Фаулера о CQRS, чтобы узнать больше и то, как сконфигурировать несколько автобусов .

Поскольку запросы обычно синхронны и должны обрабатываться один раз, получение результата от обработчика является распространенной потребностью.

HandleTrait существует получения результата обработчика при синхронной обработке. Он также гарантирует, что будет зарегистрирован ровно один обработчик. HandleTrait может быть использован в любом классе, у которого есть свойство $messageBus:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// src/Action/ListItems.php
namespace App\Action;

use App\Message\ListItemsQuery;
use App\MessageHandler\ListItemsQueryResult;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

class ListItems
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
    }

    public function __invoke(): void
    {
        $result = $this->query(new ListItemsQuery(/* ... */));

        // Сделать что-то с результатом
        // ...
    }

    // Создание такого метода необязательно, но позволяет добавлять к результату подсказки типа
    private function query(ListItemsQuery $query): ListItemsQueryResult
    {
        return $this->handle($query);
    }
}

Следовательно, вы можете использовать эту черту для создания классов команд и автобусов запросов. Например, вы можете создать специальный класс QueryBus и внедрить его везде, где вам нужно поведение автобуса запросов, вместо MessageBusInterface:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// src/MessageBus/QueryBus.php
namespace App\MessageBus;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

class QueryBus
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
    }

    /**
     * @param object|Envelope $query
     *
     * @return mixed The handler returned value
     */
    public function query($query): mixed
    {
        return $this->handle($query);
    }
}

Настройка обработчиков

Конфигурация обработчиков с использованием атрибутов

Вы можете сконфигурировать вашего обработчика, передав опции атрибуту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message): void
    {
        // ...
    }
}

Возможные опции для конфигурации с атрибутом:

????? ????????
bus ??? ????????, ? ???????? ?????????? ????? ???????? ?????????, ?? ????????? - ??? ????????.
fromTransport ??? ??????????, ? ???????? ?????????? ????? ???????? ?????????, ?? ????????? - ??? ??????????.
handles ??? ????????? (FQCN), ??????? ????? ???? ?????????? ????????????, ????? ?????? ? ??? ??????, ???? ?? ????? ???? ?????? ?? ????????? ????.
method ??? ??????, ??????? ????? ???????????? ?????????, ?????? ???? ????? ???????? ?????.
priority ????????? ???????????, ????? ????????? ???????????? ????? ???????????? ???? ? ?? ?? ?????????.

Конфигурация обработчиков вручную

Symfony обычно будет находить и регистрировать вашего обработчика автоматически . Но вы также можете сконфигурировать его вручную - и передать ему дополнительную конфигурацию - тегировав сервис обработчика messenger.message_handler

1
2
3
4
5
6
7
8
9
10
11
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        tags: [messenger.message_handler]

        # или сконфигурировать с опциями
        tags:
            -
                name: messenger.message_handler
                # необходимо только если невозможно угадать по подсказке
                handles: App\Message\SmsNotification

Возможные опции конфигурации с тегами:

????? ????????
bus ??? ????????, ? ???????? ?????????? ????? ???????? ?????????, ?? ????????? - ??? ????????.
from_transport ??? ??????????, ? ???????? ?????????? ????? ???????? ?????????, ?? ????????? - ??? ??????????.
handles ??? ????????? (FQCN), ??????? ????? ???? ?????????? ????????????, ????? ?????? ? ??? ??????, ???? ?? ????? ???? ?????? ?? ????????? ????.
method ??? ??????, ??????? ????? ???????????? ?????????, ?????? ???? ????? ???????? ?????.
priority ????????? ???????????, ????? ????????? ???????????? ????? ???????????? ???? ? ?? ?? ?????????.

Обработка нескольких сообщений

Один класс обработчика может обрабатывать несколько сообщений. Для этого, добавьте атрибут #AsMessageHandler ко всем методам обработки:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;

class SmsNotificationHandler
{
    #[AsMessageHandler]
    public function handleSmsNotification(SmsNotification $message): void
    {
        // ...
    }

    #[AsMessageHandler]
    public function handleOtherSmsNotification(OtherSmsNotification $message): void
    {
        // ...
    }
}

Транзакционные сообщения: обработка новых сообщений после завершения обработки

Обработчик сообщений может dispatch новые сообщения, одновременно обрабатывая другие, в том же или другом автобусе (если приложение имеет несколько автобусов ). Любые ошибки или исключения, которые возникают во время этого процесса, могут привести к непредвиденным последствиям, таким как:

  1. Если используется DoctrineTransactionMiddleware и развернутое сообщение вызывает исключение, то все транзакции базы данных в исходном обработчике
    откатываются.
  2. Если сообщение отправляется в другой автобус, то отправленное сообщение будет обработано, даже если какой-то код в текущем обработчике вызовет исключение.

Пример процесса RegisterUser

Рассмотрим приложение с автобусами команд и событий. Приложение отправляет команду с именем RegisterUser в автобус команд. Команда обрабатывается RegisterUserHandler, который создает объект User, сохраняет его в базе данных и развертывает сообщение UserRegistered в автобусе событий.

Существует множество обработчиков сообщения UserRegistered, один из них может отправлять приветственное письмо новому пользователю. Мы используем DoctrineTransactionMiddleware, чтобы обернуть все запросы к базе данных в одну транзакцию базы данных.

Проблема 1: Если при отправке приветственного письма вызывается исключение, то пользователь не будет создан, поскольку DoctrineTransactionMiddleware откатит транзакцию Doctrine, в которой был создан пользователь.

Проблема 2: Если при сохранении пользователя в базе данных вызывается исключение, приветственное письмо все равно будет отправлено, поскольку оно обрабатывается асинхронно.

Промежуточное ПО DispatchAfterCurrentBusMiddleware

Для многих приложений желательным поведением является только обработка сообщений, которые развертываются обработчиком, после того как этот обработчик полностью завершит свою работу. Этого можно добиться, используя DispatchAfterCurrentBusMiddleware и добавляя штамп DispatchAfterCurrentBusStamp к конверту сообщения :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;

use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

class RegisterUserHandler
{
    public function __construct(
        private MessageBusInterface $eventBus,
        private EntityManagerInterface $em,
    ) {
    }

    public function __invoke(RegisterUser $command): void
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        // DispatchAfterCurrentBusStamp помечает сообщение события для обработки
        // только если этот обработчик не вызывает исключение.

        $event = new UserRegistered($command->getUuid());
        $this->eventBus->dispatch(
            (new Envelope($event))
                ->with(new DispatchAfterCurrentBusStamp())
        );

        // ...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;

use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;

class WhenUserRegisteredThenSendWelcomeEmail
{
    public function __construct(
        private MailerInterface $mailer,
        EntityManagerInterface $em,
    ) {
    }

    public function __invoke(UserRegistered $event): void
    {
        $user = $this->em->getRepository(User::class)->find($event->getUuid());

        $this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
    }
}

Это означает, что сообщение UserRegistered будет обработано после того, как RegisterUserHandler завершится и новый User будет сохранен в базе данных. Если RegisterUserHandler встречает исключение, то событие UserRegistered никогда не будет обработано. А если исключение возникнет при отправке приветственного письма, транзакция Doctrine не будет откачена.

Note

Если WhenUserRegisteredThenSendWelcomeEmail вызывает исключение, то оно
будет обернуто в DelayedMessageHandlingException. Использование DelayedMessageHandlingException::getWrappedExceptions даст вам все
исключения, возникающие при обработке сообщения с помощью DispatchAfterCurrentBusStamp.

Промежуточное ПО dispatch_after_current_bus включено по умолчанию. Если вы конфигурируете свое промежуточное ПО вручную, не забудьте зарегистрировать dispatch_after_current_bus перед doctrine_transaction в цепочке промежуточного ПО. Кроме того, промежуточное ПО dispatch_after_current_bus должно быть загружено для всех используемых автобусов.

Связывание обработчиков с разными транспортами

Каждое сообщение может иметь несколько обработчиков, и когда сообщение потребляется, вызываются все его обработчики. Но вы можете также сконфигурироать обработчика так, чтобы он вызывался только когда сообщение получено из конкретного транспорта. Это позволяет вам имет одно сообщение, где каждый обработчик вызывается разными "работниками", потребляющими разный транспорт.

Представьте, что у вас есть сообщение UploadedImage с двумя обработчиками:

  • ThumbnailUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием image_transport
  • NotifyAboutNewUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием async_priority_normal

Чтобы сделать это, добавьте опцию from_transport к каждому обработчику. Например:

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;

#[AsMessageHandler(fromTransport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
    public function __invoke(UploadedImage $uploadedImage): void
    {
        // создать миниатюры
    }
}

И, похожим образом:

1
2
3
4
5
6
7
8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

#[AsMessageHandler(fromTransport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
    // ...
}

Затем, убедитесь, что "маршрутизируете" ваше сообщение к обоим транспортам:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]

Вот и все! Теперь вы можете потреблять каждый транспорт:

1
2
3
4
# вызовет ThumbnailUploadedImageHandler только при обработке сообщения
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Если обработчик не имеет конфигурации from_transport, он будет выполнен в каждом транспорте, из которого будет получено это сообщение.

Обработка сообщений партиями

Вы можете объявить "специальные" обработчики, которые будут обрабатывать сообщения партиями.
При этом обработчик будет ждать, пока не наберется определенное количество сообщений перед обработкой. Объявление обработчика партиями выполняется путём реализации BatchHandlerInterface.
BatchHandlerTrait также предоставляется, чтобы облегчить объявление этих специальных обработчиков:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, Acknowledger $ack = null): mixed
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                // Вычислить $result из $message...

                // Признать обработку сообщения
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }

    // Опционально, вы можете либо переопределить метод `shouldFlush()`
    // черты, чтобы определить свой собственный размер партии...
    private function shouldFlush(): bool
    {
        return 100 <= \count($this->jobs);
    }

    // ... либо переопределить метод `getBatchSize()`, если поведение по умолчанию
    // соответствует вашим потребностям
    private function getBatchSize(): int
    {
        return 100;
    }
}

Note

Если аргумент $ack в __invoke() равен null, ожидается, что сообщение будет обработано синхронно. В противном случае ожидается, что __invoke() вернет количество ожидающих сообщений.
BatchHandlerTrait обрабатывает это за вас.

Note

По умолчанию отложенные партии сбрасываются, когда работник простаивает, а также когда он остановлен.

Расширение Messenger

Конверты и марки

Сообщение может быть любым PHP-объектом. Иногда вам может понадобиться сконфигурировать что-то дополнительное в сообщении - вроде того, как оно должно быть обработано внутри AMQP или добавления задержки перед обработкой сообщения. Вы можете сделать это, добавив марку ("stamp") к вашему сообщению:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus): void
{
    $bus->dispatch(new SmsNotification('...'), [
        // подождать 5 секунд перед обработкой
        new DelayStamp(5000),
    ]);

    // или ясно создайте Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутренне, каждое сообщение оборачивается в конверт (Envelope), содержащий сообщение и марки. Вы можете создать его вручную или позволить автобусу сообщений сделать это. Существует множество различных марок для разных целей и они используются внутренне для отслеживания информации о сообщении - вроде того, какой автобус обрабатывает его или имеет ли оно повторные попытки после неудачи.

Промежуточное ПО

То, что происходит после запуска сообщения в автобус сообщений, зависит от его набора промежуточного ПО и его порядка. По умолчанию, промежуточное ПО, сконфигурированное для каждого автобуса, выглядит так:

  1. add_bus_name_stamp_middleware - добавляет марку для записи того, в каком атобусе было запущено это собщение;
  2. dispatch_after_current_bus- см. Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена;
  3. failed_message_processing_middleware - обрабатывает сообщения, которые имеют повторные попытки через транспорт ошибок , чтобы они правильно функционировали, как будто бы они были получены из изначального транспорта;
  4. Ваша собственная коллекция middleware;
  5. send_message - если машрутизация сконфигурирована для транспорта, отправляет сообщения этому транспорту и останавливает цепь промежуточного ПО;
  6. handle_message - вызывает обработчика(ов) сообщений для заданног сообщения.

Note

Эти названия промежуточного ПО - на самом деле сокращения. Настоящие id сервисов имеют префикс messenger.middleware. (например,messenger.middleware.handle_message).

Промежуточное ПО выполняется после запуска сообщения, и также еще раз, когда сообщение получено через работника (для сообщений, которые были отправлены транспорту для асинхронной обработки). Помните это, если вы создаете собственное промежуточное ПО.

Вы можете добавить собственное промежуточное ПО в список, или полностью отключить промежуточное ПО по умолчанию и добавить только ваше собственное:

Если сервис промежуточного ПО является абстрактным, вы можете сконфигурировать аргументы его конструктора и для каждого автобуса будет создан свой экземпляр.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # отключить промежуточное ПО по умолчанию
                default_middleware: false

                middleware:
                    # использовать и сконфигурировать части промежуточного ПО по умолчанию, если вы хотите
                    - 'add_bus_name_stamp_middleware': ['messenger.bus.default']

                    # добавить ваши собственные сервисы, которые реализуют Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

Промежуточное ПО для Doctrine

Если вы в своем приложении используете Doctrine, существует ряд необязательного промежуточного ПО, которое вы можете захотеть использовать:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # каждый раз при обработке сообщения, соединение Doctrine
                    # "пингуется" и повторно подключается, если оно закрыто. Полезно,
                    # если ваши работники работают долгое время и соединение базы
                    # данных иногда теряется
                    - doctrine_ping_connection

                    # После обработки, соединение Doctrine закрывается, что может
                    # освободить соединения базы данных в работнике, вместо того,
                    # чтобы держать их открытыми всегда
                    - doctrine_close_connection

                    # оборачивает всех обработчиков в одну транзакцию Doctrine
                    # обработчикам не надо вызывать flush(), а ошибка в любом
                    # обработчике вызовет откат
                    - doctrine_transaction

                    # or pass a different entity manager to any
                    #- doctrine_transaction: ['custom']

Другое промежуточное ПО

Добавьте промежуточное ПО router_context, если вам нужно генерировать абсолютные URL в потребителе (например, отображать шаблон со ссылками). Это промежуточное ПО хранит контекст изначального запроса (т.е. хост, HTTP-порт и т.д.), что необходимо при создании абсолютных URL.

Добавьте промежуточное ПО validation, если вам нужно валидировать объект сообщения, используя компонент Validator, перед его обработкой. Если валидация будет неуспешной, будет вызвано ValidationFailedException. ValidationStamp может быть использован для конфигурации групп валидации.

1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    - router_context
                    - validation

События Messenger

В дополнение к промежуточному ПО, Messenger также запускает несколько событий. Вы можете создать слушателя событий, чтобы подключаться к разным частям процесса. Для каждого, класс события будет названием события:

Дополнительные аргументы обработчика

Можно заставить мессенджер передавать обработчику сообщений дополнительные данные,
используя HandlerArgumentsStamp. Добавьте этот штамп в конверт в промежуточном ПО и заполните его любыми дополнительными данными, которые вы хотите иметь в обработчике:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/Messenger/AdditionalArgumentMiddleware.php
namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;

final class AdditionalArgumentMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $envelope = $envelope->with(new HandlerArgumentsStamp([
            $this->resolveAdditionalArgument($envelope->getMessage()),
        ]));

        return $stack->next()->handle($envelope, $stack);
    }

    private function resolveAdditionalArgument(object $message): mixed
    {
        // ...
    }
}

Затем ваш обработчик будет выглядеть так:

1
2
3
4
5
6
7
8
9
10
11
12
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;

final class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message, mixed $additionalArgument)
    {
        // ...
    }
}

Сериализатор сообщений для пользовательских форматов данных

Если вы получаете сообщения из других приложений, возможно, они не
не совсем в том формате, который вам нужен. Не все приложения будут возвращать сообщения в формате JSON с полями body и headers. В таких случаях вам придется создать новый сериализатор сообщений, реализующий SerializerInterface. Допустим, вы хотите создать расшифровщик сообщений:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
namespace App\Messenger\Serializer;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class MessageWithTokenDecoder implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $envelope = \json_decode($encodedEnvelope, true);

        try {
            // проанализировать данные, которые вы получили с вашими пользовательскими полями
            $data = $envelope['data'];
            $data['token'] = $envelope['token'];

            // другие операции, вроде получения информации из штампов
        } catch (\Throwable $throwable) {
            // обернуть любое исключение, которое может возникнуть в конверте, чтобы отправить его в транспорт неудач
            return new Envelope($throwable);
        }

        return new Envelope($data);
    }

    public function encode(Envelope $envelope): array
    {
        // этот дешифровщик не шифрует сообщения, но вы можете реализовать его, вернув
        // массив с сериализованными штампами, если вам нужно отправлять сообщения в пользовательском формате
        throw new \LogicException('This serializer is only used for decoding messages.');
    }
}

Следующий шаг - сообщить Symfony, чтобы она использовала этот сериализатор в одном или нескольких ваших
транспортах:

1
2
3
4
5
6
7
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: '%env(MY_TRANSPORT_DSN)%'
                serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'

Несколько автобусов, автобусов команд и событий

Messenger предоставляет вам один сервис автобуса сообщений по умолчанию. Но вы можете сконфигурировать столько, сколько вы хотите, создав автобусы "команд", "запросов" или "событий" и контролируя их промежуточное ПО.

Общепринятая архитектура при создании приложений заключается в разделении команд и
запросов. Команды - это действия, которые что-то делают, а запросы получают данные. Это называется CQRS ("Разделение ответственности команд и запросов"). См. статью о CQRS, чтобы узнать больше. Эту архитектуру можно использовать вместе с компонентом Messenger, определив несколько автобусов.

Автобускоманд немного отличается от автобуса запросов. Например, автобусы команд обычно не предоставляют никаких результатов, а автобусы запросов редко бывают асинхронными. Вы можете сконфигурировать эти автобусы и их правила с помощью промежуточного ПО.

Также может быть хорошей идеей отделить действия от реакций, представив автобус событий. У автобуса событий может быть ноль или более подписчиков.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
framework:
    messenger:
        # Автобус, который будет внедрен при внедрении MessageBusInterface
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    enabled: true
                    # установить "allow_no_handlers" как true (по умолчанию false), чтобы позволить
                    # отсутствие сконфигурированного обработчика для этого автобуса, без вызова исключения
                    allow_no_handlers: false
                    # установить "allow_no_senders" как false (по умолчанию true), чтобы вызвать исключение,
                    # если для этого автобуса не сконфигурирован отправитель
                    allow_no_senders: true
                middleware:
                    - validation

Это создаст три новых сервиса:

  • command.bus: автомонтируемый с подсказкой типа MessageBusInterface (потому что это default_bus);
  • query.bus: автомонтируемый с MessageBusInterface $queryBus;
  • event.bus: автопмонтируемый с MessageBusInterface $eventBus.

Ограничение количества обработчиков в автобусе

По умолчанию каждый обработчик будет доступен для обработки сообщений во всех ваших автобусах. Чтобы предотвратить отправку сообщения в неправильный автобус без ошибки, вы можете ограничить каждый обработчик определенным автобусом с помощью тега messenger.message_handler:

1
2
3
4
# config/services.yaml
services:
    App\MessageHandler\SomeCommandHandler:
        tags: [{ name: messenger.message_handler, bus: command.bus }]

Таким образом, обработчик App\MessageHandler\SomeCommandHandler будет известен только автобусу command.bus.

Вы также можете автоматически добавить этот тег к ряду классов, используя конфигурацию сервиса _instanceof <di-instanceof>`. Используя это, вы можете определить автобус сообщений на основе реализованного интерфейса:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# config/services.yaml
services:
    # ...

    _instanceof:
        # все сервисы, реализующие CommandHandlerInterface,
        # будут зарегистрированы в автобусе command.bus
        App\MessageHandler\CommandHandlerInterface:
            tags:
                - { name: messenger.message_handler, bus: command.bus }

        # в то время, как реализующие QueryHandlerInterface, будут
        # зарегистрированы в автобусе query.bus
        App\MessageHandler\QueryHandlerInterface:
            tags:
                - { name: messenger.message_handler, bus: query.bus }

Отладка автобусов

Команда debug:messenger выводит список доступных сообщений и обработчиков для каждого автобуса.
Вы также можете ограничить список конкретным автобусом, указав его имя в качестве аргумента.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ php bin/console debug:messenger

  Messenger
  =========

  command.bus
  -----------

   Следующие сообщения могут быть развернуты:

   ---------------------------------------------------------------------------------------
    App\Message\DummyCommand
        handled by App\MessageHandler\DummyCommandHandler
    App\Message\MultipleBusesMessage
        handled by App\MessageHandler\MultipleBusesMessageHandler
   ---------------------------------------------------------------------------------------

  query.bus
  ---------

   Следующие сообщения могут быть развернуты:

   ---------------------------------------------------------------------------------------
    App\Message\DummyQuery
        handled by App\MessageHandler\DummyQueryHandler
    App\Message\MultipleBusesMessage
        handled by App\MessageHandler\MultipleBusesMessageHandler
   ---------------------------------------------------------------------------------------

Tip

Команда также отобразит PHPDoc-описание классов сообщений и обработчиков.

Повторное развертывание сообщения

Если вы хотите повторно развернуть сообщение (используя тот же транспорт и конверт), создайте новый RedispatchMessage и разверните его его через ваш автобус. Используя тот же пример SmsNotification, показанный ранее:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __construct(private MessageBusInterface $bus)
    {
    }

    public function __invoke(SmsNotification $message): void
    {
        // сделать что-то с сообщением
        // затем повторно развернуть его, основываясь на вашей логике

        if ($needsRedispatch) {
            $this->bus->dispatch(new RedispatchMessage($message));
        }
    }
}

Встроенный RedispatchMessageHandler позаботится об этом сообщении, чтобы повторно развернуть его через тот же автобус, через который оно было развернуто впервые. Вы также можете использовать второй аргумент конструктора RedispatchMessage, чтобы указать транспорты, которые будут использоваться при повторном развертывании сообщения.