Skip to content

Commit

Permalink
fixes race condition for index switch
Browse files Browse the repository at this point in the history
minor performance improvement
Redis is mandatory now
  • Loading branch information
rliebi committed Sep 26, 2024
1 parent bab7f53 commit 31b153a
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 21 deletions.
15 changes: 11 additions & 4 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private function processIndex(IndexInterface $indexConfig, Key $key): void
return;
}

$message = new ReleaseIndexLock($indexConfig->getName(), $key, swtichIndex: true);
$message = new ReleaseIndexLock($indexConfig->getName(), $key, switchIndex: true);

if ($this->async) {
$this->messageBus->dispatch($message);
Expand All @@ -261,9 +261,12 @@ private function populateIndex(IndexInterface $indexConfig): array
self::$isPopulating = true;
$messagesDispatched = 0;
$blueGreenKey = $this->lockService->lockSwitchBlueGreen($indexConfig);
$this->lockService->initializeProcessCount($indexConfig->getName());

try {
foreach ($indexConfig->getAllowedDocuments() as $document) {
$allowedDocuments = $indexConfig->getAllowedDocuments();

foreach ($allowedDocuments as $documentKey => $document) {
$documentInstance = $this->documentRepository->get($document);
$this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);

Expand Down Expand Up @@ -310,7 +313,11 @@ private function populateIndex(IndexInterface $indexConfig): array
$lastItem = false;
$cooldown = 0;

if ($batchNumber === $numberOfBatches - 1 && $key === array_key_last($data ?? [])) {
if (
$batchNumber === $numberOfBatches - 1
&& $key === array_key_last($data ?? [])
&& $documentKey === array_key_last($allowedDocuments)
) {
$message = new ReleaseIndexLock($indexConfig->getName(), $blueGreenKey);
$this->messageBus->dispatch($message);
$lastItem = true;
Expand Down Expand Up @@ -341,6 +348,7 @@ private function populateIndex(IndexInterface $indexConfig): array
$callback
);
$messagesDispatched++;
$this->lockService->messageDispatched($indexConfig->getName());

if ($this->async) {
$envelope = new Envelope($message, []);
Expand All @@ -363,7 +371,6 @@ private function populateIndex(IndexInterface $indexConfig): array

$batchNumber++;
}

$progressBar->finish();
}
} catch (\Throwable $throwable) {
Expand Down
4 changes: 3 additions & 1 deletion src/Document/DataObjectNormalizerTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ protected function localizedAttributes(
}

$result = [];
$expandedFields = $this->expandFields($fields);

foreach ($this->getLocales() as $locale) {
if ($useFallbackValues) {
Expand All @@ -63,7 +64,7 @@ protected function localizedAttributes(

$result[$locale] = [];

foreach ($this->expandFields($fields) as $target => $source) {
foreach ($expandedFields as $target => $source) {
$result[$locale][$target] = is_callable($source)
? $source($element, $locale)
: $element->get($source, $locale);
Expand Down Expand Up @@ -198,6 +199,7 @@ protected function childrenRecursive(
}

return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => array_values(array_filter($carry))];

}

/**
Expand Down
12 changes: 7 additions & 5 deletions src/Messenger/Handler/CreateDocumentHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
Expand Down Expand Up @@ -58,7 +57,10 @@ protected function process(array $jobs): void

foreach ($jobs as [$message, $ack]) {
if ($message instanceof CreateDocument) {
$this->consoleOutput->writeln(sprintf('Processing message of %s %s', $message->objectType, $message->objectId), ConsoleOutputInterface::VERBOSITY_VERBOSE);
if ($this->consoleOutput->getVerbosity() > ConsoleOutputInterface::VERBOSITY_NORMAL) {
$count = $this->lockService->getCurrentCount($message->esIndex);
$this->consoleOutput->writeln(sprintf('Processing message of %s %s. ~ %s left.', $message->esIndex, $message->objectId, $count), ConsoleOutputInterface::VERBOSITY_VERBOSE);
}
$this->handleMessage($message);
$ack->ack();

Expand All @@ -76,7 +78,6 @@ protected function process(array $jobs): void
*/
private function handleMessage(CreateDocument $message): void
{

try {
if ($this->lockService->isExecutionLocked($message->esIndex)) {
return;
Expand All @@ -101,8 +102,7 @@ private function handleMessage(CreateDocument $message): void
} catch (\Throwable $throwable) {
if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
$key = $this->lockService->lockExecution($message->esIndex);
$envelope = new Envelope(new ReleaseIndexLock($message->esIndex, $key), [new DelayStamp(5000)]);
$this->messageBus->dispatch($envelope);
$this->messageBus->dispatch(new ReleaseIndexLock($message->esIndex, $key), [new DelayStamp(5000)]);
}

if ($this->configurationRepository->shouldPopulateAsync()) {
Expand All @@ -116,6 +116,8 @@ private function handleMessage(CreateDocument $message): void
$this->lockService->createLockFromKey($key, ttl: $message->cooldown)->acquire();
}

$this->lockService->messageProcessed($message->esIndex);

\Pimcore::collectGarbage();
}
}
Expand Down
33 changes: 26 additions & 7 deletions src/Messenger/Handler/SwitchIndexHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Valantic\ElasticaBridgeBundle\Messenger\Message\ReleaseIndexLock;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
Expand All @@ -22,6 +24,7 @@ public function __construct(
private readonly LockFactory $lockFactory,
private readonly LockService $lockService,
private readonly ConsoleOutputInterface $consoleOutput,
private readonly MessageBusInterface $messageBus,
) {}

/**
Expand All @@ -31,12 +34,26 @@ public function __construct(
*/
public function __invoke(ReleaseIndexLock $message): void
{
$releaseLock = true;

try {
if ($message->swtichIndex === false || $this->lockService->isExecutionLocked($message->indexName)) {
if ($message->switchIndex === false || $this->lockService->isExecutionLocked($message->indexName)) {
return;
}

// try to switch index. If not all messages are processed this will be rescheduled.
$key = $this->lockService->getKey($message->indexName, 'switch-blue-green');
$count = $this->lockService->getCurrentCount($message->indexName);
$this->consoleOutput->writeln(sprintf('waiting for lock release (%s) for %s (%s)', $count, $message->indexName, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);

if (!$this->lockService->allMessagesProcessed($message->indexName)) {
$this->consoleOutput->writeln(sprintf('not all messages processed (~%s remaining), rescheduling', $count), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->messageBus->dispatch($message->clone(), [new DelayStamp($count * 1000 * 2)]);
$releaseLock = false;

return;
}
$this->consoleOutput->writeln('waiting for lock release', ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->lockService->waitForFinish($message->indexName);

$indexConfig = $this->indexRepository->flattenedGet($message->indexName);
$oldIndex = $indexConfig->getBlueGreenActiveElasticaIndex();
$newIndex = $indexConfig->getBlueGreenInactiveElasticaIndex();
Expand All @@ -48,11 +65,13 @@ public function __invoke(ReleaseIndexLock $message): void
$this->consoleOutput->writeln('added alias to ' . $newIndex->getName(), ConsoleOutputInterface::VERBOSITY_NORMAL);
$oldIndex->flush();
} finally {
$this->consoleOutput->writeln('Releasing lock', ConsoleOutputInterface::VERBOSITY_VERBOSE);
$key = $message->key;
if ($releaseLock) {
$this->consoleOutput->writeln(sprintf('releasing lock %s (%s)', $message->key, hash('sha256', (string) $message->key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$key = $message->key;

$lock = $this->lockFactory->createLockFromKey($key);
$lock->release();
$lock = $this->lockFactory->createLockFromKey($key);
$lock->release();
}

\Pimcore::collectGarbage();
}
Expand Down
7 changes: 6 additions & 1 deletion src/Messenger/Message/ReleaseIndexLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ class ReleaseIndexLock
public function __construct(
public readonly string $indexName,
public readonly Key $key,
public readonly bool $swtichIndex = false,
public readonly bool $switchIndex = false,
) {}

public function clone(): self
{
return new self($this->indexName, $this->key, $this->switchIndex);
}
}
55 changes: 52 additions & 3 deletions src/Service/LockService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Valantic\ElasticaBridgeBundle\Service;

use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
Expand All @@ -18,6 +19,8 @@ class LockService
public function __construct(
private readonly LockFactory $lockFactory,
private readonly ConfigurationRepository $configurationRepository,
#[Autowire(service: 'cache.default_redis_provider')]
private readonly \Redis $redis,
) {}

public function getIndexingLock(IndexInterface $indexConfig): LockInterface
Expand Down Expand Up @@ -74,17 +77,63 @@ public function lockSwitchBlueGreen(IndexInterface $indexConfig): Key
return $key;
}

public function waitForFinish(string $indexName): void
public function allMessagesProcessed(string $indexName): bool
{
// the count is eventually consistent.
$currentCount = $this->getCurrentCount($indexName);

if (
Index::$isAsync === false
|| (Index::$isAsync === null && $this->configurationRepository->shouldPopulateAsync() === false)
) {
return;
return $currentCount === 0;
}

$key = $this->getKey($indexName, 'switch-blue-green');
$lock = $this->createLockFromKey($key, ttl: 0, autorelease: true);
$lock->acquire(true);

if ($currentCount > 0) {
return false;
}

if (!$lock->acquire()) {
return false;
}

$lock->release(); // release the lock instantly as we just checked
$cacheKey = self::LOCK_PREFIX . $indexName;
$this->redis->del($cacheKey); // clean up the cache key.

return true;
}

public function initializeProcessCount(string $name): void
{
$cacheKey = self::LOCK_PREFIX . $name;
$this->redis->set($cacheKey, 0);
}

public function messageProcessed(string $esIndex): void
{
$cacheKey = self::LOCK_PREFIX . $esIndex;
$this->redis->decr($cacheKey);
}

public function getCurrentCount(string $indexName): int
{
$cacheKey = self::LOCK_PREFIX . $indexName;
$count = $this->redis->get($cacheKey);

if ($count === false) {
$count = 0;
}

return (int) $count;
}

public function messageDispatched(string $getName): void
{
$cacheKey = self::LOCK_PREFIX . $getName;
$this->redis->incr($cacheKey);
}
}

0 comments on commit 31b153a

Please sign in to comment.