rabbitmq, yii2 и symfony console

Общие вопросы по использованию второй версии фреймворка. Если не знаете как что-то сделать и это про Yii 2, вам сюда.
Ответить
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

rabbitmq, yii2 и symfony console

Сообщение slo_nik »

Добрый день.

Необходимо реализовать функционал для импорта товаров из yml файлов.

Требуется вынести весь функционал для обработки файлов и запись данных в базу отдельно, без использования возможностей yii2.

Сейчас получилось подключить doctrine, dbal, symfony console. Обработка очередей через rabbitmq и php-amqplib.

Сейчас процесс идёт так:

1) Пользователь заполняет и отправляет форму.

Код: Выделить всё

    public function actionIndex()
    {
        $form = new ImportForm();

        if ($form->load(Yii::$app->request->post()) && $form->validate()) {
        
             $command = new Create\Command(
                userId: $form->user_id,
                name: $form->name ?? $form->website_url,
                typeFile: $form->import_type,
                typeImport: $form->type,
                ext: $form->extension,
                path: $form->website_url,
                updateFields: $form->updated_fields,
                updatePeriod: $form->update_period
            );

            try {
            
                $importDataID = $this->handler->handle($command);
                
                return $this->redirect(['/trade/import/process', 'id' => $importDataID]);
                
            } catch (DomainException $e) {
                Yii::$app->session->setFlash('Error: ' . $e->getMessage());
            }


        }
         return $this->render('index', ['model' => $form, 'tab' => 'import']);
    }              
2) В Handler записываются данные в базу, регистрируется событие

Код: Выделить всё

    public function handle(Command $command): int
    {
        $importFile = ImportFile::create(
          /** */
        );

        $importFile->attachSettings($command->updateFields, $command->updatePeriod);

        $this->repo->add($importFile);

        $this->flusher->flush();

        $this->dispatcher->dispatchAll($importFile->releaseEvents());

        return $importFile->getIdImport();

    }
3) Теперь надо передать данные в очередь, в которой обработать файл с товарами. Событие и обработчик события подключены в bootstrap файле.

Bootstrap.php

Код: Выделить всё

class Bootstrap implements BootstrapInterface
{
    public function bootstrap($app)
    {
        $container = \Yii::$container;

        $container->setSingleton(EventsDispatcher::class, function (Container $container) {
               return new EventDispatcher($container, [
                   Event\Import\ImportAdded::class => [Event\Import\ImportAddedListener::class]
               ]);
        });
    }
}
ImportAdded.php

Код: Выделить всё

class ImportAdded
{
    public function __construct(
        public readonly int $importId,
        public readonly string $file
    ) {
    }
}
ImportAddedListener.php

Код: Выделить всё

class ImportAddedListener
{

    private AMQPStreamConnection $connection;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('import-rabbitmq', 5672, 'rabbit', 'rabbit');
        parent::__construct();
    }

    public function handle(): void
    {
        $connection = $this->connection;

        $channel = $this->connection->channel();

        AMQPHelper::initNotifications($channel);
        AMQPHelper::registerShutdown($connection, $channel);

            $data = [
                'type' => 'notification',
                'user_id' => $i,
                'title' => 'Title! Count: ' . $i,
                'body' => 'Hello!!! Time: ' . date('d-m-Y H:i:s', time()),
            ];

            $message = new AMQPMessage(
                json_encode($data),
                ['content_type' => 'text/plain']
            );

            $channel->basic_publish($message, AMQPHelper::EXCHANGE_NOTIFICATIONS, 'note');
    }
}
Ну и файл consume, который запущен в консоли.

ConsumeCommand.php

Код: Выделить всё

class ConsumeCommand extends Command, пока содержит тестовый код, просто читает сообщения.
{
    private $connection;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        parent::__construct();
    }

    protected function configure()
    {
        $this->setName('amqp:demo:consume');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $output->writeln('<comment>Consume message</comment>');

        $connection = $this->connection;

        $channel = $connection->channel();

        AMQPHelper::initNotifications($channel);
        AMQPHelper::registerShutDown($connection, $channel);

        $consumerTag = 'consume_' . getmypid();

        $channel->basic_consume(AMQPHelper::QUEUE_NOTIFICATIONS, $consumerTag, false, false, false, false, function($message) use ($output)
        {
              $output->writeln(print_r(json_decode($message->body, true), true));

              /** @var AMQPChannel $channel */
              $channel = $message->delivery_info['channel'];
              $channel->basic_ack($message->delivery_info['delivery_tag']);
        });

        while(count($channel->callbacks)) {
            $channel->wait();
        }

        $output->writeln('<info>Done!</info>');
    }

}
Проблема у меня с файлом ImportAddedListener.php. Не только в запуске в консоли, но и в понимании его назначения. Как я понимаю, в этом файле не выполняется никакой
код, а просто отсылаются данные в очередь, которые будут обрабатываться в ComsumeCommand.php, в котором будет разбор файла с товарами, запись данных в базу.

Если оставить как есть, то будет работать. То есть, в ProduceCommand.php будут переданы такие данные как id пользователя и путь к файлу yml. Эти данные принимает ConsumeCommand.php и уже в нём обрабатывается файл с товарами.

Но товаров в yml файле много, несколько тысяч, и получается, что ConsumeCommand.php будет нагружен. Есть такая мысль, что в самом ProduceCommand.php разбирать файл и отправлять товары в очередь по одному. Для этого надо будет добавить цикл, но вот он и повесит страницу сайта. Значит надо файл ProduceCommand.php тоже запускать в фоне.

Как сделать так, чтобы ProduceCommand.php обрабатывался в фоне и запущен он был через symfony/console? Это главный вопрос.

Второстепенный вопрос, правильно ли я понимаю назначение файлов ProduceCommand.php и ConsumeCommand.php?
Ответить