Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена

Дата обновления перевода 2021-09-28

Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена

Обработчик сообщений может dispatch новое сообщение во время обработки других, в тот же либо другой автобус (если приложение имеет несколько автобусов). Любые ошибки или исключения, которые возникают в этом процессе, могут иметь ненамеренные последствия, вроде:

  • Если вы используете DoctrineTransactionMiddleware а запущенное сообщение вызывает исключение, то любые транзакции базы данных в изначальном обработчике будут отменены.
  • Если сообщение запущено в другой автобус, то запущенное сообщение будет обработано, даже если какой-то код позже в текущем обработчике вызовет исключение.

Пример процесса RegisterUser

Давайте в качестве примера возьмем приложение с автобусами команд и событий. Приложение запускает команду под названием RegisterUser в автобус команд. Команда обрабатывается RegisterUserHandler, что создает объект User, сохраняет этот объект в базе данных и запускает сообщение UserRegistered в автобус событий.

Существует много обработчиков сообщения UserRegistered, один может отправлять приветственное письмо новому пользователю. Мы используем DoctrineTransactionMiddleware, чтобы обернуть все запросы БД в одну транзакцию БД.

Проблема №1: Если во время отправки приветственного письма вызывается исключение, то пользователь не будет создан, так как DoctrineTransactionMiddleware откатится к транзакции Doctrine, в которой был создан пользователь.

Проблема №2: Если исключение вызывается при сохранении пользователя в БД, приветственное письмо все равно будет отправлено, так как оно обрабатывается асинхронно.

Промежуточное ПО DispatchAfterCurrentBusMiddleware

Для многих приложений, желаемое поведение - обрабатывать только сообщения, которые запускаются обработчиком после того, как обработчик полностью закончил работу. Это можно сделать используя DispatchAfterCurrentBusMiddleware и добавив штамп DispatchAfterCurrentBusStamp к конверту сообщения :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;

use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

class RegisterUserHandler
{
    private $eventBus;
    private $em;

    public function __construct(MessageBusInterface $eventBus, EntityManagerInterface $em)
    {
        $this->eventBus = $eventBus;
        $this->em = $em;
    }

    public function __invoke(RegisterUser $command)
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        // DispatchAfterCurrentBusStamp помечает сообщение события для обработки
        // только если этот обработчик не вызывает исключения.

        $event = new UserRegistered($command->getUuid());
        $this->eventBus->dispatch(
            (new Envelope($event))
                ->with(new DispatchAfterCurrentBusStamp())
        );

        // ...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;

use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;

class WhenUserRegisteredThenSendWelcomeEmail
{
    private $mailer;
    private $em;

    public function __construct(MailerInterface $mailer, EntityManagerInterface $em)
    {
        $this->mailer = $mailer;
        $this->em = $em;
    }

    public function __invoke(UserRegistered $event)
    {
        $user = $this->em->getRepository(User::class)->find($event->getUuid());

        $this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
    }
}

Это означает, что сообщение UserRegistered не будет обработано до тех пор, пока не будет выполнен RegisterUserHandler и новый User не будет сохранен в базу данных. Если RegisterUserHandler столкнется с исключением, событие UserRegistered никогда не будет обработано. А если исключение будет вызывано во время отправки приветственного письма, транзакция Doctrine не будет отменена.

Note

Если WhenUserRegisteredThenSendWelcomeEmail вызывает исключение, оно будет обернуто в DelayedMessageHandlingException. Использование DelayedMessageHandlingException::getExceptions предоставит вам все исключения, которые вызываются во время обработки сообщения с DispatchAfterCurrentBusStamp.

Промежуточное ПО dispatch_after_current_bus подключается по умолчанию. Если вы конфигурируете свое промежуточное ПО вручную, не забудьте зарегистрировать dispatch_after_current_bus перед doctrine_transaction в цепочке промежуточного ПО. Также, промежуточное ПО dispatch_after_current_bus должно быть загружено для всех используемых автобусов.