Компонент Messenger

Дата обновления перевода: 2023-01-19

Компонент Messenger

Компонент Messenger помогает приложениям отправлять и принимать сообщения из других приложений или через очереди сообщений.

Компонент во многом был основан на серии постов Маттиаса Нобака о командах bus и проект SimpleBus.

See also

Эта статья объясняет как использовать функции Messenger в качестве независимого компонента в любом PHP-приложении. Прочтите статью Messenger: работа с синхронизированными сообщениями и сообщениями в очереди, чтобы узнать, как его использовать в приложениях Symfony.

Установка

1
$ composer require symfony/messenger

Как вариант, вы можете клонировать хранилище https://github.com/symfony/messenger.

Note

Если вы устанавливаете этот компонент вне приложения Symfony, вам нужно подключить файл vendor/autoload.php в вашем коде для включения механизма автозагрузки классов, предоставляемых Composer. Детальнее читайте в этой статье.

Концепты

Отправитель:
Отвечает за сериализацию и отправку сообщений чему-то. Это что-то может быть брокером сообщений или сторонней API, к примеру.
Получатель:
Отвечает за десериализацию и перенаправление сообщений обработчку(ам). Это может быть пулер очереди сообщений или конечная точка API, к примеру.
Обработчик:
Отвечает за обработку сообщений, используя бизнес-логику, применимую к сообщениям. Обработчики вызываются промежуточным ПО HandleMessageMiddleware.
Промежуточное ПО:
Промежуточное ПО может получить доступ к сообщению и его обертке (конверту), во время запуска через bus. В буквальном смысле "ПО посредине", оно не касается основной функциональности (бизнес логики) приложения. Вместо этого, оно срезает углы и может применяться по всему приложению, влияя на весь bus сообщений. К примеру: ведение логов, валидацию сообщения, начало транзакции, ... Оно также ответственно за вызов следующего промежуточного ПО в цепочке, что означает, что оно также может настраивать конверт, добавляя к нему штапмы или даже перемещая его, а также прервать цепочку промежуточного ПО. Промежуточное ПО вызывается и при первоначальном запуске сообщения, и позже, когда сообщение получено от транспорта.
Конверт:
Специальная концепция мессенджера, предоставляющая полную гибкость внутри bus с сообщениями, оборачивая в себя сообщения, и позволяя добавлять полезную информацию внутри посредоством марок на конвертах.
Штампы конвертов:
Информация, которую вам нужно добавить к сообщению: контекст сериализатора, который использовать для транспорта, маркеры, указывающие на получение сообщения, или любые другие метаданные, которые может использовать ваше промежуточное ПО или слой транспорта.

Автобус

Автобус используется для развёртывания сообщений. Поведение автобуса упорядочено в связующем стеке. Компонент поставляется с набором связующего ПО, которое вы можете использовать.

При использовании автобуса сообщений с пакетом Symfony FrameworkBundle, для вас конфигурируется следующее промежуточное ПО:

  1. SendMessageMiddleware (enables asynchronous processing, logs the processing of your messages if you pass a logger)
  2. HandleMessageMiddleware (calls the registered handler(s))

Пример:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use App\Message\MyMessage;
use App\MessageHandler\MyMessageHandler;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;

$handler = new MyMessageHandler();

$bus = new MessageBus([
    new HandleMessageMiddleware(new HandlersLocator([
        MyMessage::class => [$handler],
    ])),
]);

$bus->dispatch(new MyMessage(/* ... */));

Note

Каждое промежуточное ПО должно реализовывать MiddlewareInterface.

Обработчики

После диспетчеризации в автобус, сообщения будут обработаны "обработчиками сообщений". Обработчик сообщений - это PHP вызываемое (т.е. функция или экземпляр класса), которое будет проводить необходимую обработку для вашего сообщения:

1
2
3
4
5
6
7
8
9
10
11
namespace App\MessageHandler;

use App\Message\MyMessage;

class MyMessageHandler
{
   public function __invoke(MyMessage $message)
   {
       // Обработка сообщения...
   }
}

Добавление метаданных к сообщениям (конверты)

Если вам нужно добавить метаданные или некоторую конфигурацию в сообщение, оберните его в класс Envelope и добавьте штампы. Например, чтобы установить группы сериализации, используемые при прохождении сообщения через слой транспорта, используйте штамп SerializerStamp:

1
2
3
4
5
6
7
8
9
10
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;

$bus->dispatch(
    (new Envelope($message))->with(new SerializerStamp([
        // группы применяются ко всему сообщению, так что не забудьте
        // определить группу для каждого встроенного объекта
        'groups' => ['my_serialization_groups'],
    ]))
);

Вот некоторые вашные штампы на конверты, которые отправляются в Symfony Messenger:

  1. DelayStamp, чтобы отложить обработку асинхронного сообщения.
  2. DispatchAfterCurrentBusStamp, чтобы сообщение было обработано после того, как будет выполнен текущий bus. Прочтите больше в Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена.
  3. HandledStamp, штамп, который отмечает сообщение обработанным конкретным обработчиком. Позволяет получить доступ к значению, возвращенному обработчиком, и названию обработчка.
  4. ReceivedStamp, внутренний штамп, который отмечает сообщение полученным от транспорта.
  5. SentStamp, штамп, который отмечает сообщение отправленным от конкретного отправителя. Позволяет получить доступ к FQCN отправителя и псевдоним, если он доступен, из SendersLocator.
  6. SerializerStamp, чтобы сконфигурировать группы сериализации, используемые транспортом.
  7. ValidationStamp, чтобы сконфигурировать группы валидации, используемые при включении промежуточного ПО валидации.
  8. ErrorDetailsStamp, внутренний штамп, когда сообщение неуспешно в связи с исключением в обработчике.

Вместо того, чтобы разбираться с сообщениями напрямую в промежуточном ПО, вы получаете конверт. Таким образом, вы можете исследовать содержание конверта и его штампа или добавить их:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use App\Message\Stamp\AnotherStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

class MyOwnMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        if (null !== $envelope->last(ReceivedStamp::class)) {
            // Сообщение было получено только что...

            // Вы можете, к примеру, добавить другой штамп.
            $envelope = $envelope->with(new AnotherStamp(/* ... */));
        } else {
            // Сообщение только что было запущено
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

Пример выше перенаправит сообщение к следующему промежуточному ПО с дополнительным штампом если сообщение было только что получено (т.е. имеет хотя бы один штамп ReceivedStamp). Вы можете создавать собственные штампы, реализуя StampInterface.

Если вы хотите изучить все штампы на конверте, используйте метод $envelope->all(), который возвращает все штампы, сгрупированные по типам (FQCN). Как вариант, вы можете итерировать все штампы конкретного типа, используя FQCN в качестве первого параметра этого метода (например, $envelope->all(ReceivedStamp::class)).

Note

Любой штамп должен иметь возможность быть сериализованым с использованием компонента Symfony Сериализатор, если он проходит через транспорт, используя базовый сериализатор Serializer.

Транспорт

Для отправки и получения сообщений вам понадобится сконфигурировать транспорт. Транспорт будет отвечать за коммуникацию с вашим брокером сообщений или третьими сторонами.

Ваш собственный отправитель

Представьте, что у вас уже есть сообщение ImportantAction, проходящее через автобус сообщений и обрабатываемое обработчиком. Теперь, вы также хотите отправить это сообщение в виде электронного письма (используя компоненты Mime и Mailer).

Используя SenderInterface, вы можете создать собственного отправителя:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageSender;

use App\Message\ImportantAction;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Mime\Email;

class ImportantActionToEmailSender implements SenderInterface
{
    private $mailer;
    private $toEmail;

    public function __construct(MailerInterface $mailer, string $toEmail)
    {
        $this->mailer = $mailer;
        $this->toEmail = $toEmail;
    }

    public function send(Envelope $envelope): Envelope
    {
        $message = $envelope->getMessage();

        if (!$message instanceof ImportantAction) {
            throw new \InvalidArgumentException(sprintf('This transport only supports "%s" messages.', ImportantAction::class));
        }

        $this->mailer->send(
            (new Email())
                ->to($this->toEmail)
                ->subject('Important action made')
                ->html('<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>')
        );

        return $envelope;
    }
}

Ваш собственный получатель

Получатель отвечает за получение сообщений из источника и их диспетчеризацию приложению.

Представьте, что вы уже обработали какие-то "команды" в вашем приложении, используя сообщение NewOrder. Теперь вы хотите интегрироваться с третьей стороной или унаследованным приложением, но вы не можете использовать API и вам нужно использовать общедоступный CSV с новыми командами.

Вы прочтёте этот CSV-файл и запустите сообщение NewOrder. Всё, что вам нужно сделать - это написать ваш пользовательский CSV получатель:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
namespace App\MessageReceiver;

use App\Message\NewOrder;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;

class NewOrdersFromCsvFileReceiver implements ReceiverInterface
{
    private $serializer;
    private $filePath;

    public function __construct(SerializerInterface $serializer, string $filePath)
    {
        $this->serializer = $serializer;
        $this->filePath = $filePath;
    }

    public function get(): iterable
    {
        // Получите конверт в соответствии с вашим транспортом, в большинстве,
        // случаев ($yourEnvelope здесь), использование связи является самым простым решением
        if (null === $yourEnvelope) {
            return [];
        }

        try {
            $envelope = $this->serializer->decode([
                'body' => $yourEnvelope['body'],
                'headers' => $yourEnvelope['headers'],
            ]);
        } catch (MessageDecodingFailedException $exception) {
            $this->connection->reject($yourEnvelope['id']);
            throw $exception;
        }

        return [$envelope->with(new CustomStamp($yourEnvelope['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        // Добавьте информацию об обработанном сообщении
    }

    public function reject(Envelope $envelope): void
    {
        // В случае пользовательского соединения
        $this->connection->reject($this->findCustomStamp($envelope)->getId());
    }
}

Получатель и отправитель в одном автобусе

Чтобы разрешить отправку и получение сообщений в одном и том же автобусе и предотвратить бесконечный цикл, автобус сообщений добавит к конвертам сообщений штамп ReceivedStamp и промежуточное ПО SendMessageMiddleware будет знать, что оно не должно снова направлять эти сообщения на транспорт