Skip to content

Commit

Permalink
make batch size configurable for DispatchQueueMessagesHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
lukmzig committed Jan 31, 2024
1 parent 28e57c9 commit 0ceeb05
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 16 deletions.
3 changes: 3 additions & 0 deletions config/services/search/index.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ services:
Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\EnqueueServiceInterface:
class: Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\EnqueueService

Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessageServiceInterface:
class: Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessageService

Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\SearchIndexConfigServiceInterface:
class: Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\SearchIndexConfigService

Expand Down
18 changes: 18 additions & 0 deletions doc/05_Configuration/10_Index_Management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Index Queue options

The indexing queue considers the following options:

- **worker_count** (default 1): number of messenger workers to process the queue
- **min_batch_size** (default 5): minimum number of items to process in one batch (when using multiple workers)
- **max_batch_size** (default 400): maximum number of items to process in one batch

Sample configuration:

```yaml
pimcore_generic_data_index:
index_service:
queue_settings:
worker_count: 1
min_batch_size: 5
max_batch_size: 400
```
22 changes: 21 additions & 1 deletion src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,27 @@ public function getConfigTreeBuilder(): TreeBuilder
->end()
->end()
->append($this->buildVariableNode('index_settings'))
->arrayNode('system_fields_settings')
->arrayNode('queue_settings')
->addDefaultsIfNotSet()
->children()
->scalarNode('worker_count')
->defaultValue(1)
->validate()
->ifTrue(function ($value) {
return $value < 1;
})
->thenInvalid('Worker count must be at least 1.')
->end()
->end()
->scalarNode('min_batch_size')
->defaultValue(5)
->end()
->scalarNode('max_batch_size')
->defaultValue(400)
->end()
->end()
->end()
->arrayNode('system_fields_settings')
->children()
->append($this->buildSystemFieldsSettingsNode('general'))
->append($this->buildSystemFieldsSettingsNode('data_object'))
Expand Down
4 changes: 4 additions & 0 deletions src/DependencyInjection/PimcoreGenericDataIndexExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Exception;
use InvalidArgumentException;
use Pimcore\Bundle\GenericDataIndexBundle\DependencyInjection\Factory\OpenSearchClientFactory;
use Pimcore\Bundle\GenericDataIndexBundle\MessageHandler\DispatchQueueMessagesHandler;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\SearchIndexConfigServiceInterface;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand Down Expand Up @@ -78,5 +79,8 @@ private function registerIndexServiceParams(ContainerBuilder $container, array $
$definition->setArgument('$username', $indexSettings['client_params']['username']);
$definition->setArgument('$password', $indexSettings['client_params']['password']);
$definition->setArgument('$sslVerification', $indexSettings['client_params']['ssl_verification']);

$definition = $container->getDefinition(DispatchQueueMessagesHandler::class);
$definition->setArgument('$queueSettings', $indexSettings['queue_settings']);
}
}
43 changes: 29 additions & 14 deletions src/MessageHandler/DispatchQueueMessagesHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,57 @@

namespace Pimcore\Bundle\GenericDataIndexBundle\MessageHandler;

use Exception;
use Pimcore\Bundle\GenericDataIndexBundle\Message\DispatchQueueMessagesMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Repository\IndexQueueRepository;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessagesDispatcher;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessageServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Traits\LoggerAwareTrait;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;

/**
* @internal
*/
#[AsMessageHandler]
final class DispatchQueueMessagesHandler
{
use LoggerAwareTrait;

public function __construct(
private readonly IndexQueueRepository $indexQueueRepository,
private readonly QueueMessagesDispatcher $queueMessagesDispatcher,
private readonly MessageBusInterface $messageBus
private readonly QueueMessageServiceInterface $queueMessageService,
private readonly array $queueSettings,
) {
}

public function __invoke(DispatchQueueMessagesMessage $message): void
{
$batchSize = 400;
while (true) {
$entries = $this->indexQueueRepository->getUnhandledIndexQueueEntries(true, $batchSize);
$amountOfEntries = count($entries);
try {
$entries = $this->indexQueueRepository->getUnhandledIndexQueueEntries();
$entriesCount = count($entries);

if ($amountOfEntries > 0) {
$this->messageBus->dispatch(new IndexUpdateQueueMessage($entries));
if ($entriesCount === 0) {
return;
}

if ($amountOfEntries < $batchSize) {
break;
}
}
$realMaxBatchSize = $this->queueMessageService->getMaxBatchSize(
$entriesCount,
$this->queueSettings['worker_count'],
$this->queueSettings['min_batch_size'],
$this->queueSettings['max_batch_size']
);

$this->queueMessagesDispatcher->clearPendingState();
$this->queueMessageService->handleMessage(
$entriesCount,
$realMaxBatchSize,
$entries
);
} catch (Exception $e) {
$this->logger->warning('Dispatching Queue Message failed: ' . $e);
} finally {
$this->queueMessagesDispatcher->clearPendingState();
}
}

}
2 changes: 1 addition & 1 deletion src/Repository/IndexQueueRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public function enqueueBySelectQuery(string $selectQuery, array $params = []): v
/**
* @throws \Doctrine\DBAL\Exception
*/
private function dispatchItems(int $limit): int
public function dispatchItems(int $limit): int
{
$dispatchId = $this->timeService->getCurrentMillisecondTimestamp();

Expand Down
79 changes: 79 additions & 0 deletions src/Service/SearchIndex/IndexQueue/QueueMessageService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php
declare(strict_types=1);

/**
* Pimcore
*
* This source file is available under following license:
* - Pimcore Commercial License (PCL)
*
* @copyright Copyright (c) Pimcore GmbH (http://www.pimcore.org)
* @license http://www.pimcore.org/license PCL
*/

namespace Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue;

use Doctrine\DBAL\Exception;
use Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Repository\IndexQueueRepository;
use Symfony\Component\Messenger\MessageBusInterface;

/**
* @internal
*/
final class QueueMessageService implements QueueMessageServiceInterface
{
public function __construct(
private readonly IndexQueueRepository $indexQueueRepository,
private readonly MessageBusInterface $messageBus
) {
}

/**
* @throws Exception
*/
public function handleMessage(
int $entriesCount,
int $maxBatchSize,
array $entries
): void
{
if ($entriesCount > $maxBatchSize) {
$chunks = array_chunk($entries, $maxBatchSize);
foreach($chunks as $chunk) {
$this->dispatchMessage($chunk, $maxBatchSize);
}
} else {
$this->dispatchMessage($entries, $maxBatchSize);
}
}

public function getMaxBatchSize(
int $entriesCount,
int $workerCount,
int $minBatchSize,
int $maxBatchSize
): int
{
if ($workerCount === 1) {
return $maxBatchSize;
}

$itemsPerWorker = (int)floor($entriesCount / $workerCount);

return match (true) {
$itemsPerWorker < $minBatchSize => $minBatchSize,
$itemsPerWorker < $maxBatchSize => $itemsPerWorker,
default => $maxBatchSize,
};
}

/**
* @throws Exception
*/
private function dispatchMessage(array $items, int $limit): void
{
$this->messageBus->dispatch(new IndexUpdateQueueMessage($items));
$this->indexQueueRepository->dispatchItems($limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php
declare(strict_types=1);

/**
* Pimcore
*
* This source file is available under following license:
* - Pimcore Commercial License (PCL)
*
* @copyright Copyright (c) Pimcore GmbH (http://www.pimcore.org)
* @license http://www.pimcore.org/license PCL
*/

namespace Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue;

use Doctrine\DBAL\Exception;

Check notice on line 16 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Doctrine\\DBAL\\Exception' is never used

Check notice on line 16 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Doctrine\\DBAL\\Exception' is never used

Check notice on line 16 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Doctrine\\DBAL\\Exception' is never used
use Pimcore\Bundle\GenericDataIndexBundle\Exception\EnqueueAssetsException;

Check notice on line 17 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Bundle\\GenericDataIndexBundle\\Exception\\EnqueueAssetsException' is never used

Check notice on line 17 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Bundle\\GenericDataIndexBundle\\Exception\\EnqueueAssetsException' is never used

Check notice on line 17 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Bundle\\GenericDataIndexBundle\\Exception\\EnqueueAssetsException' is never used
use Pimcore\Model\DataObject\ClassDefinition;

Check notice on line 18 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\DataObject\\ClassDefinition' is never used

Check notice on line 18 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\DataObject\\ClassDefinition' is never used

Check notice on line 18 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\DataObject\\ClassDefinition' is never used
use Pimcore\Model\Element\ElementInterface;

Check notice on line 19 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\Element\\ElementInterface' is never used

Check notice on line 19 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\Element\\ElementInterface' is never used

Check notice on line 19 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\Element\\ElementInterface' is never used
use Pimcore\Model\Element\Tag;

Check notice on line 20 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\Element\\Tag' is never used

Check notice on line 20 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\Element\\Tag' is never used

Check notice on line 20 in src/Service/SearchIndex/IndexQueue/QueueMessageServiceInterface.php

View workflow job for this annotation

GitHub Actions / Qodana for PHP

Unused import

Import 'Pimcore\\Model\\Element\\Tag' is never used

/**
* @internal
*/
interface QueueMessageServiceInterface
{
public function handleMessage(
int $entriesCount,
int $maxBatchSize,
array $entries
): void;

public function getMaxBatchSize(
int $entriesCount,
int $workerCount,
int $minBatchSize,
int $maxBatchSize
): int;
}
102 changes: 102 additions & 0 deletions tests/Unit/Service/SearchIndex/IndexQueue/QueueMessageServiceTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?php
declare(strict_types=1);

/**
* Pimcore
*
* This source file is available under following license:
* - Pimcore Commercial License (PCL)
*
* @copyright Copyright (c) Pimcore GmbH (http://www.pimcore.org)
* @license http://www.pimcore.org/license PCL
*/

namespace Pimcore\Bundle\GenericDataIndexBundle\Tests\Unit\Service\SearchIndex\IndexQueue;

use Codeception\Test\Unit;
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManagerInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Repository\IndexQueueRepository;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessageService;
use Pimcore\Bundle\GenericDataIndexBundle\Service\TimeServiceInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Serializer\Normalizer\DenormalizableInterface;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;

/**
* @internal
*/
final class QueueMessageServiceTest extends Unit
{
private QueueMessageService $queueMessageService;

public function _before(): void
{
$this->queueMessageService = new QueueMessageService(
$this->getEmptyQueueRepository(),
$this->makeEmpty(MessageBusInterface::class)
);
}

public function testGetMaxBatchSizeWithOneWorker(): void
{
$this->assertSame(
40,
$this->queueMessageService->getMaxBatchSize(
100,
1,
10,
40
)
);
}

public function testGetMaxBatchSizeWithMultipleWorkers(): void
{
$this->assertSame(
50,
$this->queueMessageService->getMaxBatchSize(
250,
5,
5,
400
)
);
}

public function testGetMaxBatchSizeWithOneWorkerAndFewItems(): void
{
$this->assertSame(
50,
$this->queueMessageService->getMaxBatchSize(
20,
1,
10,
50
)
);
}

public function testGetMaxBatchSizeWithMultipleWorkersAndFewItems(): void
{
$this->assertSame(
10,
$this->queueMessageService->getMaxBatchSize(
20,
2,
5,
500
)
);
}

private function getEmptyQueueRepository(): IndexQueueRepository
{
return new IndexQueueRepository(
$this->makeEmpty(EntityManagerInterface::class),
$this->makeEmpty(TimeServiceInterface::class),
$this->makeEmpty(Connection::class),
$this->makeEmpty(DenormalizerInterface::class)
);
}
}

0 comments on commit 0ceeb05

Please sign in to comment.