Импорт товаров через rabbitmq

Общие вопросы по использованию второй версии фреймворка. Если не знаете как что-то сделать и это про Yii 2, вам сюда.
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Импорт товаров через rabbitmq

Сообщение slo_nik »

Есть сайт, для парсинга yml файлов с товарами, который надо довести до ума.
Парсинг идёт через очереди.
Используется RabbitMQ.
Первоночально загружается или указывается url файла с товарами. Также можно настроить обновление товаров по расписанию.

Но есть моменты, которые меня очень смущают.

Во-первых, в конфиграции настроена одна очередь для обработки всего и первого импрорта товаров и обновление товаров по расписанию

Код: Выделить всё

'queue' => [
    'class' => yii\queue\amqp_interop\Queue::class,
    'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,
    'queueName' => 'queue',
    'dsn' => 'amqp://' . getenv('RABBITMQ_DEFAULT_USER') . ':' . getenv('RABBITMQ_DEFAULT_PASS') . '@import-rabbitmq',
    'as idAccess' => JobIdAccessBehavior::class
],
Можно добавить вторую очередь, первая будет для обработки первоночального импорта товара, а вторая будет обрабатывать обновление товаров по расписанию.

Но тут возникает другая проблема.

Изначально yml файл парсится и все данные записываются в файл json строкой, который скармливается очереди. Товаров может быть как и 100 штук, так и 10 000(и более).
Для обработки файлов с маленьким количеством товаров проблем не возникает, а вот с большим количеством товаров частая проблема заключается в том, что если очередь не успевает отработать за определённое время, то при второй попытке обработать данные очередь начинает читать файл с самого начала и товары обновляются по второму кругу.
Поэтому задачи в очереди накапливаются и обновление может занять продолжительное время. Пока сайт в тестовом режиме, файлов не так уж и много, в конце концов очередь разгребёт все задачи.
Что можно предпринять в этой ситуации?
Пока на ум приходит только одно решение, дробить файл на более мелкие, например по 500 товаров, и скармливать на обработку по очереди. Но тогда всё-равно, как мне кажется, останется проблема с накоплением задач в очереди.
Создавать динамически очереди и настраивать их в rabbitmq? Но пока не представляю как это сделать штатными средствами yii2.

Во-вторый, есть проблема со скоростью обработки данных. Думаю, что это происходит из-за избытка условий и запросов в базу данных.
Из файла берётся json строка со всеми товарами, в цикле проверяется каждый товар.
Если он уже есть в базе данных - обновляется, если нет - пишется новый товар.

Сейчас код job-ы выглядит так

Код: Выделить всё

<?php

namespace core\jobs;

use ...

class FinishPeriodYmlImport extends BaseObject implements JobInterface
{
    public $attribute = 'userSlug';
    public $transliterateOptions = 'Russian-Latin/BGN; Any-Latin; Latin-ASCII; NFD; [:Nonspacing Mark:] Remove; NFC;';

    public $importData;
    public $count;
    public $delay;
    public $id;
    public $importCount;

    public function execute($queue)
    {
        $jobId = $queue->getJobId();
        $productsAdd = 0;
        $productsError = 0;
        $productsEdit = 0;
        $skipProduct = 0;

        $alias = Yii::getAlias('@backend/web/import');
        $dir = $alias . '/' . $this->id;
        
        // Эти три запроса мне кажутся лишними, думаю, что достаточно передать уже готовые данные из консольного контроллера
        $globalSettings = ImportSetting::find()->where(['type' => 'YML'])->one();
        $import = ImportFile::find()->where(['id' => $this->id])->one(); // ПОВТОРНЫЙ ЗАПРОС!!! Первый в PeriodImportTestController
        $setting = ImportFileSetting::find()->where(['import_file_id' => $import->id])->one(); // ПОВТОРНЫЙ ЗАПРОС!!! Первый в PeriodImportTestController

        $fileProducts = file_get_contents($this->importData);
        $products = unserialize($fileProducts);

        $imageArray = [];
        $imageIndex = 0;
        $noCategory = [];

        foreach($products as $product)
        {
            /** @var Trade $trade */
            $trade = Trade::find()->where(['external_id' => $product['id'], 'user_id' => $import->user_id])->one();
            $currency = Currency::find()->where(['sign' => $product['currencyId'], 'module' => 2])->one();

            $userCategory = TradeUserCategory::find()->where(['name' => $product['category'], 'user_id' => $import->user_id])->one();

            if($userCategory)
            {
                $settingCategory = ImportFileSettingCategory::find()->where(['import_file_setting_id' => $setting->id, 'user_category_name' => $userCategory->name])->one();
                if($settingCategory->active == 1)
                {
                    try {
                        $description = strip_tags($product['description'], '<p><br><ul><ol><li><u><em><b><strong>');
                        if(is_null($currency))
                        {
                            $defaultCurrency = Currency::find()->where(['default' => 1, 'module' => 2])->one();
                            $currencyValue = $defaultCurrency->id;
                        }

                        if(!is_null($currency))
                        {
                            $currencyValue = $currency->id;
                        }

                        if(!is_null($trade))
                        {
                            $moderationFields = json_decode($globalSettings->moderation_fields);

                            // Если в настройках указано обновление по расписанию
                            if(strlen($setting->updated_fields) > 4 && $import->type == 'few')
                            {

                                foreach(json_decode($setting->updated_fields) as $field)
                                {
                                    if($field != 'pictures') {
                                        $trade->{$field} = is_array($product[$field]) ? json_encode($product[$field]) : $product[$field];
                                    }

                                    if (in_array($field, $moderationFields)) {
                                        if($trade->getOldAttribute($field) !== $product[$field]){
                                            $trade->setPreModerationStatus();
                                        }
                                    }

                                    if($field == 'pictures') {
                                        $imageArray[$imageIndex]['trade_id'] = $trade->id;
                                        $imageArray[$imageIndex]['images'] = $product['pictures'];
                                    }
                                }

                            }


                            // Если в настройках указано разовое обновление
                            if(!is_null($setting->updated_fields) || $import->type == 'one')
                            {
                                foreach(json_decode($setting->updated_fields) as $field){
                                    if (in_array($field, $moderationFields)) {
                                        if($trade->getOldAttribute($field) !== $product[$field]){
                                            $trade->setPreModerationStatus();
                                        }
                                    }
                                }
                                $trade->user_id = $import->user_id ?: 0; // ???
                                $trade->category_id = $userCategory->category_id;
                                $trade->currency_id = $currencyValue;
                                // обновляются остальные значения свойств модели Trade
                            }

                            try{
                                $trade->save();
                                $productsEdit++;
                                $imageArray[$imageIndex]['trade_id'] = $trade->id;
                                $imageArray[$imageIndex]['images'] = $product['pictures'];
                            } catch (Exception $exception){
                                // Запись в базу
                                ImportLog::error($this->id, 'Ошибка при обновлении товара: ' . mb_substr($exception->getMessage(), 0, 150), $this->importCount);
                                $productsError++;
                                $this->recursiveRemoveDir($dir);
                                //}
                            }

                        }
                        
                        // Если это новый товар, ранее не импортировался.
                        if(is_null($trade))
                        {
                            $newProduct = new Trade();
                            $newProduct->user_id = $import->user_id ?: 0;
                            $newProduct->company_id = $product['company_id'];
                             // присваиваются остальные значения свойствам модели Trade

                            try{
                                $newProduct->save();
                                $productsAdd++;
                                $imageArray[$imageIndex]['trade_id'] = $newProduct->id;
                                $imageArray[$imageIndex]['images'] = $product['pictures'];
                            } catch (Exception $exception){
                               // Запись в базу
                                ImportLog::error($this->id, 'Ошибка при добавлении нового товара: ' . mb_substr($exception->getMessage(), 0, 150), $this->importCount);
                                $productsError++;
                                $this->recursiveRemoveDir($dir);
                            }
                        }
                        $imageIndex++;
                    } catch (Exception $e) {
                       // Запись в базу
                        ImportLog::error($this->id, 'Ошибка при обработке товара: ' . $product['name'] . '. Причина: ' . mb_substr($e->getMessage(), 0, 150), $this->importCount);
                    }
                }
            }else{
                $skipProduct++;
                if(!in_array($product['category'], $noCategory)){
                    $noCategory[] = $product['category'];
                }
            }
        }

        // Завершение работы очереди
        if($import->last_queue_id == $jobId){
            if(count($noCategory) > 0){
                $text = 'Не сопоставлены категории: ' . implode(',', $noCategory);
                // Запись в базу
                ImportLog::warning($this->id, $text, $this->importCount);
            }
            $text = 'Добавлено товаров ' . $productsAdd .
                '.<br /> Обновлено ' . $productsEdit .
                ".<br /> <span style=\"color:#dd4b39;\"> С ошибками " . $productsError . '</span>.'.
                '<br /> <span style="color:#f39c12;">Пропущено товаров (не сопоставлены категории):' . $skipProduct . '</span>';
            // Запись в базу    
            ImportLog::success($this->id,  $text, $this->importCount);
            // Запись в базу
            ImportLog::success($this->id, 'Обновление завершено.', $this->importCount);

            $import->changeStatus(ImportFile::STATUS_DONE);
            $import->touch('updated_at');
            $import->save();
            $this->recursiveRemoveDir($dir);
        }

    }
}
Как можно улучшить этот код?

p.s. Код сумбурный, если надо - дам любые комментарии по коду.
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

Вы можете сократить потребление памяти, если для всех дополнительных запросов кроме $trade вместо больших объектов со всеми полями:

Код: Выделить всё

$currency = Currency::find()->where(...)->one();
будете запрашивать данные в виде ассоциативного массива asArray() только с нужными в select(...) полями:

Код: Выделить всё

$currency = Currency::find()->select(['id'])->where(...)->asArray()->one();
Чтобы снизить нагрузку на базу для некоторых данных вроде тех же настроек можно добавить кеширование:

Код: Выделить всё

$setting = ImportFileSetting::find()->...->cache(3600)->one();
Последний раз редактировалось ElisDN 2022.01.27, 12:30, всего редактировалось 2 раза.
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

Ещё у вас в цикле выполняется одинаковый для каждого продукта запрос:

Код: Выделить всё

$defaultCurrency = Currency::find()->where(['default' => 1, 'module' => 2])->one();
Его нужно переместить вверх к запросу $settings до цикла.
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

Помимо этого вы делаете сохранение с валидацией:

Код: Выделить всё

$trade->save();
Если в классе Trade в rules() есть правила валидации 'exist' или 'unique', то они тоже будут при каждом save ходить в БД. Поэтому если такие есть, то можно либо импортировать без валидации:

Код: Выделить всё

$trade->save(false);
Либо сделать отдельный сценарий для импорта, в котором отключить эти правила, и потом указывать его:

Код: Выделить всё

$trade->scenario = Trade::SCENARIO_IMPORT;
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

А в общем да, вместо создания одной задачи с 10 000 товаров в контроллере лучше создать 100 задач по 100 товаров и рандомно разложить в 10 очередей, для которых поднять >=10 воркеров.
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.27, 13:05 А в общем да, вместо создания одной задачи с 10 000 товаров в контроллере лучше создать 100 задач по 100 товаров и рандомно разложить в 10 очередей, для которых поднять >=10 воркеров.
Вот в этом и вопрос, как это реализовать?
В конфигурации создать 10 очередей? Но как потом рандомно разложить задачи по этим очередям?
Пока не нашёл в документации к yii2 ответ на этот вопрос.
Подскажите, пожалуйста, как это сделать.

Сейчас в консольном контроллере постановка в очередь идёт таким образом

Код: Выделить всё

file_put_contents($directory . '/importYMLData' . $import->id . '.xml', $dataFile);
Yii::$app->queue->ttr(10000)->push(
      new PeriodYmlImport([
            'maxCount' => $import->importSetting->limit_count,
            'file' => $directory . '/importYMLData' . $import->id . '.xml',
            'importId' => $import->id,
            'importCount' => ($import->logs[0]->count + 1),
            'companyId' => $import->user->company->id
      ])
 );
То есть, здесь, надо прочитать файл с товарами, разделить его на несколько частей и для каждой части определить свою очередь?

Забыл сразу указать, тут есть ещё промежуточный файл, в котором идёт разбор файла и создаётся массив с данными, который уже передаётся в следующую очередь, код этой очереди я написал в первом сообщении.
В промежуточном скрипте идёт простой разбор в цикле.
Вот такая цепочка очередей может влиять на скорость обработки данных?

По поводу запросов к базе.
Может будет лучше вообще все эти данные получить в консольном контроллере и передать в очередь готовые данные?
И ещё, на каждый чих пишутся данные в таблицу лога

Код: Выделить всё

ImportLog::success($this->id, 'Обновление завершено.', $this->importCount);
Насколько плох такой подход?
duda
Сообщения: 43
Зарегистрирован: 2015.07.06, 22:05

Re: Импорт товаров через rabbitmq

Сообщение duda »

возможно стоит и сам импорт разбить на задачи.

1. Разбить импорт каждого товара на импорт 1 товара. - JOB
2. Импорт одного товара. - JOB
3. Отчет о результатах. Сколько было задач - сколько с ошибками, импортированных - JOB
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

duda писал(а): 2022.01.27, 17:42
Ну так Дмитрий это и имел ввиду, у меня тоже такая мысль была. Только не по одному товару в очередь ставить, а разбивать массив товаров на пачки.

Я не могу понять только, как рандомно разложить их по очередям, как советует Дмитрий.
В конфигурации можно создать несколько очередей, также в конфиге supervisor для каждой очереди создать настройки. Как задачи потом размещать по этим очередям?
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

slo_nik писал(а): 2022.01.27, 14:28 В конфигурации создать 10 очередей? Но как потом рандомно разложить задачи по этим очередям?
Да, в случае yii2-queue объявить несколько отдельных:

Код: Выделить всё

'bootstrap' => [
    'queue1',
    ...
    'queue10',
],
'components' => [
    'queue1' = [...],
    ...
    'queue10' = [...],
],
И дёргать рандомно:

Код: Выделить всё

Yii::$app->get('queue' . random_int(1, 10))->ttr(10000)->push(...);
И на сервере через supervisor поднимать слушатели для каждой:

Код: Выделить всё

yii queue1/listen
...
yii queue10/listen
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.27, 20:16
slo_nik писал(а): 2022.01.27, 14:28 В конфигурации создать 10 очередей? Но как потом рандомно разложить задачи по этим очередям?
Да, в случае yii2-queue объявить несколько отдельных:

Код: Выделить всё

'bootstrap' => [
    'queue1',
    ...
    'queue10',
],
'components' => [
    'queue1' = [...],
    ...
    'queue10' = [...],
],
И дёргать рандомно:

Код: Выделить всё

Yii::$app->get('queue' . random_int(1, 10))->ttr(10000)->push(...);
И на сервере через supervisor поднимать слушатели для каждой:

Код: Выделить всё

yii queue1/listen
...
yii queue10/listen
В общем так и делал, но только не додумался до такого решения

Код: Выделить всё

Yii::$app->get('queue' . random_int(1, 10))->ttr(10000)->push(...)
И вопрос ещё по очередям для записей в лог, как советует duda если я его правильно понял.
Для записей в лог создать свою очередь и все сообщения отправлять в эту очередь.
Что-то типа такого:

Код: Выделить всё

Yii::$app->queueError->push($importId, $message, $importCount);
Возможно ли, что при таком подходе запись в таблицу логов будет идти не по порядку? Или если использовать только одну очередь, то все сообщения сохранят порядок при записи. То есть -> "Начало импорта" -> "Импортировано товаров" -> "Импорт завершён".

Так же ещё вопрос по настройкам самого RabbitMQ.
Как я понимаю, для каждой очереди, которая будет описана в конфигурации yii2, помимо уникального имени нужно будет указать уникальный exchenge?
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.27, 20:16
В конфигурации очереди не указан параметр serializer, но в самом коде данные обрабатываются serialize() и unserialize(). Может это влиять на скорость обработки данных? Или лучше использовать serializer в конфигурации, а в коде отказаться от serialize() и unserialize()?
Аватара пользователя
SiZE
Сообщения: 2813
Зарегистрирован: 2011.09.21, 12:39
Откуда: Perm
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение SiZE »

Я на всякий случай напомню, что если в запросе выборка происходит не по уникальному индексу, то надо лимит задавать

Код: Выделить всё

->limit()->one()
т.к. one() возвращает первую строку из результата, не ограничивая выборку
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

slo_nik писал(а): 2022.01.27, 14:28 Вот такая цепочка очередей может влиять на скорость обработки данных?
slo_nik писал(а): 2022.01.27, 22:50 Или лучше использовать serializer в конфигурации, а в коде отказаться от serialize() и unserialize()?
Проверьте через профилирование, сколько времени занимает каждый этап.
slo_nik писал(а): 2022.01.27, 14:28 Может будет лучше вообще все эти данные получить в консольном контроллере и передать в очередь готовые данные?
И ещё, на каждый чих пишутся данные в таблицу лога.
Насколько плох такой подход?
Не на каждый чих, а всего по одному разу до и после foreach($products). На скорость влияют только сотни запросов в цикле.
SiZE писал(а): 2022.01.28, 10:14 Я на всякий случай напомню, что если в запросе выборка происходит не по уникальному индексу, то надо лимит задавать
Да, желательно везде ставить:

Код: Выделить всё

->limit(1)->one()
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.28, 11:49
А как всё-таки с настройками самого rabbitmq?
Нужно определять отдельные exchenge для каждой очереди?
На какие настройки Вы рекомендуете обратить внимание в первую очередь?
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

slo_nik писал(а): 2022.01.28, 19:12 А как всё-таки с настройками самого rabbitmq?
Нужно определять отдельные exchenge для каждой очереди?
Либо сделать отдельные exchange1..10 для каждой очереди queue1..10 и самому отправлять в 'exchange' . random_int(1, 10)

Либо подключить плагин и определить один exchange с типом x-random и привязать к нему десять очередей.
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.27, 13:05 и рандомно разложить в 10 очередей, для которых поднять >=10 воркеров.
Вы имеете ввиду настройки supervisor?
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.28, 19:43 Либо сделать отдельные exchange1..10 для каждой очереди queue1..10 и самому отправлять в 'exchange' . random_int(1, 10)
Привязка очереди к exchange будет в автоматическом режиме или надо в настройках rabbitmq, в панели управления, привязать очередь к exchange?
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

slo_nik писал(а): 2022.01.28, 19:51 Привязка очереди к exchange будет в автоматическом режиме или надо в настройках rabbitmq, в панели управления, привязать очередь к exchange?
Расширение yii2-queue в драйвере для php-amqplib/php-amqplib умеет только создавать и привязывать exchange с типом direct.

А в yii2-queue с enqueue/amqp-lib уже можно указать свой exchangeType. Можно с ним попробовать создать десять компонентов queue1..queue10 с разными queueName и одним и тем же exchangeName с типом exchangeType='x-random', чтобы он привязал все очереди автоматом к одному exchange. И потом все сообщения отправлять только в первый Yii::$app->queue1->push(...).
slo_nik
Сообщения: 344
Зарегистрирован: 2013.10.07, 19:08

Re: Импорт товаров через rabbitmq

Сообщение slo_nik »

ElisDN писал(а): 2022.01.28, 20:06
Для большинства задач, как я понимаю, достаточно будет exchenge с типом direct?
Аватара пользователя
ElisDN
Сообщения: 5845
Зарегистрирован: 2012.10.07, 10:24
Контактная информация:

Re: Импорт товаров через rabbitmq

Сообщение ElisDN »

slo_nik писал(а): 2022.01.28, 20:27 Для большинства задач, как я понимаю, достаточно будет exchenge с типом direct?
Да, для одной очереди всё равно, какой маршрутизатор.
Ответить