Messenger: работа с синхронизированными сообщениями и сообщениями в очереди

Дата обновления перевода 2023-01-16

Messenger: работа с синхронизированными сообщениями и сообщениями в очереди

Messenger предоставляет автобус сообщений с возможностью отправки сообщений, а затем работы с ними сразу же в вашем приложении, или их отправки через транспорт (например, очередь) для их обработки позже. Чтобы узнать об этом больше, прочтите документацию компонента Messenger.

Установка

В приложениях, использующих Symfony Flex , выполните эту команду, чтобы установить Messenger:

1
$ composer require symfony/messenger

Создание сообщения и обработчика

Messenger строится на двух разных классах, которые вы создадите: (1) классе сообщения, который содержит данные, и (2) классе обработчика(ов), который будет вызван после запуска сообщения. Класс обработчика будет читать класс сообщения и выполнять некоторое действие.

Для класса сообщения нет особых требований, кроме того, что его можно сериализовать:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обработчик сообщений - это PHP-вызываемое, рекомендованный способ его создания - создать класс, реализующий MessageHandlerInterface и имеющий метод __invoke(), который имеет подсказки класса сообщения (или интерфейс сообщения):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ... сделайте что-то - вроде отправки SMS!
    }
}

Tip

Вы можете также использовать атрибут #[AsMessageHandler] в отдельных методах класса. Вы можете использовать атрибут в стольки методах одного класа, скольки хотите, что позволяет вам группировать обработку нескольких связанных типов сообщений.

6.1

Поддержка для #[AsMessageHandler] в методах была представлена в Symfony 6.1.

Благодаря автоконфигурации и подсказке SmsNotification, Symfony значет, что этот отбработчик должен быть вызван, когда запускается сообщение SmsNotification. В большинстве случаев, это все, что вам нужно будет сделать. Но вы можете также сконфигурировать обработчики сообщений вручную . Чтобы увидеть всех сконфигурированных обработчиков, выполните:

1
$ php bin/console debug:messenger

Запуск сообщения

Вы готовы! Для запуска сообщения (и вызова обработчика), внедрите сервис messenger.default_bus (через MessageBusInterface), например, в контроллер:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // приведет к вызову SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронные сообщения/сообщения в очереди

По умолчанию, сообщения обрабатываются сразу же после запуска. Если вы хотите обработать сообщение асинхронно, вы можете сконфигурировать транспорт. Транспорт может отправлять сообщения (например, в систему очереди) и затем получать из через работника . Messenger поддерживает несколько транспортов .

Note

Если вы хотите использовать транспорт, который не поддерживается, посмотрите на транспорт Enqueue, который поддерживает штуки вроде Kafka и Google Pub/Sub.

Транспорт регистрируется с использованием "DSN". Благодаря рецепту Flex для Messenger, ваш файл .env уже имеет несколько примеров.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Раскомментируйте тот транспорт, который вы хотите (или установите его в .env.local). См. , чтобы узнать больше деталей.

Далее, в config/packages/messenger.yaml, давайте определим транспорт под название async, использующий эту конфигурацию:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

            # или расширено для конфигурации большего количества опций
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []

Маршрутизация сообщений к транспорту

Теперь, когда у вас есть сконфигурированный транспорт, вместо немедленно обработки сообщений, вы можете сконфигурировать их так, чтобы они были отправлены транспорту:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

        routing:
            # async - это то имя, которое вы дали вашему транспорту выше
            'App\Message\SmsNotification': async

Благодаря этому, App\Message\SmsNotification будет отправлен транспорту async, и его обработчик(и) не будут вызваны сразу же. Все сообщения, не совпавшие с routing будут обработаны немедленно.

Note

Вы можете использовать '*' как класс сообщения. Он будет вести себя как правило маршрутизации по умолчанию для любого сообщения, не совпадающего с routing. Это полезно, чтобы гарантировать, что ни одно сообщение не обрабатывается синхронно по умолчанию.

Единственный недостаток в том, что '*' также будет применяться к электронным письмам, отправленным с Symfony Mailer (который использует SendEmailMessage, если доступен Messenger). Это может привести к проблемам, если ваши письма не поддаются сериализации (например, если они содержат вложения файлов как PHP источники/потоки).

Вы также можете маршрутизировать классы по их родительскому классу или интерфейсу. Или отправлять сообщения нескольким транспортам:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс
            'App\Message\AbstractAsyncMessage': async
            'App\Message\AsyncMessageInterface': async

            'My\Message\ToBeSentToTwoSenders': [async, audit]

Note

Если вы сконфигурируете машрутизацию и для дочернего, и для родительского класса, используются оба правила. Например, если у вас есть объект SmsNotification, расширяющийся из Notification, будут использованы маршрутизации и для Notification, и для SmsNotification.

Сущности Doctrine в сообщениях

Если вам нужно передать сущность в Doctrine в сообщении, лучше всего передать основной ключ сущности (или любую релевантную информацию, необходимую обработчику, вроде email, и др.) вместо объекта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Затем, в вашем обработчике, вы можете сделать запрос свежего объекта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail)
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... отправить электронное письмо!
    }
}

Это гарантирует, что сущность содержит свежие данные.

Синхронная обработка сообщений

Если сообщение не совпадает ни с одним правилом маршрутизации , оно не будет отправлено ни одному транспорту, и будет обработано немедленно. В некоторых случаях (например, при связывании обработчиков с разными транспортами), легче и более гибко обработать их ясно: создав транспорт sync и "отправив" сообщения в него для немедленной обработки:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... другие транспорты

            sync: 'sync://'

        routing:
            App\Message\SmsNotification: sync

Создание вашего собственного транспорта

Вы также можете создать собственный транспорт, если вам нужно отправлять или получать сообщения из чего-то, что не поддерживается. См. Как создать ваш собственный транспорт сообщений.

Потребление сообщений (запуск работника)

Когда ваши сообщения были маршрутизированы, в большинстве случаев, вам нужно будет "потребить" их. Вы можете сделать это с помощью команды messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# используйте -vv, чтобы увидеть детали того, что происходит
$ php bin/console messenger:consume async -vv

Первый аргумент - это имя получателя (или id сервиса, если вы маршрутизировали к пользовательскому сервису). По умолчанию, команда будет выполняться бесконечно: искать новые сообщения в вашем транспорте и обрабатывать их. Эта команда называется вашим "работником".

Tip

Чтобы правильно остановить работника, вызовите экземпляр StopWorkerException.

Развёртывание в производство

В производстве, есть несколько важных вещей, о которых стоит подумать:

Используйте Супервизора, чтобы ваш(и) работник(и) работали
Вы хотите, чтобы один или более "работников" работали все время. Чтобы сделать это, используйте систему контроля процесса вроде Супервизора .
Не позволяйте работникам работать бесконечно
Некоторые сервисы (вроде EntityManager Doctrine) будут потреблять все больше памяти со временем. Поэтому, вместо того, чтобы позволять вашему работнику работать всегда, используйте флажок вроде messenger:consume --limit=10, чтобы указать работнику, что он должен обработать только 10 сообщений до прекращения работы (затем Супервизор создаст новый процесс). Также есть другие опции вроде --memory-limit=128M и --time-limit=3600.
Остановка работников, которые столкнулись с ошибками
Если зависимость работника, вроде вашего сервера БД не отвечает, или достигнут тайм-аут, вы можете попробовать добавить логику повторного соединения , или просто остановить работника, если он получает слишком много ошибок, с помощью опции --failure-limit команды messenger:consume.
Перезагружайте работников при развёртывании
Каждый раз при развёртывании вам нужно будет перезагрузить все процессы ваших работников, чтобы они видели новозапущенный код. Чтобы сделать это, выполните messenger:stop-workers при запуске. Это сигнализирует каждому работнику, что он должен закончить обработку текущего сообщения и грациозно завершить работу. Затем, Супервизор создаст новые процессы работников. Команда использует кеш app внутренне - поэтому убедитесь в том, что он сконфигурирован для использования адаптера по вашему вкусу.
Используйте один кеш между запусками
Если ваша стратегия развёртывания заключается в создании новых целевых каталогов, вам нужно установить значение опции конфигурации cache.prefix.seed , чтобы использовать одно и то же пространство имен кеша между запусками. Иначе, пул cache.app будет использовать значение параметра kernel.project_dir в качестве базы для пространства имен, что приведет к разным пространствам имен каждый раз при запуске.

Приоритезированный транспорт

Иногда определенные типы сообщений должны иметь более высокий приоритет и быть обработаны до других. Чтобы сделать это возможным, вы можете создать несколько транспортов и маршрутизировать разные сообщения к ним. Например:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name соответствует транспорту doctrine
                    queue_name: high

                    # для AMQP отправьте отдельный обмен, а затем поставьте в очередь
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # or redis try "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low

        routing:
            'App\Message\SmsNotification':  async_priority_low
            'App\Message\NewUserWelcomeEmail':  async_priority_high

Затем вы можете запускать отдельных работников для каждого транспорта, или инструктировать одного работника, чтобы он обрабатывал сообщения в порядке приоритетности:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Работник будет всегда вначале искать сообщения, ожидающие async_priority_high. Если таких нет, затем он будет потреблять сообщения из async_priority_low.

Ограничьте потребление для конкретных очередей

Некоторый транспорт (особенно AMQP) имеет концепцию обмена и очередей. Транспорт Symfony всегда связан с обменом. По умолчанию, работник потребляет из всех очередей, соединенных с обменом указанного транспорта. Однако, есть примеры использования, когда вам нужно, чтобы работник потреблял только из конкретной очереди.

You can limit the worker to only process messages from specific queues:

1
$ php bin/console messenger:consume my_transport --queues=fasttrack

Чтобы позволить использование опции queues, получатель должен реализовывать QueueReceiverInterface.

Проверка количества сообщений в очереди для транспорта

Выполните команду messenger:stats, чтобы узнать, сколько сообщений находятся в "очередяз" некоторого или всех транспортов:

1
2
3
4
5
# отображает количество сообщений в очереди во всех транспортах
$ php bin/console messenger:stats

# отображает статистику только для некоторых транспортов
$ php bin/console messenger:stats my_transport_name other_transport_name

Note

Для того, чтобы эта команда работала, сконфигурированный получатель транспорта должен реализовывать
MessageCountAwareInterface.

6.2

Команда messenger:stats была представлена в Symfony 6.2.

Конфигурация супервизора

Супервизор - это отличный инструмент для гарантии того, что процесс(ы) ваших работников всегда производятся (даже если он закрывается в связи с ошибкой, достижением лимита сообщений или благодаря messenger:stop-workers). Вы можете установить его на Ubuntu, к примеру, через:

1
$ sudo apt-get install supervisor

Файлы конфигурации Супервизора обычно живут в каталоге /etc/supervisor/conf.d. Например, вы можете создать там новый файл messenger-worker.conf, чтобы убедиться, что 2 экземпляра messenger:consume работают всегда:

1
2
3
4
5
6
7
8
9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Измените аргумент async, чтобы он использовал название вашего транспорта (или транспортов) и user на Unix-пользователя на вашем сервере.

Caution

Во время развёртывания, что-то может быть не доступно (например, БД), что приведет к ошибке запуска потребителя. В такой ситуации, Supervisor попробует startretries несколько раз, чтобы перезапустить команду. Убедитесь в том, что изменили эту настройку, чтобы избежать получения команды в ФАТАЛЬНОМ состоянии, после которого её невозможно будет перезапустить.

Каждый перезапуск, Supervisor увеличивает задержку на 1 секунду. Например, если значение - 10, он подождёт 1 сек, 2 сек, 3 сек и т.д. Это даёт сервису в общей сложности 55 секунд, чтобы снова стать доступным. Увеличьте настройку startretries, чтобы покрыть максимум ожидаемого времени простоя.

Если вы используете транспорт Redis, заметьте, что каждому работнику нужно уникальное имя потребителя, чтобы избежать обработки одного сообщения несколькими работникам. Один из способов достичь этого - установить переменную окружения в файле конфигурации Супервизора, на которую вы потом можете сослаться в messenger.yaml (см. раздел Redis выше):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Далее, укажите Supervisor прочитать вашу конфигурацию и запустить ваших работников:

1
2
3
4
5
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

См. документацию Supervisor, чтобы узнать больше деталей.

Грациозное завершение работы

Если вы установили в своем проекте PHP-расширение PCNTL, работники будут обрабатывать POSIX сигнал SIGTERM для окончания обработки текущего сообщения перед выходом.

В некоторых случаях, сигнал SIGTERM отправляется самим Супервизором (например, остановка контейнера Docker в котором Супервизор - точка входа). В таких случаях, вам нужно добавить ключ stopwaitsecs к конфигурации программы (со значением желаемого периода острочки в секундах) для того, чтобы выполнить грациозное выключение:

1
2
[program:x]
stopwaitsecs=20

Конфигурация Systemd

Хотя Supervisor - это отличный инструмент, он имеет недостаток в том, что вам нужен доступ к системе, чтобы его запустить. Systemd стала стандартом в большинстве дистрибуций Linux, и имеет хорошую альтернативу под названием сервисы пользователя.

Файлы конфигурации сервисов пользователя Systemd обычно живут в каталоге ~/.config/systemd/user. Например, вы можете создать новый файл messenger-worker.service. Или файл messenger-worker@.service, если вы хотите, чтобы больше экземпляров работали одновременно:

1
2
3
4
5
6
7
8
9
10
[Unit]
Description=Symfony messenger-consume %i

[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30

[Install]
WantedBy=default.target

Теперь, укажите systemd включить и запустить одного работника:

1
2
3
4
5
6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service

# to enable and start 20 workers
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service

Если вы измените ваш файл конфигурации сервиса, вам нужно перезагрузить daemon:

1
$ systemctl --user daemon-reload

Чтобы перезапустить всех ваших потребителей:

1
$ systemctl --user restart messenger-consume@*.service

Экземпляр пользователя systemd запускается только после первого входа в систему конкретным пользователем. Потребителю часто нужно запускаться при начальной загрузке системы. Включите lingering в пользователе, чтобы активировать это поведение:

1
$ loginctl enable-linger <your-username>

Логами управляет journald и с ними можно работать, используя команду journalctl:

1
2
3
4
5
6
7
8
# следить за логами потребителя номер 11
$ journalctl -f --user-unit messenger-consume@11.service

# следить за логами всех потребителей
$ journalctl -f --user-unit messenger-consume@*

# следить за всеми логами из ваших сервисов пользователя
$ journalctl -f _UID=$UID

См. документацию systemd, чтобы узнать больше.

Note

Вам нужно либо иметь повышенные привелегии для команды journalctl, либо добавить вашего пользователя в группу systemd-journal:

1
$ sudo usermod -a -G systemd-journal <your-username>

Работник без состояния

PHP создан быть без состояния, разные запросы не имеют общих источников. В HTTP контексте PHP очищает все перед отправкой ответа, поэтому вы можете решить не заботиться о сервисах, которые могут допускать утечку памяти.

С другой стороны, работники обычно работают в долгосрочных CLI-процессах, которые не заканчиваются после обработки сообщения. Поэтому вам нужно быть осторожными с состояниями сервисов, чтобы они не давали утечку информации и/или памяти из одного сообщения в другое.

Однако, некоторые сервисы Symfony, вроде обработчика fingers crossed Monolog, допускают утечку по своей задумке. Symfony предоставляет функцию перезапуска сервиса, чтобы решить эту проблему. При перезапуске контейнера автоматически между двумя сообщениями, Symfony ищет любые сервисы, реализующие ResetInterface (включая ваши собственные сервисы) и вызывает их метод reset(), чтобы они могли очистить своё внутренее состояние.

Если сервис не без состояния, и вы хотите перезапускать его свойства после каждого сообщения, то сервис должен реализовывать ResetInterface, где вы можете перезапустить свойства в методе reset().

Если вы не хотите перезапускать контейнер, добавьте опцию --no-reset при выполнении команды messenger:consume.

6.1

В версиях Symfony до 6.1, сервис-контейнер не перезапускался автоматически между сообщениями, и вам нужно было устанавливать опцию framework.messenger.reset_on_message как true.

Повторные попытки и ошибки

Если во время потребления сообщения из транспорта будет вызвано исключение, оно будет автоматически повторно отправлено транспорту, чтобы попробовать снова. По умолчанию, сообщение имеет 3 попытки перед сбросом или отправкой транспорту ошибок . Каждая повторная попытка будет тажке отложена во времени, на случай, если она была вызвана временной проблемой. Все это можно сконфигурировать для каждого транспорта:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # конфигурация по умолчанию
                retry_strategy:
                    max_retries: 3
                    # задержка в милисекундах
                    delay: 1000
                    # делает так, чтобы задержка была дольше перед каждой повторной попыткой
                    # например, задержка в 1 секунду, 2 секунду, 4 секунду
                    multiplier: 2
                    max_delay: 0
                    # переопределить это все сервисом, который
                    # реализует Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null

Tip

Symfony запускает WorkerMessageRetriedEvent, когда сообщение имеет повторные попытки, поэтому вы можете выполнить собственную логику.

Note

Благодаря SerializedMessageStamp, сериализованная форма сообщения сохраняется, что предупреждает его повторную сериализацию, если позже будет ещё одна попытка сообщения.

6.1

Класс SerializedMessageStamp был представлен в Symfony 6.1.

Избегание повторных попыток

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это перманентно и повторных попыток не нужно. Если вы вызовете UnrecoverableMessageHandlingException, сообщение не будет иметь повторных попыток.

Форсирование повторных попыток

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это временно и необходима повторная попытка. Если вы вызовете RecoverableMessageHandlingException, сообщение всегда будет иметь повторную попытку.

Сохранения и повторные попытки неудачных сообщений

Если сообщение терпит неудачу и имеет несколько повторных попыток (max_retries), оно затем будет отбраковано. Чтобы избежать этого, вы можете сконфигурировать failure_transport:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        # после повторных попыток, сообщения будут отправлены транспорту "failed"
        failure_transport: failed

        transports:
            # ... другой транспорт

            failed: 'doctrine://default?queue_name=failed'

В этом примере, если обработка сообщения терпит неудачу 3 раза (значение по умолчанию max_retries), оно затем будет отправлено транспорту failed. Хотя вы можете использовать messenger:consume failed для потребления его, как обычного транспорта, вам скорее захочется вручную просмотреть сообщения в транспорте ошибок и решить об их повторных попытках:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# увидеть все сообщения в транспорте ошибок
$ php bin/console messenger:failed:show

# увидеть первых 10 сообщений
$ php bin/console messenger:failed:show --max=10

# увидеть только сообщения MyClass
$ php bin/console messenger:failed:show --class-filter='MyClass'

# увидеть количество сообщений по классу сообщения
$ php bin/console messenger:failed:show --stats

# увидеть детали конкретной ошибки
$ php bin/console messenger:failed:show 20 -vv

# увидеть и повторно попробовать каждое сообщение по-отдельности
$ php bin/console messenger:failed:retry -vv

# повторная попытка конкретных сообщений
$ php bin/console messenger:failed:retry 20 30 --force

# удалить сообщение без повторной попытки
$ php bin/console messenger:failed:remove 20

# удалить сообщения без повторных попыток и показать каждое сообщение перед удалением
$ php bin/console messenger:failed:remove 20 30 --show-messages

6.2

Опции --class-filter и --stats были представлены в Symfony 6.2.

Если сообщение опять потерпит неудачу, оно будет отправлено обратно в транспорт ошибок в соответствии с обычными правилами повторных попыток . Как только будет достигнут максимум повторных попыток, сообщение будет сброшено перманентно.

Несколько ошибочных транспортов

Иногда недостаточно иметь один глобальный сконфигурированный failed transport, котому что некоторые сообщения важнее, чем другие. В таких случаях, вы можете переопределить транспорт ошибок только для определенных транспортов:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        # после повторных попыток, сообщения будут отправлены транспорту "failed"
        # по умолчанию, если внутри транспорта не сконфигурирован "failed_transport"
        failure_transport: failed_default

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                failure_transport: failed_high_priority

            # так как транспорт ошибок не сконфигурирован, будет использоваться установленный
            # глобальный набор "failure_transport"
            async_priority_low:
                dsn: 'doctrine://default?queue_name=async_priority_low'

            failed_default: 'doctrine://default?queue_name=failed_default'
            failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

Если нету определенного failure_transport глобально или на уровне транспорта, сообщение будет сброшено после определенного количества попыток.

Неудачные команды имеют необязательную опцию --transport, чтобы указать failure_transport, сконфигурированный на уровне транспорта.

1
2
3
4
5
6
7
8
# увидеть все сообщения в транспорте "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторная попытка конкретных сообщений из "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# удалить сообщение без повторной попытки из "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфигурация транспорта

Messenger поддерживает несколько разных типов транспорта, каждый со своими опциями. Опции могут быть переданы транспорту через строку DSN или конфигурацию.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: false

Опции, определенные под options, главенствуют над определенными в DSN.

Транспорт AMQP

Транспорт AMQP использует PHP-расширение AMQP для отправки сообщений в очередь вроде RabbitMQ. Установите его, выполнив:

1
$ composer require symfony/amqp-messenger

DSN транспорта AMQP может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

# или используйте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

Если вы хотите использовать AMQP, зашифрованный с помощью TLS/SSL, вы должны также предоставить CA-сертификат. Определите путь сертификата в настройке PHP.ini amqp.cacert (например, amqp.cacert = /etc/ssl/certs) или в параметре DSN cacert (например, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, по умолчанию используемый AMQP, зашифрованным с помощью TLS/SSL, - 5671, но вы можете переопределить его в параметре DSN port (например, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

По умолчанию, транспорт будет автоматически создавать любые необходимые обмены, очереди и связующие ключи. Это можно отключить, но некоторые функции могут работать некорректно (вроде отложенных очередей).

Note

Вы можете ограничить потребителей транспорта AMQP, чтобы они обрабатывали только сообщения из некоторых очередей какого-то обмена. См. .

Транспорт имеет другие опции, включая способы конфигурации обмена, связующие ключи очереди и т.д. Смотрите документацию Connection.

Транспорт имеет ряд опций:

6.1

Опция connection_name была представлена в Symfony 6.1.

Вы можете также сконфигурирвать настройки специально для AMQP в вашем оообщении, добавив AmqpStamp к вашему Конверту:

1
2
3
4
5
6
7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Потребители не отображаются в панели админа, так как этот транспорт не полагается на \AmqpQueue::consume(), который блокирует. Наличие блокирующего получателя делает опции --time-limit/--memory-limit команды messenger:consume. а также команды messenger:stop-workers бесполезными, так как они полагаются на тот факт, что получатель возвращается незамедлительно, независимо от того, находит он сообщение или нет. Работник потребления отвечает за итерацию до получения сообщения для обработки и/или до того, как будет достигнуто одно из условий остановки. Таким образом, логика остановки работника может быть достигнута, если он застрял на блокирующем вызове.

Транспорт Doctrine

Транспорт Doctrine может быть использован для хранения сообщений в таблице базы данных. Установите его, запустив:

1
$ composer require symfony/doctrine-messenger

DSN транспорта Doctrine может выглядеть так:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, в случае если у вас есть несколько соединений и вы хотите использовать какое-либо другое, чем "default". Транспорт будет автоматически создавать таблицу под названием messenger_messages.

Или, для создания таблицы самостоятельно, установите опцию auto_setup как false, и сгенерируйте миграцию .

Tip

Чтобы избежать попыток инструментов вроде Doctrine Migrations удалить эту таблицу, так как она не есть частью вашей обычной схемы, вы можете установить опцию schema_filter:

  • YAML
  • XML
  • PHP
1
2
3
4
# config/packages/doctrine.yaml
doctrine:
    dbal:
        schema_filter: '~^(?!messenger_messages)~'

Caution

Свойство datetime сообщений, хранящееся в базе данных, использует временную зону текущей системы. Это может вызвать проблемы, если несколько машин с разными конфигурациями временных зон используют одно хранилище.

Транспорт имеет такие опции:

????? ???????? ?? ?????????
table_name ???????? ??????? messenger_messages
queue_name ???????? ??????? (??????? ? ???????, ????? ???????????? ???? ??????? ??? ?????????? ???????????) default
redeliver_timeout ????-??? ????? ????????? ???????? ????????? ? ???????, ?? ?? ? ????????? "?????????" (???? ???????? ?? ?????-?? ??????? ???????????, ?????????? ???, ? ????? ?? ?????? ????? ??????????? ?????????) - ? ????????. 3600
auto_setup ?????? ?? ??????? ???? ??????? ????????????? ?? ????? ????????/?????????. true

Note

Установите redeliver_timeout в большее значение, чем длительность вашего самого медленного сообщения. Иначе, некоторые сообщения будут запускаться вторрой раз, пока первый всё ещё обрабатывается.

При использовании PostgreSQL, у вас есть доступ к следующим опциям для получения преимуществ функции LISTEN/NOTIFY. Это позволяет более производительный подход, чем поведение голосования транспорта Doctrine по умолчанию, так как PostgreSQL будет напрямую уведомлять работников, когда новое сообщение будет появляться в таблице.

????? ???????? ?? ?????????
use_notify ???????????? ?? LISTEN/NOTIFY. true
check_delayed_interval ???????? ???????? ?????????? ?????????, ? ????????????. ?????????? ??? 0, ????? ????????? ????????. 1000
get_notify_timeout ????????????????? ??????? ???????? ?????? ??? ?????? PDO::pgsqlGetNotify`, ? ????????????. 0

Транспорт Beanstalkd

Транспорт Beanstalkd отправляет сообщения прямо с рабочую очередь Beanstalkd. Установите его, выполнив:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорта Beanstalkd может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Если порта нет, он по умолчанию будет 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
tube_name ???????? ??????? default
timeout ????-??? ??????? ????????? - ? ????????. 0 (???????? ?????? ????? ?? ???? ??????? ?????, ???? ??????? TransportException)
ttr ????? ?????????? ????????? ????? ??? ??? ????????? ??? ??????? ? ??????? ?????????? - ? ????????. 90

Транспорт Redis

Транспорт Redis использует потоки для создания очереди сообщений. Этот транспорт требует PHP-расширения Redis (>=4.3) и работающего сервера Redis (^5.0). Установите его, выполнив:

$ composer require symfony/redis-messenger

DSN транспорта Redis может выглядеть так:

1
2
3
4
5
6
7
8
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Полный пример DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Пример кластера Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Пример Unix-сокета
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock

Некоторые опции могут быть сконфигурированы через DSN или ключ options под транспортом в messenger.yaml:

6.1

Опции persistent_id, retry_interval, read_timeout, timeout и sentinel_master были представлены в Symfony 6.1.

Caution

Никогда не должно быть более одной команды messenger:consume выполняемой с одинаковой комбинацией stream, group и consumer, иначе сообщения могут быть обработаны более, чем один раз. Если вы запускаете несколько работников очерели, consumer может быть установлен как переменная окружения (вроде %env(MESSENGER_CONSUMER_NAME)%), установленная Супервизором (пример ниже) или любым другим сервисом, используемым для управления процессами работников. В окружении контейнера, HOSTNAME может быть использовано как имя потребителя, так как там только один работник на контейнер/хост. Если вы используете Kubernetes для управления контейнерами, рассмотрите использование StatefulSet для стабилизации имен.

Tip

Установите delete_after_ack как true (если у вас одна группа) или определите stream_max_entries (если вы можете предположить, какое максимальное количество записей допустимо в вашем случае), чтобы избежать утечек памяти. В другом случае. все сообщения навсегда останутся в Redis.

Транспорт в памяти

Транспорт in-memory на самом деле не доставляет сообщения. Вместо этого, он дедржит их в памяти во время запроса, что может быть полезным для тестирования. Например, если у вас есть транспорт async_priority_normal, вы можете переопределить его в окружении test, чтобы использовать этот транспорт:

  • YAML
  • XML
  • PHP
1
2
3
4
5
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'

Тогда, во время тестирования, сообщения не будут отправлены реальному транспорту. Даже лучше, в тесте, вы можете проверить, чтобы только одно сообщение было отправлено во время запроса:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething()
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт имеет ряд опций:

serialize (булево, по умолчанию: false)
Сериализовать сообщения или нет. Это полезно для тестирования дополнительного слоя, особенно когда вы используете собственный сериализатор сообщений.

Note

Все транспорты in-memory будут автоматически сброшены после каждого теста в классах тестов, расширяющих KernelTestCase или WebTestCase.

Amazon SQS

Транспорт Amazon SQS прекрасно подходит для приложения на AWS. Установите его, выполнив:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорта выглядит так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматически создаст необходимые очерди. Это можно отключить, установив опцию auto_setup как false.

Tip

До отправки или получения сообщения, Symfony необходимо конвертировать название очереди в URL очереди AWS вызвав API GetQueueUrl в AWS. Этого дополнительного API-вызова можно избежать, предоставив DSN, которая является URL очереди.

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
access_key ???? ??????? AWS  
account ????????????? AWS-???????? ???????? ??????? ??????
auto_setup ????? ?? ????????????? ???????? ??????? ?? ????? ????????/?????????. true
buffer_size ?????????? ????????? ??? ???????????????? ?????????? 9
debug ???? true. ????? ??? ???? HTTP ???????? ? ??????? (??? ?????? ?? ??????????????????) false
endpoint ?????????? URL ? SQS-??????? https://sqs.eu-west-1.amazonaws.com
poll_timeout ????? ???????? ?????? ????????? ? ???????? 0.1
queue_name ???????? ??????? messages
region ???????? AWS-??????? eu-west-1
secret_key ????????? ???? AWS  
session_token ????? ?????? AWS  
visibility_timeout ?????????? ??????, ?? ?????????? ??????? ????????? ?? ????? ??????? (Visibility Timeout) ???????????? ???????
wait_time ???????????? Long polling ? ???????? 20

6.1

Опция session_token была представлена в Symfony 6.1.

Note

Параметр wait_time определяет максимальное время ожидания для Amazon SQS до того, как сообщение будет доступно в очереди, перед отправкой ответа. Это помогает снизить стоимость использования Amazon SQS, устранив некоторое количество пустых ответов.

Параметр poll_timeout определяет время ожидания получателя до возвращения null. Он избегает блокировки других получателей от вызова.

Note

Если название очереди имеет суффикс .fifo, AWS создаст очередь FIFO. Используйте марку AmazonSqsFifoStamp, чтобы определить Message group ID и Message deduplication ID.

Очереди FIFO не поддерживают установки задержки отдельных сообдений, значение delay: 0 требуется в настройках стратегии повторных попыток.

Сериализация сообщений

Когда сообщения отправляются (и получаются) в транспорт, они сериализуются с использование нативных функций PHP serialize() и unserialize(). Вы можете изменить это глобально (или для каждого транспорта) на сервис, реализующий SerializerInterface:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer

messenger.transport.symfony_serializer - это встроенный сервис, который использует компонент Serializer и может быть сконфигурирован несколькими способами. Если вы выберете использовать сериализатор Symfony, вы сможете контролировать контекст для каждого случая отдельно через SerializerStamp (см. Конверты и марки).

Tip

При отправке/получении сообщений в/из другого транспорта, вам может понадобиться больше контроля над процессом сериализации. Использование пользовательского сериализатора предоставляет такой контроль. См. Туториал по сериализации сообщений SymfonyCasts, чтобы узнать больше.

Настройка обработчиков

Конфигурация обработчиков с использованием атрибутов

Вы можете сконфигурировать вашего обработчика, передав опции атрибуту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }
}

Возможные опции для конфигурации атрибута:

  • bus
  • fromTransport
  • handles
  • method
  • priority

Конфигурация обработчиков вручную

Symfony обычно будет находить и регистрировать вашего обработчика автоматически . Но вы также можете сконфигурировать его вручную - и передать ему дополнительную конфигурацию - тегировав сервис обработчика messenger.message_handler

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        tags: [messenger.message_handler]

        # или сконфигурировать с опциями
        tags:
            -
                name: messenger.message_handler
                # необходимо только если невозможно угадать по подсказке
                handles: App\Message\SmsNotification

Возможные опции конфигурации с тегами:

  • bus
  • from_transport
  • handles
  • method
  • priority

Обработка нескольких сообщений

Один класс обработчика может обрабатывать несколько сообщений. Для этого, добавьте атрибут #AsMessageHandler ко всем методам обработки:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;

class SmsNotificationHandler
{
    #[AsMessageHandler]
    public function handleSmsNotification(SmsNotification $message)
    {
        // ...
    }

    #[AsMessageHandler]
    public function handleOtherSmsNotification(OtherSmsNotification $message)
    {
        // ...
    }
}

6.2

Реализация MessageSubscriberInterface - это ещё один способ обработки нескольких сообщений с одним классом обработчика. Этот интерфейс устарел в Symfony 6.2.

Связывание обработчиков с разными транспортами

Каждое сообщение может иметь несколько обработчиков, и когда сообщение потребляется, вызываются все его обработчики. Но вы можете также сконфигурироать обработчика так, чтобы он вызывался только когда сообщение получено из конкретного транспорта. Это позволяет вам имет одно сообщение, где каждый обработчик вызывается разными "работниками", потребляющими разный транспорт.

Представьте, что у вас есть сообщение UploadedImage с двумя обработчиками:

  • ThumbnailUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием image_transport
  • NotifyAboutNewUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием async_priority_normal

Чтобы сделать это, добавьте опцию from_transport к каждому обработчику. Например:

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;

#[AsMessageHandler(from_transport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
    public function __invoke(UploadedImage $uploadedImage)
    {
        // создать миниатюры
    }
}

И, похожим образом:

1
2
3
4
5
6
7
8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

#[AsMessageHandler(from_transport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
    // ...
}

Затем, убедитесь, что "маршрутизируете" ваше сообщение к обоим транспортам:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]

Вот и все! Теперь вы можете потреблять каждый транспорт:

1
2
3
4
# вызовет ThumbnailUploadedImageHandler только при обработке сообщения
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Если обработчик не имеет конфигурации from_transport, он будет выполнен в каждом транспорте, из которого будет получено это сообщение.

Расширение Messenger

Конверты и марки

Сообщение может быть любым PHP-объектом. Иногда вам может понадобиться сконфигурировать что-то дополнительное в сообщении - вроде того, как оно должно быть обработано внутри AMQP или добавления задержки перед обработкой сообщения. Вы можете сделать это, добавив марку ("stamp") к вашему сообщению:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus)
{
    $bus->dispatch(new SmsNotification('...'), [
        // подождать 5 секунд перед обработкой
        new DelayStamp(5000),
    ]);

    // или ясно создайте Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутренне, каждое сообщение оборачивается в конверт (Envelope), содержащий сообщение и марки. Вы можете создать его вручную или позволить автобусу сообщений сделать это. Существует множество различных марок для разных целей и они используются внутренне для отслеживания информации о сообщении - вроде того, какой автобус обрабатывает его или имеет ли оно повторные попытки после неудачи.

Промежуточное ПО

То, что происходит после запуска сообщения в автобус сообщений, зависит от его набора промежуточного ПО и его порядка. По умолчанию, промежуточное ПО, сконфигурированное для каждого автобуса, выглядит так:

  1. add_bus_name_stamp_middleware - добавляет марку для записи того, в каком атобусе было запущено это собщение;
  2. dispatch_after_current_bus- см. Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена;
  3. failed_message_processing_middleware - обрабатывает сообщения, которые имеют повторные попытки через транспорт ошибок , чтобы они правильно функционировали, как будто бы они были получены из изначального транспорта;
  4. Ваша собственная коллекция middleware;
  5. send_message - если машрутизация сконфигурирована для транспорта, отправляет сообщения этому транспорту и останавливает цепь промежуточного ПО;
  6. handle_message - вызывает обработчика(ов) сообщений для заданног сообщения.

Note

Эти названия промежуточного ПО - на самом деле сокращения. Настоящие id сервисов имеют префикс messenger.middleware. (например,messenger.middleware.handle_message).

Промежуточное ПО выполняется после запуска сообщения, и также еще раз, когда сообщение получено через работника (для сообщений, которые были отправлены транспорту для асинхронной обработки). Помните это, если вы создаете собственное промежуточное ПО.

Вы можете добавить собственное промежуточное ПО в список, или полностью отключить промежуточное ПО по умолчанию и добавить только ваше собственное:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # отключить промежуточное ПО по умолчанию
                default_middleware: false

                # и/или добавить ваше собственное
                middleware:
                    # id серисов, релизующих Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

Note

Если сервис промежуточного ПО абстрактный, будет создан другой экземпляр сервиса для каждого автобуса.

Промежуточное ПО для Doctrine

1.11

Следующее промежуточное по для Doctrine было представлено в DoctrineBundle 1.11.

Если вы в своем приложении используете Doctrine, существует ряд необязательного промежуточного ПО, которое вы можете захотеть использовать:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # каждый раз при обработке сообщения, соединение Doctrine
                    # "пингуется" и повторно подключается, если оно закрыто. Полезно,
                    # если ваши работники работают долгое время и соединение базы
                    # данных иногда теряется
                    - doctrine_ping_connection

                    # После обработки, соединение Doctrine закрывается, что может
                    # освободить соединения базы данных в работнике, вместо того,
                    # чтобы держать их открытыми всегда
                    - doctrine_close_connection

                    # оборачивает всех обработчиков в одну транзакцию Doctrine
                    # обработчикам не надо вызывать flush(), а ошибка в любом
                    # обработчике вызовет откат
                    - doctrine_transaction

                    # or pass a different entity manager to any
                    #- doctrine_transaction: ['custom']

Другое промежуточное ПО

Добавьте промежуточное ПО router_context, если вам нужно генерировать абсолютные URL в потребителе (например, отображать шаблон со ссылками). Это промежуточное ПО хранит контекст изначального запроса (т.е. хост, HTTP-порт и т.д.), что необходимо при создании абсолютных URL.

Добавьте промежуточное ПО validation, если вам нужно валидировать объект сообщения, используя компонент Validator, перед его обработкой. Если валидация будет неуспешной, будет вызвано ValidationFailedException. ValidationStamp может быть использован для конфигурации групп валидации.

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    - router_context
                    - validation

События Messenger

В дополнение к промежуточному ПО, Messenger также запускает несколько событий. Вы можете создать слушателя событий, чтобы подключаться к разным частям процесса. Для каждого, класс события будет названием события:

Несколько автобусов, автобусов команд и событий

Messenger предоставляет вам один сервис автобуса сообщений по умолчанию. Но вы можете сконфигурировать столько, сколько вы хотите, создав автобусы "команд", "запросов" или "событий" и контролируя их промежуточное ПО. См. Несколько автобусов.