Мессенджер: работа с синхронизированными сообщениями и сообщениями в очереди

Дата обновления перевода 2021-12-24

Мессенджер: работа с синхронизированными сообщениями и сообщениями в очереди

Мессенджер предоставляет автобус сообщений с возможностью отправки сообщений, а затем работы с ними сразу же в вашем приложении, или их отправки через транспорт (например, очередь) для их обработки позже. Чтобы узнать об этом больше, прочтите документацию компонента Мессенджер.

Установка

В приложениях, использующих Symfony Flex, выполните эту команду, чтобы установить мессенджер:

1
$ composer require symfony/messenger

Создание сообщения и обработчика

Мессенджер строится на двух разных классах, которые вы создадите: (1) классе сообщения, который содержит данные, и (2) классе обработчика(ов), который будет вызван после запуска сообщения. Класс обработчика будет читать класс сообщения и выполнять некоторое действие.

Для класса сообщения нет особых требований, кроме того, что его можно сериализовать:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обработчик сообщений - это PHP-вызываемое, рекомендованный способ его создания - создать класс, реализующий MessageHandlerInterface и имеющий метод __invoke(), который имеет подсказки класса сообщения (или интерфейс сообщения):

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SmsNotificationHandler implements MessageHandlerInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ... сделайте что-то - вроде отправки SMS!
    }
}

Благодаря автоконфигурации и подсказке SmsNotification, Symfony значет, что этот отбработчик должен быть вызван, когда запускается сообщение SmsNotification. В большинстве случаев, это все, что вам нужно будет сделать. Но вы можете также сконфигурировать обработчики сообщений вручную. Чтобы увидеть всех сконфигурированных обработчиков, выполните:

1
$ php bin/console debug:messenger

Запуск сообщения

Вы готовы! Для запуска сообщения (и вызова обработчика), внедрите сервис messenger.default_bus (через MessageBusInterface), например, в контроллер:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // приведет к вызову SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // или используйте это сокращение
        $this->dispatchMessage(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронные сообщения/сообщения в очереди

По умолчанию, сообщения обрабатываются сразу же после запуска. Если вы хотите обработать сообщение асинхронно, вы можете сконфигурировать транспорт. Транспорт может отправлять сообщения (например, в систему очереди) и затем получать из через работника. Мессенджер поддерживает несколько трансортов.

Note

Если вы хотите использовать транспорт, который не поддерживается, посмотрите на транспорт Enqueue, который поддерживает штуки вроде Kafka и Google Pub/Sub.

Транспорт регистрируется с использованием "DSN". Благодаря рецепту Flex для Мессенджера, ваш файл .env уже имеет несколько примеров.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Раскомментируйте тот транспорт, который вы хотите (или установите его в .env.local). См. Мессенджер: работа с синхронизированными сообщениями и сообщениями в очереди, чтобы узнать больше деталей.

Далее, в config/packages/messenger.yaml, давайте определим транспорт под название async, использующий эту конфигурацию:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

            # или расширено для конфигурации большего количества опций
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []

Маршрутизация сообщений к транспорту

Теперь, когда у вас есть сконфигурированный транспорт, вместо немедленно обработки сообщений, вы можете сконфигурировать их так, чтобы они были отправлены транспорту:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

        routing:
            # async - это то имя, которое вы дали вашему транспорту выше
            'App\Message\SmsNotification': async

Благодаря этому, App\Message\SmsNotification будет отправлен транспорту async, и его обработчик(и) не будут вызваны сразу же. Все сообщения, не совпавшие с routing будут обработаны немедленно.

Вы также можете маршрутизировать классы по их родительскому классу или интерфейсу. Или отправлять сообщения нескольким транспортам:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс
            'App\Message\AbstractAsyncMessage': async
            'App\Message\AsyncMessageInterface': async

            'My\Message\ToBeSentToTwoSenders': [async, audit]

Note

Если вы сконфигурируете машрутизацию и для дочернего, и для родительского класса, используются оба правила. Например, если у вас есть объект SmsNotification, расширяющийся из Notification, будут использованы маршрутизации и для Notification, и для SmsNotification.

Сущности Doctrine в сообщениях

Если вам нужно передать сущность в Doctrine в сообщении, лучше всего передать основной ключ сущности (или любую релевантную информацию, необходимую обработчику, вроде email, и др.) вместо объекта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Затем, в вашем обработчике, вы можете сделать запрос свежего объекта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class NewUserWelcomeEmailHandler implements MessageHandlerInterface
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail)
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... отправить электронное письмо!
    }
}

Это гарантирует, что сущность содержит свежие данные.

Сихнронная обработка сообщений

Если сообщение не совпадает ни с одним правилом маршрутизации, оно не будет отправлено ни одному транспорту, и будет обработано немедленно. В некоторых случаях (например, при связывании обработчиков с разными транспортами), легче и более гибко обработать их ясно: создав транспорт sync и "отправив" сообщения в него для немедленной обработки:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... другие транспорты

            sync: 'sync://'

        routing:
            App\Message\SmsNotification: sync

Создание вашего собственного транспорта

Вы также можете создать собственный транспорт, если вам нужно отправлять или получать сообщения из чего-то, что не поддерживается. См. Как создать ваш собственный транспорт сообщений.

Потребление сообщений (запуск работника)

Когда ваши сообщения были маршрутизированы, в большинстве случаев, вам нужно будет "потребить" их. Вы можете сделать это с помощью команды messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# используйте -vv, чтобы увидеть детали того, что происходит
$ php bin/console messenger:consume async -vv

Первый аргумент - это имя получателя (или id сервиса, если вы маршрутизировали к пользовательскому сервису). По умолчанию, команда будет выполняться бесконечно: искать новые сообщения в вашем транспорте и обрабатывать их. Эта команда называется вашим "работником".

Tip

Чтобы правильно остановить работника, вызовите экземпляр StopWorkerException.

5.4

Класс StopWorkerException был представлен в Symfony 5.4.

Запуск в производство

В производстве, есть несколько важных вещей, о которых стоит подумать:

Используйте Супервизора, чтобы ваш(и) работник(и) работали
Вы хотите, чтобы один или более "работников" работали все время. Чтобы сделать это, используйте систему контроля процесса вроде Супервизора.
Не позволяйте работникам работать бесконечно
Некоторые сервисы (вроде EntityManager Doctrine) будут потреблять все больше памяти со временем. Поэтому, вместо того, чтобы позволять вашему работнику работать всегда, используйте флажок вроде messenger:consume --limit=10, чтобы указать работнику, что он должен обработать только 10 сообщений до прекращения работы (затем Супервизор создаст новый процесс). Также есть другие опции вроде --memory-limit=128M и --time-limit=3600.
Перезагружайте работников при запуске
Каждый раз при запуске вам нужно будет перезагрузить все процессы ваших работников, чтобы они видели новозапущенный код. Чтобы сделать это, выполните messenger:stop-workers при запуске. Это сигнализирует каждому работнику, что он должен закончить обработку текущего сообщения и грациозно завершить работу. Затем, Супервизор создаст новые процессы работников. Команда использует кеш app внутренне - поэтому убедитесь в том, что он сконфигурирован для использования адаптера по вашему вкусу.
Используйте один кеш между запусками
Если ваша стратегия запуска заключается в создании новых целевых каталогов, вам нужно установить значение опции конфигурации cache.prefix.seed, чтобы использовать одно и то же пространство имен кеша между запусками. Иначе, пул cache.app будет использовать значение параметра kernel.project_dir в качестве базы для пространства имен, что приведет к разным пространствам имен каждый раз при запуске.

Приоритизированный транспорт

Иногда определенные типы сообщений должны иметь более высокий приоритет и быть обработаны до других. Чтобы сделать это возможным, вы можете создать несколько транспортов и маршрутизировать разные сообщения к ним. Например:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name соответствует транспорту doctrine
                    queue_name: high

                    # для AMQP отправьте отдельный обмен, а затем поставьте в очередь
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # or redis try "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low

        routing:
            'App\Message\SmsNotification':  async_priority_low
            'App\Message\NewUserWelcomeEmail':  async_priority_high

Затем вы можете запускать отдельных работников для каждого транспорта, или инструктировать одного работника, чтобы он обрабатывал сообщения в порядке приоритетности:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Работник будет всегда вначале искать сообщения, ожидающие async_priority_high. Если таких нет, затем он будет потреблять сообщения из async_priority_low.

Ограничьте потребление для конкретных очередей

Некоторый транспорт (особенно AMQP) имеет концепцию обмена и очередей. Транспорт Symfony всегда связан с обменом. По умолчанию, работник потребляет из всех очередей, соединенных с обменом указанного транспорта. Однако, есть примеры использования, когда вам нужно, чтобы работник потреблял только из конкретной очереди.

You can limit the worker to only process messages from specific queues:

1
$ php bin/console messenger:consume my_transport --queues=fasttrack

Чтобы позволить использование опции queues, получатель должен реализовывать QueueReceiverInterface.

5.3

Ограничение работника для конкретных очередей было представлено в Symfony 5.3.

Конфигурация супервизора

Супервизор - это отличный инструмент для гарантии того, что процесс(ы) ваших работников всегда производятся (даже если он закрывается в связи с ошибкой, достижением лимита сообщений или благодаря messenger:stop-workers). Вы можете установить его на Ubuntu, к примеру, через:

1
$ sudo apt-get install supervisor

Файлы конфигурации Супервизора обычно живут в каталоге /etc/supervisor/conf.d. Например, вы можете создать там новый файл messenger-worker.conf, чтобы убедиться, что 2 экземпляра messenger:consume работают всегда:

1
2
3
4
5
6
7
8
9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Измените аргумент async, чтобы он использовал название вашего транспорта (или транспортов) и user на Unix-пользователя на вашем сервере.

Если вы используете транспорт Redis, заметьте, что каждому работнику нужно уникальное имя потребителя, чтобы избежать обработки одного сообщения несколькими работникам. Один из способов достичь этого - установить переменную окружения в файле конфигурации Супервизора, на которую вы потом можете сослаться в messenger.yaml (см. раздел Redis выше):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Next, tell Supervisor to read your config and start your workers:

1
2
3
4
5
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

См. документацию Супервизора, чтобы узнать больше деталей.

Грациозное выключение

Если вы установили в своем проекте PHP-расширение PCNTL, работники будут обрабатывать POSIX сигнал SIGTERM для окончания обработки текущего сообщения перед выходом.

В некоторых случаях, сигнал SIGTERM отправляется самим Супервизором (например, остановка контейнера Docker в котором Супервизор - точка входа). В таких случаях, вам нужно добавить ключ stopwaitsecs к конфигурации программы (со значением желаемого периода острочки в секундах) для того, чтобы выполнить грациозное выключение:

1
2
[program:x]
stopwaitsecs=20

Работник без состояния

PHP создан быть без состояния, разные запросы не имеют общих источников. В HTTP контексте PHP очищает все перед отправкой ответа, поэтому вы можете решить не заботиться о сервисах, которые могут допускать утечку памяти.

С другой стороны, работники обычно работают в долгосрочных CLI-процессах, которые не заканчиваются после обработки сообщения. Поэтому вам нужно быть осторожными с состояниями сервисов, чтобы они не давали утечку информации и/или памяти из одного сообщения в другое.

Однако, некоторые сервисы Symfony, вроде обработчика fingers crossed Monolog, допускают утечку по своей задумке. В таких случаях, используйте опцию транспорта reset_on_message, чтобы автоматически сбросить сервис-контейнер между двумя сообщениями:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
# config/packages/messenger.yaml
framework:
    messenger:
        reset_on_message: true
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

5.4

Опция reset_on_message была представлена в Symfony 5.4.

Повторные попытки и ошибки

Если во время потребления сообщения из транспорта будет вызвано исключение, оно будет автоматически повторно отправлено транспорту, чтобы попробовать снова. По умолчанию, сообщение имеет 3 попытки перед сбросом или отправкой транспорту ошибок. Каждая повторная попытка будет тажке отложена во времени, на случай, если она была вызвана временной проблемой. Все это можно сконфигурировать для каждого транспорта:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # конфигурация по умолчанию
                retry_strategy:
                    max_retries: 3
                    # задержка в милисекундах
                    delay: 1000
                    # делает так, чтобы задержка была дольше перед каждой повторной попыткой
                    # например, задержка в 1 секунду, 2 секунду, 4 секунду
                    multiplier: 2
                    max_delay: 0
                    # переопределить это все сервисом, который
                    # реализует Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null

Tip

Symfony запускает WorkerMessageRetriedEvent, когда сообщение имеет повторные попытки, поэтому вы можете выполнить собственную логику.

5.2

Класс WorkerMessageRetriedEvent был представлен в Symfony 5.2.

Избегание повторных попыток

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это перманентно и повторных попыток не нужно. Если вы вызовете UnrecoverableMessageHandlingException, сообщение не будет иметь повторных попыток.

Форсирование повторных попыток

5.1

RecoverableMessageHandlingException был представлен в Symfony 5.1.

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это временно и необходима повторная попытка. Если вы вызовете RecoverableMessageHandlingException, сообщение всегда будет иметь повторную попытку.

Сохранения и повторные попытки неудачных сообщений

Если сообщение терпит неудачу и имеет несколько повторных попыток (max_retries), оно затем будет отбраковано. Чтобы избежать этого, вы можете сконфигурировать failure_transport:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        # после повторных попыток, сообщения будут отправлены транспорту "failed"
        failure_transport: failed

        transports:
            # ... другой транспорт

            failed: 'doctrine://default?queue_name=failed'

В этом примере, если обработка сообщения терпит неудачу 3 раза (значение по умолчанию max_retries), оно затем будет отправлено транспорту failed. Хотя вы можете использовать messenger:consume failed для потребления его, как обычного транспорта, вам скорее захочется вручную просмотреть сообщения в транспорте ошибок и решить об их повторных попытках:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# увидеть все сообщения в транспорте ошибок
$ php bin/console messenger:failed:show

# увидеть детали конкретной ошибки
$ php bin/console messenger:failed:show 20 -vv

# увидеть и повторно попробовать каждое сообщение по-отдельности
$ php bin/console messenger:failed:retry -vv

# повторная попытка конкретных сообщений
$ php bin/console messenger:failed:retry 20 30 --force

# удалить сообщение без повторной попытки
$ php bin/console messenger:failed:remove 20

# удалить сообщения без повторных попыток и показать каждое сообщение перед удалением
$ php bin/console messenger:failed:remove 20 30 --show-messages

5.1

Опция --show-messages была представлена в Symfony 5.1.

Если сообщение опять потерпит неудачу, оно будет отправлено обратно в транспорт ошибок в соответствии с обычными правилами повторных попыток. Как только будет достигнут максимум повторных попыток, сообщение будет сброшено перманентно.

Несколько ошибочных транспортов

5.3

Возможность использовать несколько транспортов ошибок была представлена в Symfony 5.3.

Иногда недостаточно иметь один глобальный сконфигурированный failed transport, котому что некоторые сообщения важнее, чем другие. В таких случаях, вы можете переопределить транспорт ошибок только для определенных транспортов:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        # после повторных попыток, сообщения будут отправлены транспорту "failed"
        # по умолчанию, если внутри транспорта не сконфигурирован "failed_transport"
        failure_transport: failed_default

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                failure_transport: failed_high_priority

            # так как транспорт ошибок не сконфигурирован, будет использоваться установленный
            # глобальный набор "failure_transport"
            async_priority_low:
                dsn: 'doctrine://default?queue_name=async_priority_low'

            failed_default: 'doctrine://default?queue_name=failed_default'
            failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

Если нету определенного failure_transport глобально или на уровне транспорта, сообщение будет сброшено после определенного количества попыток.

Неудачные команды имеют необязательную опцию --transport, чтобы указать failure_transport, сконфигурированный на уровне транспорта.

1
2
3
4
5
6
7
8
# увидеть все сообщения в транспорте "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторная попытка конкретных сообщений из "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# удалить сообщение без повторной попытки из "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфигурация транспорта

Мессенджер поддерживает несколько разных типов транспорта, каждый со своими опциями. Опции могут быть переданы транспорту через строку DSN или конфигурацию.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: false

Опции, определенные под options, главенствуют над определенными в DSN.

Транспорт AMQP

Транспорт AMQP использует PHP-расширение AMQP для отправки сообщений в очередь вроде RabbitMQ.

5.1

Начиная с Symfony 5.1, транспорта AMQP переехал в отдельный пакет. Установите его, выполнив:

1
$ composer require symfony/amqp-messenger

DSN транспорта AMQP может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

# или используйте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

5.2

Поддержка протокола AMQPS была представлена в Symfony 5.2.

Если вы хотите использовать AMQP, зашифрованный с помощью TLS/SSL, вы должны также предоставить CA-сертификат. Определите путь сертификата в настройке PHP.ini amqp.cacert (например, amqp.cacert = /etc/ssl/certs) или в параметре DSN cacert (например, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, по умолчанию используемый AMQP, зашифрованным с помощью TLS/SSL, - 5671, но вы можете переопределить его в параметре DSN port (например, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

По умолчанию, транспорт будет автоматически создавать любые необходимые обмены, очереди и связующие ключи. Это можно отключить, но некоторые функции могут работать некорректно (вроде отложенных очередей).

Note

С Symfony 5.3 или новее, вы можете ограничить потребителей транспорта AMQP, чтобы они обрабатывали только сообщения из некоторых очередей какого-то обмена. См. Мессенджер: работа с синхронизированными сообщениями и сообщениями в очереди.

Транспорт имеет другие опции, включая способы конфигурации обмена, связующие ключи очереди и т.д. Смотрите документацию Connection.

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
auto_setup ?????? ?? ??????? ???? ??????? ????????????? ?? ????? ???????? / ?????????. true
cacert ???? ? ????? CA-??????????? ? PEM-???????.  
cert ???? ? ??????????? ??????? ? PEM-???????.  
channel_max ????????? ?????????? ????? ???????, ??????? ????????? ??????. 0 ???????? ?????????? ????? ??????????.  
confirm_timeout ????-??? ??? ????????????? ? ????????; ???? ?? ??????, ????????? ?? ????? ??????? ????????????? ?????????, ??????????: 0 ??? ?????? ??????. ????? ???? ???????.  
connect_timeout ????-??? ??????????. ??????????: 0 ??? ?????? ??????. ????? ???? ???????.  
frame_max ?????????? ?????? ??????, ??????? ?????????? ?????? ??? ??????????, ??????? ????????? ?????? ? ???-????. 0 ???????? ??????????? ????? ?????????? (??????? ?? ?????? ??????? ?????? librabbimq ?? ?????????)  
heartbeat ???????? ????????? ?????????? (? ????????), ??????? ????? ??????. 0 ????????, ??? ?????? ?? ????? ?????????. ????????, ??? librabbimq ????? ???????????? ????????? ?????????, ??? ????????, ??? ????????? ??????????? ?????? ?? ????? ?????????? ???????.  
host ??? ????? AMQP-???????  
key ????? ? ????? ??????? ? PEM-???????.  
password Password to use to connect to the AMQP service  
persistent   'false'
port ???? AMQP-???????  
prefetch_count    
read_timeout ????-??? ???????? ??????????. Note: ??????????: 0 ??? ?????? ??????. ????? ???? ???????.  
retry    
sasl_method    
user ??? ???????????? ??? ?????????? ? AMQP-????????  
verify ???????? ??? ????????? ??????????? ?????. ???? ??????????? ????????, ?? ????? ??? ? ??????????? ??????? ?????? ????????? ? ?????? ???????. ??????????? ???????? ?? ?????????.  
vhost ??????????? ???? ??? ????????????? ? AMQP-????????  
write_timeout ????-??? ????????? ??????????. ??????????: 0 ??? ?????? ??????. ????? ???? ???????.  
delay[queue_name_pattern] ???????, ???????????? ??? ???????? ???????? delay_%exchange_name%_%routing_key%_%delay%
delay[exchange_name] ??? ??????, ???????????? ??? ??????????/????????? ????????? delays
queues[name][arguments] ?????????????? ?????????  
queues[name][binding_arguments] ?????????, ???????????? ??? ?????????? ???????.  
queues[name][binding_keys] ????????? ????? (???? ????) ??? ?????????? ? ????????  
queues[name][flags] ?????? ??????? AMQP_DURABLE
exchange[arguments] ?????????????? ????????? ??? ?????? (????????,alternate-exchange)  
exchange[default_publish_routing_key] ???? ?????????????, ???????????? ??? ??????????, ???? ?? ?? ?????? ? ?????????  
exchange[flags] ?????? ?????? AMQP_DURABLE
exchange[name] ???????? ??????  
exchange[type] ??? ?????? fanout

5.2

Опция confirm_timeout была представлена в Symfony 5.2.

5.3

Опция prefetch_count устарела в Symfony 5.3 так как она не имеет эффекта в транспорте AMQP мессенджера.

Вы можете также сконфигурирвать настройки специально для AMQP в вашем оообщении, добавив AmqpStamp к вашему Конверту:

1
2
3
4
5
6
7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Потребители не отображаются в панели админа, так как этот транспорт не полагается на \AmqpQueue::consume(), который блокирует. Наличие блокирующего получателя делает опции --time-limit/--memory-limit команды messenger:consume. а также команды messenger:stop-workers бесполезными, так как они полагаются на тот факт, что получатель возвращается незамедлительно, независимо от того, находит он сообщение или нет. Работник потребления отвечает за итерацию до получения сообщения для обработки и/или до того, как будет достигнуто одно из условий остановки. Таким образом, логика остановки работника может быть достигнута, если он застрял на блокирующем вызове.

Транспорт Doctrine

Транспорт Doctrine может быть использован для хранения сообщений в таблице базы данных.

5.1

Начиная с Symfony 5.1, транспорт Doctrine был перемещен в отдельный пакет. Установите его, запустив:

1
$ composer require symfony/doctrine-messenger

The Doctrine transport DSN may looks like this:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, в случае если у вас есть несколько соединений и вы хотите использовать какое-либо другое, чем "default". Транспорт будет автоматически создавать таблицу под названием messenger_messages.

5.1

Возможность автоматически генерировать миграцию для таблицы messenger_messages была вредставлена в Symfony 5.1 и DoctrineBundle 2.1.

Или, для создания таблицы самостоятельно, установите опцию auto_setup как false, и сгенерируйте миграцию.

Caution

Свойство datetime сообщений, хранящееся в базе данных, использует временную зону текущей системы. Это может вызвать проблемы, если несколько машин с разными конфигурациями временных зон используют одно хранилище.

Транспорт имеет такие опции:

????? ???????? ?? ?????????
table_name ???????? ??????? messenger_messages
queue_name ???????? ??????? (??????? ? ???????, ????? ???????????? ???? ??????? ??? ?????????? ???????????) default
redeliver_timeout ????-??? ????? ????????? ???????? ????????? ? ???????, ?? ?? ? ????????? "?????????" (???? ???????? ?? ?????-?? ??????? ???????????, ?????????? ???, ? ????? ?? ?????? ????? ??????????? ?????????) - ? ????????. 3600
auto_setup ?????? ?? ??????? ???? ??????? ????????????? ?? ????? ????????/?????????. true

5.1

Возможность использовать PostgreSQL LISTEN/NOTIFY была представлена в Symfony 5.1.

При использовании PostgreSQL, у вас есть доступ к следующим опциям для получения преимуществ функции LISTEN/NOTIFY. Это позволяет более производительный подход, чем поведение голосования транспорта Doctrine по умолчанию, так как PostgreSQL будет напрямую уведомлять работников, когда новое сообщение будет появляться в таблице.

????? ???????? ?? ?????????
use_notify ???????????? ?? LISTEN/NOTIFY. true
check_delayed_interval ???????? ???????? ?????????? ?????????, ? ????????????. ?????????? ??? 0, ????? ????????? ????????. 1000
get_notify_timeout ????????????????? ??????? ???????? ?????? ??? ?????? PDO::pgsqlGetNotify`, ? ????????????. 0

Транспорт Beanstalkd

5.2

Транспорт Beanstalkd был представлен в Symfony 5.2.

Транспорт Beanstalkd отправляет сообщения прямо с рабочую очередь Beanstalkd. Установите его, выполнив:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорта Beanstalkd может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Если порта нет, он по умолчанию будет 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
tube_name ???????? ??????? default
timeout ????-??? ??????? ????????? - ? ????????. 0 (???????? ?????? ????? ?? ???? ??????? ?????, ???? ??????? TransportException)
ttr ????? ?????????? ????????? ????? ??? ??? ????????? ??? ??????? ? ??????? ?????????? - ? ????????. 90

Транспорт Redis

Транспорт Redis использует потоки для создания очереди сообщений. Этот транспорт требует PHP-расширения Redis (>=4.3) и работающего сервера Redis (^5.0).

5.1

Начиная с Symfony 5.1, транспорт Redis был перемешен в отдельный пакет. Установите его, выполнив:

1
$ composer require symfony/redis-messenger

DSN транспорта Redis может выглядеть так:

1
2
3
4
5
6
7
8
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Полный пример DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Пример кластера Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Пример Unix-сокета
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock

5.1

DSN Unix-соекта была представлена в Symfony 5.1.

Некоторые опции могут быть сконфигурированы через DSN или ключ options под транспортом в messenger.yaml:

????? ???????? ?? ?????????
stream ???????? ?????? Redis messages
group ???????? ?????? ???????????? Redis symfony
consumer ???????? ???????????, ????????????? ? Redis consumer
auto_setup ??????? ?????? Redis ?????????????? true
auth ?????? Redis  
delete_after_ack ???? true, ????????? ????????????? ????????????? ????? ???????? false
delete_after_reject ???? true, ????????? ????????????? ?????????????, ???? ??? ??????????? true
lazy ??????????? ?????? ???? ?????????? ????????????? ?????????? false
serializer ??? ????????????? ????????? ???????? ? Redis (????? Redis::OPT_SERIALIZER ) Redis::SERIALIZER_PHP
stream_max_entries ???????????? ?????????? ???????, ?? ???????? ????? ?????? ?????. ?????????? ?????????? ??????? ????????, ????? ???????? ?????? "???????????" ????????? 0 (??? ???????? "?? ???????")
tls ???????? TLS-????????? ?????????? false
redeliver_timeout ????-??? ????? ????????? ???????? "????????????" ?????????, ??????? ??????????? ???????????? ??????????? (???? ???????? ?? ?????-?? ??????? ????, ?????????? ???, ? ? ????? ??? ????? ????? ?????????? ????????? ??????? ?????????) - ? ????????. 3600
claim_interval ???????? ? ??????? ????? ????????? "???????????"/??????????? ????????? - ? ????????????? 60000 (1 ??????)
sentinel_persistent_id ??????, ???? ?????????? ????????? null
sentinel_retry_interval ??????????, ???????? ? ????????????? 0
sentinel_read_timeout Float, ???????? ? ????????, ?? ????????? ?? ?????????? 0
sentinel_timeout Float, ???????? ? ????????, ?? ????????? ?? ?????????? 0
sentinel_master ??????, ???? null ??? ??????, ??????????? ????????? Sentinel null

Caution

Никогда не должно быть более одной команды messenger:consume выполняемой с одинаковой комбинацией stream, group и consumer, иначе сообщения могут быть обработаны более, чем один раз. Если вы запускаете несколько работников очерели, consumer может быть установлен как переменная окружения (вроде %env(MESSENGER_CONSUMER_NAME)%), установленная Супервизором (пример ниже) или любым другим сервисом, используемым для управления процессами работников. В окружении контейнера, HOSTNAME может быть использовано как имя потребителя, так как там только один работник на контейнер/хост. Если вы используете Kubernetes для управления контейнерами, рассмотрите использование StatefulSet для стабилизации имен.

Tip

Установите delete_after_ack как true (если у вас одна группа) или определите stream_max_entries (если вы можете предположить, какое максимальное количество записей допустимо в вашем случае), чтобы избежать утечек памяти. В другом случае. все сообщения навсегда останутся в Redis.

5.1

Опции delete_after_ack, redeliver_timeout и claim_interval были представлены в Symfony 5.1.

5.2

Опции delete_after_reject и lazy были представлены в Symfony 5.2.

5.4

Опции sentinel_persistent_id, sentinel_retry_interval, sentinel_read_timeout, sentinel_timeout, и sentinel_master были представлены в Symfony 5.4.

5.4

Отсутствие установки ясного значения для опции delete_after_ack не поощряется с версии Symfony 5.4. В Symfony 6.0, значение этой опции по умолчанию изменяется с false на true.

Транспорт в памяти

Транспорт in-memory на самом деле не доставляет сообщения. Вместо этого, он дедржит их в памяти во время запроса, что может быть полезным для тестирования. Например, если у вас есть транспорт async_priority_normal, вы можете переопределить его в окружении test, чтобы использовать этот транспорт:

  • YAML
  • XML
  • PHP
1
2
3
4
5
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'

Тогда, во время тестирования, сообщения не будут отправлены реальному транспорту. Даже лучше, в тесте, вы можете проверить, чтобы только одно сообщение было отправлено во время запроса:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething()
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт имеет ряд опций:

serialize (булево, по умолчанию: false)
Сериализовать сообщения или нет. Это полезно для тестирования дополнительного слоя, особенно когда вы используете собственный сериализатор сообщений.

5.3

Опция serialize была представлена в Symfony 5.3.

Note

Все транспорты in-memory будут автоматически сброшены после каждого теста в классах тестов, расширяющих KernelTestCase или WebTestCase.

Amazon SQS

5.1

Транспорт Amazon SQS был представлен в Symfony 5.1.

Транспорт Amazon SQS прекрасно подходит для приложения на AWS. Установите его, выполнив:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорта выглядит так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматически создаст необходимые очерди. Это можно отключить, установив опцию auto_setup как false.

Tip

До отправки или получения сообщения, Symfony необходимо конвертировать название очереди в URL очереди AWS вызвав API GetQueueUrl в AWS. Этого дополнительного API-вызова можно избежать, предоставив DSN, которая является URL очереди.

5.2

Функция предоставления URL очереди в DSN была представлена в Symfony 5.2.

Транспорт имеет ряд опций:

????? ???????? ?? ?????????
access_key ???? ??????? AWS  
account ????????????? AWS-???????? ???????? ??????? ??????
auto_setup ????? ?? ????????????? ???????? ??????? ?? ????? ????????/?????????. true
buffer_size ?????????? ????????? ??? ???????????????? ?????????? 9
debug ???? true. ????? ??? ???? HTTP ???????? ? ??????? (??? ?????? ?? ??????????????????) false
endpoint ?????????? URL ? SQS-??????? https://sqs.eu-west-1.amazonaws.com
poll_timeout ????? ???????? ?????? ????????? ? ???????? 0.1
queue_name ???????? ??????? messages
region ???????? AWS-??????? eu-west-1
secret_key ????????? ???? AWS  
visibility_timeout ?????????? ??????, ?? ?????????? ??????? ????????? ?? ????? ??????? (Visibility Timeout) ???????????? ???????
wait_time ???????????? Long polling ? ???????? 20

5.3

Опция debug была представлена в Symfony 5.3.

Note

Параметр wait_time определяет максимальное время ожидания для Amazon SQS до того, как сообщение будет доступно в очереди, перед отправкой ответа. Это помогает снизить стоимость использования Amazon SQS, устранив некоторое количество пустых ответов.

Параметр poll_timeout определяет время ожидания получателя до возвращения null. Он избегает блокировки других получателей от вызова.

Note

Если название очереди имеет суффикс .fifo, AWS создаст очередь FIFO. Используйте марку AmazonSqsFifoStamp, чтобы определить Message group ID и Message deduplication ID.

Очереди FIFO не поддерживают установки задержки отдельных сообдений, значение delay: 0 требуется в настройках стратегии повторных попыток.

Сериализация сообщений

Когда сообщения отправляются (и получаются) в транспорт, они сериализуются с использование нативных функций PHP serialize() и unserialize(). Вы можете изменить это глобально (или для каждого транспорта) на сервис, реализующий SerializerInterface:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer

messenger.transport.symfony_serializer - это встроенный сервис, который использует компонент Сериализатор и может быть сконфигурирован несколькими способами. Если вы выберете использовать сериализатор Symfony, вы сможете контролировать контекст для каждого случая отдельно через SerializerStamp (см. Конверты и марки).

Tip

При отправке/получении сообщений в/из другого транспорта, вам может понадобиться больше контроля над процессом сериализации. Использование пользовательского сериализатора предоставляет такой контроль. См. Туториал по сериализации сообщений SymfonyCasts, чтобы узнать больше.

Настройка обработчиков

Manually Configuring Handlers

Symfony обычно будет находить и регистрировать вашего обработчика автоматически. Но вы также можете сконфигурировать его вручную - и передать ему дополнительную конфигурацию - тегировав сервис обработчика messenger.message_handler

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        tags: [messenger.message_handler]

        # или сконфигурировать с опциями
        tags:
            -
                name: messenger.message_handler
                # необходимо только если невозможно угадать по подсказке
                handles: App\Message\SmsNotification

Возможные опции конфигурации с тегами:

  • bus
  • from_transport
  • handles
  • method
  • priority

Подписчик и опции обработчика

Класс обработчика может обрабатывать множество сообщений или конфигурировать сам себя, реализуя MessageSubscriberInterface:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class SmsNotificationHandler implements MessageSubscriberInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }

    public function handleOtherSmsNotification(OtherSmsNotification $message)
    {
        // ...
    }

    public static function getHandledMessages(): iterable
    {
        // обработать это сообщение в __invoke
        yield SmsNotification::class;

        // также обработать это сообщение в handleOtherSmsNotification
        yield OtherSmsNotification::class => [
            'method' => 'handleOtherSmsNotification',
            //'priority' => 0,
            //'bus' => 'messenger.bus.default',
        ];
    }
}

Связывание обработчиков с разными транспортами

Каждое сообщение может иметь несколько обработчиков, и когда сообщение потребляется, вызываются все его обработчики. Но вы можете также сконфигурироать обработчика так, чтобы он вызывался только когда сообщение получено из конкретного транспорта. Это позволяет вам имет одно сообщение, где каждый обработчик вызывается разными "работниками", потребляющими разный транспорт.

Представьте, что у вас есть сообщение UploadedImage с двумя обработчиками:

  • ThumbnailUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием image_transport
  • NotifyAboutNewUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием async_priority_normal

Чтобы сделать это, добавьте опцию from_transport к каждому обработчику. Например:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
{
    public function __invoke(UploadedImage $uploadedImage)
    {
        // создать миниатюры
    }

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'image_transport',
        ];
    }
}

И, похожим образом:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
{
    // ...

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'async_priority_normal',
        ];
    }
}

Затем, убедитесь, что "маршрутизируете" ваше сообщение к обоим транспортам:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]

Вот и все! Теперь вы можете потреблять каждый транспорт:

1
2
3
4
# вызовет ThumbnailUploadedImageHandler только при обработке сообщения
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Если обработчик не имеет конфигурации from_transport, он будет выполнен в каждом транспорте, из которого будет получено это сообщение.

Расширение мессенлжера

Конверты и марки

Сообщение может быть любым PHP-объектом. Иногда вам может понадобиться сконфигурировать что-то дополнительное в сообщении - вроде того, как оно должно быть обработано внутри AMQP или добавления задержки перед обработкой сообщения. Вы можете сделать это, добавив марку ("stamp") к вашему сообщению:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus)
{
    $bus->dispatch(new SmsNotification('...'), [
        // подождать 5 секунд перед обработкой
        new DelayStamp(5000),
    ]);

    // или ясно создайте Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутренне, каждое сообщение оборачивается в конверт (Envelope), содержащий сообщение и марки. Вы можете создать его вручную или позволить автобусу сообщений сделать это. Существует множество различных марок для разных целей и они используются внутренне для отслеживания информации о сообщении - вроде того, какой автобус обрабатывает его или имеет ли оно повторные попытки после неудачи.

Промежуточное ПО

То, что происходит после запуска сообщения в автобус сообщений, зависит от его набора промежуточного ПО и его порядка. По умолчанию, промежуточное ПО, сконфигурированное для каждого автобуса, выглядит так:

  1. add_bus_name_stamp_middleware - добавляет марку для записи того, в каком атобусе было запущено это собщение;
  2. dispatch_after_current_bus- см. Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена;
  3. failed_message_processing_middleware - обрабатывает сообщения, которые имеют повторные попытки через транспорт ошибок, чтобы они правильно функционировали, как будто бы они были получены из изначального транспорта;
  4. Ваша собственная коллекция middleware;
  5. send_message - если машрутизация сконфигурирована для транспорта, отправляет сообщения этому транспорту и останавливает цепь промежуточного ПО;
  6. handle_message - вызывает обработчика(ов) сообщений для заданног сообщения.

Note

Эти названия промежуточного ПО - на самом деле сокращения. Настоящие id сервисов имеют префикс messenger.middleware. (например,messenger.middleware.handle_message).

Промежуточное ПО выполняется после запуска сообщения, и также еще раз, когда сообщение получено через работника (для сообщений, которые были отправлены транспорту для асинхронной обработки). Помните это, если вы создаете собственное промежуточное ПО.

Вы можете добавить собственное промежуточное ПО в список, или полностью отключить промежуточное ПО по умолчанию и добавить только ваше собственное:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # отключить промежуточное ПО по умолчанию
                default_middleware: false

                # и/или добавить ваше собственное
                middleware:
                    # id серисов, релизующих Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

Note

Если сервис промежуточного ПО абстрактный, будет создан другой экземпляр сервиса для каждого автобуса.

Промежуточное ПО для Doctrine

1.11

Следующее промежуточное по для Doctrine было представлено в DoctrineBundle 1.11.

Если вы в своем приложении используете Doctrine, существует ряд необязательного промежуточного ПО, которое вы можете захотеть использовать:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # каждый раз при обработке сообщения, соединение Doctrine
                    # "пингуется" и повторно подключается, если оно закрыто. Полезно,
                    # если ваши работники работают долгое время и соединение базы
                    # данных иногда теряется
                    - doctrine_ping_connection

                    # После обработки, соединение Doctrine закрывается, что может
                    # освободить соединения базы данных в работнике, вместо того,
                    # чтобы держать их открытыми всегда
                    - doctrine_close_connection

                    # оборачивает всех обработчиков в одну транзакцию Doctrine
                    # обработчикам не надо вызывать flush(), а ошибка в любом
                    # обработчике вызовет откат
                    - doctrine_transaction

                    # or pass a different entity manager to any
                    #- doctrine_transaction: ['custom']

Другое промежуточное ПО

5.3

Промежуточное ПО router_context было представлено в Symfony 5.3.

Добавьте промежуточное ПО router_context, если вам нужно генерировать абсолютные URL в потребителе (например, отображать шаблон со ссылками). Это промежуточное ПО хранит контекст изначального запроса (т.е. хост, HTTP-порт и т.д.), что необходимо при создании абсолютных URL.

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    - router_context

События Мессенджера

В дополнение к промежуточному ПО, Мессенджер также запускает несколько событий. Вы можете создать слушателя событий, чтобы подключаться к разным частям процесса. Для каждого, класс события будет названием события:

Несколько автобусов, автобусов команд и событий

Мессенджер предоставляет вам один сервис автобуса сообщений по умолчанию. Но вы можете сконфигурировать столько, сколько вы хотите, создав автобусы "команд", "запросов" или "событий" и контролируя их промежуточное ПО. См. Несколько автобусов.