Skip to content

Commit

Permalink
speedup recursive children, fix sync processing
Browse files Browse the repository at this point in the history
  • Loading branch information
rliebi committed Sep 27, 2024
1 parent ac6a958 commit d171437
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 21 deletions.
44 changes: 27 additions & 17 deletions src/Document/DataObjectNormalizerTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Valantic\ElasticaBridgeBundle\Document;

use Pimcore\Cache;
use Doctrine\DBAL\Connection;
use Pimcore\Localization\LocaleService;
use Pimcore\Model\DataObject\AbstractObject;
use Pimcore\Model\DataObject\Concrete;
Expand All @@ -20,13 +20,20 @@
trait DataObjectNormalizerTrait
{
protected LocaleService $localeService;
private Connection $connection;

#[Required]
public function setLocaleService(LocaleService $localeService): void
{
$this->localeService = $localeService;
}

#[Required]
public function setDatabaseConnection(Connection $connection): void
{
$this->connection = $connection;
}

/**
* Given $element, collect data from the localized fields $fields (optionally using fallback values)
* and return a normalized array.
Expand Down Expand Up @@ -190,26 +197,29 @@ protected function children(
protected function childrenRecursive(
Concrete $element,
array $objectTypes = [AbstractObject::OBJECT_TYPE_OBJECT, AbstractObject::OBJECT_TYPE_FOLDER],
array $carry = [],
): array {
$cache = Cache::load('elastica-bridge-children-recursive-' . $element->getId());

if (is_array($cache)) {
return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => $cache];
$placeholders = implode(',', array_fill(0, count($objectTypes), '?'));

$query = 'WITH RECURSIVE CategoryHierarchy AS (
SELECT id, parentId, published
FROM objects WHERE id = ? AND type in (' . $placeholders . ') AND published = 1
UNION ALL
SELECT c.id, c.parentId, c.published
FROM objects c
INNER JOIN CategoryHierarchy ch ON ch.id = c.parentId
)
SELECT DISTINCT id
FROM CategoryHierarchy where published = 1;';
$statement = $this->connection->prepare($query);
$statement->bindValue(1, $element->getId(), \PDO::PARAM_INT);

foreach ($objectTypes as $index => $type) {
$statement->bindValue($index + 2, $type, \PDO::PARAM_STR);
}

foreach ($element->getChildren($objectTypes) as $child) {
/** @var Concrete $child */
$carry[] = $child->getId();
$carry = array_values(array_filter($carry));
$carry = $this->childrenRecursive($child, $objectTypes, $carry)[DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE];
}
$values = array_values(array_filter($carry));

Cache::save($values, 'elastica-bridge-children-recursive-' . $element->getId(), lifetime: 3600, force: true);

return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => array_values(array_filter($values))];
$result = $statement->executeQuery();

return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => array_map('intval', array_keys($result->fetchAllAssociativeIndexed()))];
}

/**
Expand Down
14 changes: 10 additions & 4 deletions src/Messenger/Handler/CreateDocumentHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function __construct(
private readonly ConsoleOutputInterface $consoleOutput,
) {}

public function __invoke(CreateDocument $message, ?Acknowledger $ack = null): int
public function __invoke(CreateDocument $message, ?Acknowledger $ack = null): ?int
{
return $this->handle($message, $ack);
}
Expand All @@ -61,8 +61,14 @@ protected function process(array $jobs): void
$count = $this->lockService->getCurrentCount($message->esIndex);
$this->consoleOutput->writeln(sprintf('Processing message of %s %s. ~ %s left.', $message->esIndex, $message->objectId, $count), ConsoleOutputInterface::VERBOSITY_VERBOSE);
}
$this->handleMessage($message);
$ack->ack();

try {
$this->handleMessage($message);
$ack->ack();
} catch (\Throwable $e) {
$this->consoleOutput->writeln(sprintf('Error processing message of %s %s (%s)', $message->esIndex, $message->objectId, $e->getMessage()), ConsoleOutputInterface::VERBOSITY_NORMAL);
$ack->nack($e);
}

continue;
}
Expand Down Expand Up @@ -110,13 +116,13 @@ private function handleMessage(CreateDocument $message): void
}
}
}
$this->lockService->messageProcessed($message->esIndex);
} finally {
if ($message->lastItem && $message->cooldown > 0) {
$key = $this->lockService->getKey($message->esIndex, 'cooldown');
$this->lockService->createLockFromKey($key, ttl: $message->cooldown)->acquire();
}

$this->lockService->messageProcessed($message->esIndex);

\Pimcore::collectGarbage();
}
Expand Down

0 comments on commit d171437

Please sign in to comment.