Skip to content

Commit

Permalink
Improve index queue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
markus-moser committed Jan 16, 2024
1 parent 468d15c commit 4a86613
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 34 deletions.
6 changes: 4 additions & 2 deletions config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ framework:
messenger:
enabled: true
transports:
pimcore_index_queues: 'doctrine://default?queue_name=pimcore_index_queues'
pimcore_generic_data_index_queue: 'doctrine://default?queue_name=pimcore_index_queues'
pimcore_generic_data_index_sync: 'sync://'
routing:
Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage: pimcore_index_queues
Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage: pimcore_generic_data_index_queue
Pimcore\Bundle\GenericDataIndexBundle\Message\DispatchQueueMessagesMessage: pimcore_generic_data_index_queue
buses:
messenger.bus.pimcore-generic-data-index:
middleware:
Expand Down
12 changes: 6 additions & 6 deletions src/Command/Update/IndexUpdateCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@ protected function configure(): void
}

/**
* @param InputInterface $input
* @param OutputInterface $output
*
* @return int
*
* @throws \Doctrine\DBAL\Exception
* @throws RuntimeException
*/
Expand Down Expand Up @@ -159,7 +154,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
->updateAll();
}

$this->indexQueueService->dispatchQueueMessages($output);
$this->output->writeln(
'<info>Dispatch queue messages</info>',
OutputInterface::VERBOSITY_VERBOSE
);

$this->indexQueueService->dispatchQueueMessages(true);

$this->release();

Expand Down
10 changes: 10 additions & 0 deletions src/Enum/Messenger/TransportName.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php
declare(strict_types=1);

namespace Pimcore\Bundle\GenericDataIndexBundle\Enum\Messenger;

enum TransportName: string
{
case INDEX_QUEUE = 'pimcore_generic_data_index_queue';
case SYNC = 'pimcore_generic_data_index_sync';
}
10 changes: 10 additions & 0 deletions src/Enum/SearchIndex/IndexName.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php
declare(strict_types=1);

namespace Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex;

enum IndexName: string
{
case ASSET = 'asset';
case DATA_OBJECT = 'data-object';
}
18 changes: 13 additions & 5 deletions src/EventSubscriber/IndexUpdateSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Pimcore\Bundle\GenericDataIndexBundle\EventSubscriber;

use Exception;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexName;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexQueueOperation;
use Pimcore\Bundle\GenericDataIndexBundle\Installer;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueService;
Expand Down Expand Up @@ -116,9 +117,10 @@ public function addDataObjectMapping(ClassDefinitionEvent $event): void
}

$classDefinition = $event->getClassDefinition();

$this->dataObjectIndexService
->updateMapping($classDefinition, true)
->addClassDefinitionToAlias($classDefinition, 'class_definitions');
->addClassDefinitionToAlias($classDefinition, IndexName::DATA_OBJECT->value);
}

/**
Expand All @@ -131,10 +133,14 @@ public function updateDataObjectMapping(ClassDefinitionEvent $event): void
}

$classDefinition = $event->getClassDefinition();

$this->dataObjectIndexService
->updateMapping($classDefinition)
->addClassDefinitionToAlias($classDefinition, 'class_definitions');
$this->indexQueueService->updateDataObjects($classDefinition);
->addClassDefinitionToAlias($classDefinition, IndexName::DATA_OBJECT->value);

$this->indexQueueService
->updateDataObjects($classDefinition)
->dispatchQueueMessages();
}

public function deleteDataObjectIndex(ClassDefinitionEvent $event): void
Expand All @@ -148,7 +154,7 @@ public function deleteDataObjectIndex(ClassDefinitionEvent $event): void
try {
$this->dataObjectIndexService
->deleteIndex($classDefinition)
->removeClassDefinitionFromAlias($classDefinition, 'class_definitions')
->removeClassDefinitionFromAlias($classDefinition, IndexName::DATA_OBJECT->value)
;
} catch (Exception $e) {
$this->logger->error($e->getMessage());
Expand Down Expand Up @@ -195,7 +201,9 @@ public function deleteTag(TagEvent $event): void
return;
}

$this->indexQueueService->updateByTag($event->getTag());
$this->indexQueueService
->updateByTag($event->getTag())
->dispatchQueueMessages();
}

public function updateTagAssignment(TagEvent $event): void
Expand Down
9 changes: 9 additions & 0 deletions src/Message/DispatchQueueMessagesMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);

namespace Pimcore\Bundle\GenericDataIndexBundle\Message;

class DispatchQueueMessagesMessage
{

}
47 changes: 47 additions & 0 deletions src/MessageHandler/DispatchQueueMessagesHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php
declare(strict_types=1);

/**
* Pimcore
*
* This source file is available under following license:
* - Pimcore Commercial License (PCL)
*
* @copyright Copyright (c) Pimcore GmbH (http://www.pimcore.org)
* @license http://www.pimcore.org/license PCL
*/

namespace Pimcore\Bundle\GenericDataIndexBundle\MessageHandler;

use Pimcore\Bundle\GenericDataIndexBundle\Message\DispatchQueueMessagesMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsMessageHandler]
class DispatchQueueMessagesHandler
{
public function __construct(
protected readonly IndexQueueService $indexQueueService,
protected readonly MessageBusInterface $messageBus
) {
}

public function __invoke(DispatchQueueMessagesMessage $message): void
{
$batchSize = 20;
while (true) {
$entries = $this->indexQueueService->getUnhandledIndexQueueEntries(true, $batchSize);
$amountOfEntries = count($entries);

if ($amountOfEntries > 0) {
$this->messageBus->dispatch(new IndexUpdateQueueMessage($entries));
}

if ($amountOfEntries < $batchSize) {
break;
}
}
}
}
4 changes: 1 addition & 3 deletions src/MessageHandler/IndexUpdateQueueHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
use Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Serializer\Exception\ExceptionInterface;

#[AsMessageHandler]
class IndexUpdateQueueHandler
{
public function __construct(
protected readonly IndexQueueService $indexQueueService,
protected readonly MessageBusInterface $messageBus
protected readonly IndexQueueService $indexQueueService
) {
}

Expand Down
28 changes: 13 additions & 15 deletions src/Service/SearchIndex/IndexQueueService.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
use Exception;
use InvalidArgumentException;
use Pimcore\Bundle\GenericDataIndexBundle\Entity\IndexQueue;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\Messenger\TransportName;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\ElementType;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexName;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexQueueOperation;
use Pimcore\Bundle\GenericDataIndexBundle\Message\DispatchQueueMessagesMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage;

Check notice on line 26 in src/Service/SearchIndex/IndexQueueService.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Bundle\\GenericDataIndexBundle\\Message\\IndexUpdateQueueMessage' is never used
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexService\AbstractIndexService;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexService\AssetIndexService;
Expand All @@ -35,9 +38,8 @@
use Pimcore\Model\DataObject\Service;
use Pimcore\Model\Element\ElementInterface;
use Pimcore\Model\Element\Tag;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Symfony\Component\Serializer\Exception\ExceptionInterface;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
use UnhandledMatchError;
Expand Down Expand Up @@ -248,7 +250,7 @@ public function updateAssets(): IndexQueueService
{
$selectQuery = sprintf("SELECT id, '%s', '%s', '%s', '%s', 0 FROM %s",
ElementType::ASSET->value,
'asset',
IndexName::ASSET->value,
IndexQueueOperation::UPDATE->value,
$this->getCurrentQueueTableOperationTime(),
'assets'
Expand All @@ -266,7 +268,7 @@ public function updateByTag(Tag $tag): IndexQueueService
//assets
$selectQuery = sprintf("SELECT id, '%s', '%s', '%s', '%s', 0 FROM assets where id in (select cid from tags_assignment where ctype='asset' and tagid = %s)",

Check warning on line 269 in src/Service/SearchIndex/IndexQueueService.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Line is longer than allowed by code style

Line is longer than allowed by code style (\> 120 columns)

Check warning on line 269 in src/Service/SearchIndex/IndexQueueService.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Line is longer than allowed by code style

Line is longer than allowed by code style (\> 120 columns)
ElementType::ASSET->value,
'asset',
IndexName::ASSET->value,
IndexQueueOperation::UPDATE->value,
$this->getCurrentQueueTableOperationTime(),
$tag->getId()
Expand Down Expand Up @@ -479,7 +481,7 @@ protected function getElementIndexName(ElementInterface $element): string
{
return match (true) {

Check failure on line 482 in src/Service/SearchIndex/IndexQueueService.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.2, highest, 11.x-dev as 11.99.9, true)

Match expression does not handle remaining value: true

Check failure on line 482 in src/Service/SearchIndex/IndexQueueService.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.2, highest, false)

Match expression does not handle remaining value: true
$element instanceof Concrete => $element->getClassName(),
$element instanceof Asset => 'asset',
$element instanceof Asset => IndexName::ASSET->value,
};
}

Expand Down Expand Up @@ -509,20 +511,16 @@ public function countQueuedItems(): int
}
}

public function dispatchQueueMessages(OutputInterface $output): void
public function dispatchQueueMessages(bool $synchronously = false): void
{
$entries = $this->getUnhandledIndexQueueEntries(true);
$stamps = [];

$progressBar = new ProgressBar($output, count($entries));
$progressBar->start();

foreach(array_chunk($entries, 400) as $entriesBatch) {
$message = new IndexUpdateQueueMessage($entries);
$this->messageBus->dispatch($message);
$progressBar->advance(count($entriesBatch));
if ($synchronously) {
$stamps[] = new TransportNamesStamp(TransportName::SYNC->value);
}

$progressBar->finish();
$message = new DispatchQueueMessagesMessage();
$this->messageBus->dispatch($message, $stamps);
}

public function commit(): IndexQueueService
Expand Down
3 changes: 2 additions & 1 deletion src/Service/SearchIndex/IndexService/AssetIndexService.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use OpenSearch\Namespaces\IndicesNamespace;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\FieldCategory;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\FieldCategory\SystemField;
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexName;
use Pimcore\Model\Asset;
use Pimcore\Model\Element\ElementInterface;
use Pimcore\Model\Element\Service;
Expand All @@ -42,7 +43,7 @@ protected function getIndexName(ElementInterface $element): string

protected function getAssetIndexName(): string
{
return $this->searchIndexConfigService->getIndexName('asset');
return $this->searchIndexConfigService->getIndexName(IndexName::ASSET->value);
}

protected function getCurrentFullIndexName(): string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ public function extractMapping(ClassDefinition $classDefinition): array
* - if that fails, create new index and reindex on ES side
* - if that also fails, throws exception
*
* @return $this
*
* @throws Exception
*/
public function updateMapping(ClassDefinition $classDefinition, bool $forceCreateIndex = false): DataObjectIndexService

Check warning on line 118 in src/Service/SearchIndex/IndexService/DataObjectIndexService.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Line is longer than allowed by code style

Line is longer than allowed by code style (\> 120 columns)
Expand Down

0 comments on commit 4a86613

Please sign in to comment.