Skip to content

Commit

Permalink
retry inside switch message for some amount of time before rescheduling.
Browse files Browse the repository at this point in the history
  • Loading branch information
rliebi committed Sep 27, 2024
1 parent d1d9767 commit becd325
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/Messenger/Handler/SwitchIndexHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,24 @@ public function __invoke(ReleaseIndexLock $message): void
$this->consoleOutput->writeln(sprintf('waiting for lock release (%s) for %s (%s)', $count, $message->indexName, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);

while (!$this->lockService->allMessagesProcessed($message->indexName) && $attempt < $maxAttempts) {
$this->consoleOutput->writeln(sprintf('not all messages processed (~%s remaining), attempt %d', $count, $attempt + 1), ConsoleOutputInterface::VERBOSITY_VERBOSE);
sleep(($count * $attempt) + 15);
$seconds = ($count * $attempt) + 5;
$this->consoleOutput->writeln(
sprintf(
'%s: not all messages processed (~%s remaining), attempt %d, trying again in %s seconds',
$message->indexName,
$count,
$attempt + 1,
$seconds,
),
ConsoleOutputInterface::VERBOSITY_VERBOSE,
);
sleep($seconds);
$attempt++;
}

if ($attempt >= $maxAttempts) {
$this->consoleOutput->writeln('Max attempts reached, rescheduling', ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->messageBus->dispatch($message->clone(), [new DelayStamp($count * 1000 * 2)]);
$this->messageBus->dispatch($message->clone());
$releaseLock = false;

return;
Expand Down

0 comments on commit becd325

Please sign in to comment.