Symfony Messenger или как сделать PHP асинхронным

Всем чмоки в этом чатике.

На одном из проектов появилась возможность применить Symfony Messenger — спешу рассказать, показать и оценить.

Зачем нужен Symfony Messenger и что это?

Это компонент, позволяющий выполнять фоновые задачи. Например — отправить СМС или Email-письмо.

А чем хуже обычные события? Ведь я могу создать событие, кинуть его в EventDispatcher и оно тоже выполнится в фоне?

Не совсем. Оно выполнится синхронно, т.е. в рамках текущего процесса. Грубо говоря — отправляя смс-сообщение, мы всё равно подвесим процесс и заставим клиента смотреть в крутящийся прелоадер.

Тут-то мы и подходим к главной плюшке Symfony Messenger — это асинхронное выполнение.

Как это работает?

  • Есть три основных сущности:
    • сообщение (наш кастомный класс, хранящий информацию)
    • обработчик этого сообщения (класс, принимающий сообщение и делающий с ним какую-либо полезную работу)
    • диспатчер — инструмент, позволяющий инициировать новое сообщение (тем самым запуская процесс его асинхронного выполнения)
  • Создаётся класс — сообщение
  • Создаётся класс — обработчик
  • Запускается процесс, проверяющий есть ли новые сообщения, которые нужно выполнить в фоне
  • Пока процесс крутится и память мутится, где-то в приложении мы через диспатчер бросаем новое сообщение на фоновое исполнение
  • Profit

Документация не описывает, как это работает с системной точки зрения. Но, могу предположить, что обработка каждого сообщения запускается в новом процессе, запуск которых инициируется из процесса, который лонг-пуллит очередь новых сообщений. Таким образом — это полностью асинхронная работа ( в случае, если в системе есть более одного ядра) а не реализация псевдо-асинхронности через событийную модель, как это сделано, например, в ноде.

Теперь давай посмотрим как работает Messenger Component.

Устанавливаем — composer require messenger.

Создаём класс src\\Message\\SmsMessage.php:

<?php

namespace App\Message;

class SmsMessage
{
    /**
     * @var string
     */
    private string $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }

    public function getText(): string
    {
        return $this->text;
    }
}

Создаём обработчик src\MessageHandler\SmsHandler.php:

<?php

namespace App\MessageHandler;

use App\Message\SmsMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SmsHandler implements MessageHandlerInterface
{

    /**
     * @var LoggerInterface
     */
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {

        $this->logger = $logger;
    }

    public function __invoke(SmsMessage $message)
    {
        $this->logger->info('Отправка СМС');
        sleep(5);
        $this->logger->info('Обработчик успешно завершил работу');

        return 0;
    }
}

Как правило — отправка сообщения происходит через сервисы-посредники (например esputnik), поэтому обработчик сведётся к простому POST-запросу. Нам для тестов удобней будет просто писать лог, а не мониторить запрос и его статусы.

Теперь нужно зарегистрировать данное сообщение в системе как сообщение Symfony Messenger’a. Для этого в config/packages/messenger.yaml пишем:

framework:
  messenger:
    transports:
     async: '%env(MESSENGER_TRANSPORT_DSN)%'

    routing:
     'App\Message\SmsMessage': async

routing — указывает сообщению транспорт. transport — раздел, описанный выше и определяющий стратегии обработки сообщения. Например async — асинхронное выполнение. Значение — это мессадж-бас, определённый в .env-файле. Я, в качестве баса, установил Redis:

.env:

MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

В системе, само собой, должен быть установлен redis-server. Важно, чтобы версии, старше 5.0. Если у тебя debian-подобный linux, то через apt install тебе по-умолчанию установится Redis 4-ой версии. Поэтому, проще собрать из сорцов — например, как описано тут.

Для того, чтобы работать с Redis из-под PHP — нужно установить один из клиентов, например — php-redis. Как установить — можно почитать в сети, я ставил через PECL и руками зарегистрировал расширение.

Теперь нужно запустить фоновое прослушивание очереди:

php bin/console messenger:consume async -vv

И (я это сделаю через команду) задиспатчить сообщение:

<?php

namespace App\Command;

use App\Message\SmsMessage;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class TestCommand extends Command
{
    protected static $defaultName = 'app:test';
    /**
     * @var MessageBusInterface
     */
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus, string $name = null)
    {
        parent::__construct($name);
        $this->bus = $bus;
    }

    protected function configure()
    {
        $this
            ->setDescription('messenger test>')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->bus->dispatch(
            new SmsMessage('Текст сообщения')
        );
        return 0;
    }
}

MessageBusInterface имеет метод dispatch — именно он и бросает сообщение, а дальше команда, запущенная выше, проверяет, есть ли для сообщения обработчики.

Я открою логи, запущу команду и мы явно посмотрим на то, что процесс обрабатывается в фоне:

По центру — запуск основного процесса. После запуска процесс завершается. Справа — видим лог сразу после запуска и лог через 5 секунд, что значит, что процесс отработал асинхронно.

Ссылочки:

Twitter — https://twitter.com/SeniorJun

Можно поддержать развитие канала — https://www.patreon.com/junsenior

Наш чатик в tlg — https://t.me/junsenior_chat

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *