Парсинг идёт через очереди.
Используется RabbitMQ.
Первоночально загружается или указывается url файла с товарами. Также можно настроить обновление товаров по расписанию.
Но есть моменты, которые меня очень смущают.
Во-первых, в конфиграции настроена одна очередь для обработки всего и первого импрорта товаров и обновление товаров по расписанию
Код: Выделить всё
'queue' => [
'class' => yii\queue\amqp_interop\Queue::class,
'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,
'queueName' => 'queue',
'dsn' => 'amqp://' . getenv('RABBITMQ_DEFAULT_USER') . ':' . getenv('RABBITMQ_DEFAULT_PASS') . '@import-rabbitmq',
'as idAccess' => JobIdAccessBehavior::class
],
Но тут возникает другая проблема.
Изначально yml файл парсится и все данные записываются в файл json строкой, который скармливается очереди. Товаров может быть как и 100 штук, так и 10 000(и более).
Для обработки файлов с маленьким количеством товаров проблем не возникает, а вот с большим количеством товаров частая проблема заключается в том, что если очередь не успевает отработать за определённое время, то при второй попытке обработать данные очередь начинает читать файл с самого начала и товары обновляются по второму кругу.
Поэтому задачи в очереди накапливаются и обновление может занять продолжительное время. Пока сайт в тестовом режиме, файлов не так уж и много, в конце концов очередь разгребёт все задачи.
Что можно предпринять в этой ситуации?
Пока на ум приходит только одно решение, дробить файл на более мелкие, например по 500 товаров, и скармливать на обработку по очереди. Но тогда всё-равно, как мне кажется, останется проблема с накоплением задач в очереди.
Создавать динамически очереди и настраивать их в rabbitmq? Но пока не представляю как это сделать штатными средствами yii2.
Во-вторый, есть проблема со скоростью обработки данных. Думаю, что это происходит из-за избытка условий и запросов в базу данных.
Из файла берётся json строка со всеми товарами, в цикле проверяется каждый товар.
Если он уже есть в базе данных - обновляется, если нет - пишется новый товар.
Сейчас код job-ы выглядит так
Код: Выделить всё
<?php
namespace core\jobs;
use ...
class FinishPeriodYmlImport extends BaseObject implements JobInterface
{
public $attribute = 'userSlug';
public $transliterateOptions = 'Russian-Latin/BGN; Any-Latin; Latin-ASCII; NFD; [:Nonspacing Mark:] Remove; NFC;';
public $importData;
public $count;
public $delay;
public $id;
public $importCount;
public function execute($queue)
{
$jobId = $queue->getJobId();
$productsAdd = 0;
$productsError = 0;
$productsEdit = 0;
$skipProduct = 0;
$alias = Yii::getAlias('@backend/web/import');
$dir = $alias . '/' . $this->id;
// Эти три запроса мне кажутся лишними, думаю, что достаточно передать уже готовые данные из консольного контроллера
$globalSettings = ImportSetting::find()->where(['type' => 'YML'])->one();
$import = ImportFile::find()->where(['id' => $this->id])->one(); // ПОВТОРНЫЙ ЗАПРОС!!! Первый в PeriodImportTestController
$setting = ImportFileSetting::find()->where(['import_file_id' => $import->id])->one(); // ПОВТОРНЫЙ ЗАПРОС!!! Первый в PeriodImportTestController
$fileProducts = file_get_contents($this->importData);
$products = unserialize($fileProducts);
$imageArray = [];
$imageIndex = 0;
$noCategory = [];
foreach($products as $product)
{
/** @var Trade $trade */
$trade = Trade::find()->where(['external_id' => $product['id'], 'user_id' => $import->user_id])->one();
$currency = Currency::find()->where(['sign' => $product['currencyId'], 'module' => 2])->one();
$userCategory = TradeUserCategory::find()->where(['name' => $product['category'], 'user_id' => $import->user_id])->one();
if($userCategory)
{
$settingCategory = ImportFileSettingCategory::find()->where(['import_file_setting_id' => $setting->id, 'user_category_name' => $userCategory->name])->one();
if($settingCategory->active == 1)
{
try {
$description = strip_tags($product['description'], '<p><br><ul><ol><li><u><em><b><strong>');
if(is_null($currency))
{
$defaultCurrency = Currency::find()->where(['default' => 1, 'module' => 2])->one();
$currencyValue = $defaultCurrency->id;
}
if(!is_null($currency))
{
$currencyValue = $currency->id;
}
if(!is_null($trade))
{
$moderationFields = json_decode($globalSettings->moderation_fields);
// Если в настройках указано обновление по расписанию
if(strlen($setting->updated_fields) > 4 && $import->type == 'few')
{
foreach(json_decode($setting->updated_fields) as $field)
{
if($field != 'pictures') {
$trade->{$field} = is_array($product[$field]) ? json_encode($product[$field]) : $product[$field];
}
if (in_array($field, $moderationFields)) {
if($trade->getOldAttribute($field) !== $product[$field]){
$trade->setPreModerationStatus();
}
}
if($field == 'pictures') {
$imageArray[$imageIndex]['trade_id'] = $trade->id;
$imageArray[$imageIndex]['images'] = $product['pictures'];
}
}
}
// Если в настройках указано разовое обновление
if(!is_null($setting->updated_fields) || $import->type == 'one')
{
foreach(json_decode($setting->updated_fields) as $field){
if (in_array($field, $moderationFields)) {
if($trade->getOldAttribute($field) !== $product[$field]){
$trade->setPreModerationStatus();
}
}
}
$trade->user_id = $import->user_id ?: 0; // ???
$trade->category_id = $userCategory->category_id;
$trade->currency_id = $currencyValue;
// обновляются остальные значения свойств модели Trade
}
try{
$trade->save();
$productsEdit++;
$imageArray[$imageIndex]['trade_id'] = $trade->id;
$imageArray[$imageIndex]['images'] = $product['pictures'];
} catch (Exception $exception){
// Запись в базу
ImportLog::error($this->id, 'Ошибка при обновлении товара: ' . mb_substr($exception->getMessage(), 0, 150), $this->importCount);
$productsError++;
$this->recursiveRemoveDir($dir);
//}
}
}
// Если это новый товар, ранее не импортировался.
if(is_null($trade))
{
$newProduct = new Trade();
$newProduct->user_id = $import->user_id ?: 0;
$newProduct->company_id = $product['company_id'];
// присваиваются остальные значения свойствам модели Trade
try{
$newProduct->save();
$productsAdd++;
$imageArray[$imageIndex]['trade_id'] = $newProduct->id;
$imageArray[$imageIndex]['images'] = $product['pictures'];
} catch (Exception $exception){
// Запись в базу
ImportLog::error($this->id, 'Ошибка при добавлении нового товара: ' . mb_substr($exception->getMessage(), 0, 150), $this->importCount);
$productsError++;
$this->recursiveRemoveDir($dir);
}
}
$imageIndex++;
} catch (Exception $e) {
// Запись в базу
ImportLog::error($this->id, 'Ошибка при обработке товара: ' . $product['name'] . '. Причина: ' . mb_substr($e->getMessage(), 0, 150), $this->importCount);
}
}
}else{
$skipProduct++;
if(!in_array($product['category'], $noCategory)){
$noCategory[] = $product['category'];
}
}
}
// Завершение работы очереди
if($import->last_queue_id == $jobId){
if(count($noCategory) > 0){
$text = 'Не сопоставлены категории: ' . implode(',', $noCategory);
// Запись в базу
ImportLog::warning($this->id, $text, $this->importCount);
}
$text = 'Добавлено товаров ' . $productsAdd .
'.<br /> Обновлено ' . $productsEdit .
".<br /> <span style=\"color:#dd4b39;\"> С ошибками " . $productsError . '</span>.'.
'<br /> <span style="color:#f39c12;">Пропущено товаров (не сопоставлены категории):' . $skipProduct . '</span>';
// Запись в базу
ImportLog::success($this->id, $text, $this->importCount);
// Запись в базу
ImportLog::success($this->id, 'Обновление завершено.', $this->importCount);
$import->changeStatus(ImportFile::STATUS_DONE);
$import->touch('updated_at');
$import->save();
$this->recursiveRemoveDir($dir);
}
}
}
p.s. Код сумбурный, если надо - дам любые комментарии по коду.