Skip to content

Commit

Permalink
[PD-110] Change how entries are queried (#19)
Browse files Browse the repository at this point in the history
* fix: count entries in query and get entries only by batch

* Apply php-cs-fixer changes

* fix: default values

* remove ordering in the counter

* rename count method

---------

Co-authored-by: lukmzig <lukmzig@users.noreply.github.com>
  • Loading branch information
lukmzig and lukmzig authored Feb 1, 2024
1 parent 933614d commit 9753e06
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 28 deletions.
7 changes: 2 additions & 5 deletions src/MessageHandler/DispatchQueueMessagesHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public function __construct(
public function __invoke(DispatchQueueMessagesMessage $message): void
{
try {
$entries = $this->indexQueueRepository->getUnhandledIndexQueueEntries();
$entriesCount = count($entries);

$entriesCount = $this->indexQueueRepository->countIndexQueueEntries();
if ($entriesCount === 0) {
return;
}
Expand All @@ -56,8 +54,7 @@ public function __invoke(DispatchQueueMessagesMessage $message): void

$this->queueMessageService->handleMessage(
$entriesCount,
$realMaxBatchSize,
$entries
$realMaxBatchSize
);
} catch (Exception $e) {
$this->logger->warning('Dispatching Queue Message failed: ' . $e);
Expand Down
13 changes: 13 additions & 0 deletions src/Repository/IndexQueueRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\NonUniqueResultException;
use Doctrine\ORM\NoResultException;
use Doctrine\ORM\QueryBuilder;
use Exception;
use Pimcore\Bundle\GenericDataIndexBundle\Entity\IndexQueue;
Expand Down Expand Up @@ -52,6 +53,18 @@ public function dispatchableItemExists(): bool
}
}

/**
* @throws NonUniqueResultException
* @throws NoResultException
*/
public function countIndexQueueEntries(): int
{
return (int)$this->createQueryBuilder('q')
->select('count(q)')
->getQuery()
->getSingleScalarResult();
}

public function getUnhandledIndexQueueEntries(bool $dispatch = false, int $limit = 100000): array
{
try {
Expand Down
30 changes: 9 additions & 21 deletions src/Service/SearchIndex/IndexQueue/QueueMessageService.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue;

use Doctrine\DBAL\Exception;
use Pimcore\Bundle\GenericDataIndexBundle\Message\IndexUpdateQueueMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Repository\IndexQueueRepository;
use Symfony\Component\Messenger\MessageBusInterface;
Expand All @@ -29,21 +28,19 @@ public function __construct(
) {
}

/**
* @throws Exception
*/
public function handleMessage(
int $entriesCount,
int $maxBatchSize,
array $entries
int $maxBatchSize
): void {
if ($entriesCount > $maxBatchSize) {
$chunks = array_chunk($entries, $maxBatchSize);
foreach($chunks as $chunk) {
$this->dispatchMessage($chunk, $maxBatchSize);
while(true) {
$entries = $this->indexQueueRepository->getUnhandledIndexQueueEntries(true, $maxBatchSize);
$amountOfEntries = count($entries);
if ($amountOfEntries > 0) {
$this->messageBus->dispatch(new IndexUpdateQueueMessage($entries));
}
if ($amountOfEntries < $maxBatchSize) {
break;
}
} else {
$this->dispatchMessage($entries, $maxBatchSize);
}
}

Expand All @@ -65,13 +62,4 @@ public function getMaxBatchSize(
default => $maxBatchSize,
};
}

/**
* @throws Exception
*/
private function dispatchMessage(array $items, int $limit): void
{
$this->messageBus->dispatch(new IndexUpdateQueueMessage($items));
$this->indexQueueRepository->dispatchItems($limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ interface QueueMessageServiceInterface
{
public function handleMessage(
int $entriesCount,
int $maxBatchSize,
array $entries
int $maxBatchSize
): void;

public function getMaxBatchSize(
Expand Down

0 comments on commit 9753e06

Please sign in to comment.