From c80e22e5e6dbbffe1a8cc87ff177726380fecec3 Mon Sep 17 00:00:00 2001 From: lexxorlov Date: Mon, 24 May 2021 08:01:20 +0300 Subject: [PATCH] Fix multy-jobs flow --- Queue/RedisQueue.php | 252 ++++++++++++++++++++-------------------- Service/JobResolver.php | 2 +- 2 files changed, 127 insertions(+), 127 deletions(-) diff --git a/Queue/RedisQueue.php b/Queue/RedisQueue.php index 588577c..bcffa08 100644 --- a/Queue/RedisQueue.php +++ b/Queue/RedisQueue.php @@ -100,6 +100,29 @@ public function size(?string $queue = null): int return (int)$this->getClient()->zcount($this->buildKey($queue), '-inf', '+inf'); } + /** + * Get redis client + * + * @return Client + */ + private function getClient(): Client + { + return $this->redis->getClient(); + } + + /** + * Build collection:queue:postfix key + * + * @param string|null $queue + * @param string|null $postfix + * + * @return string + */ + private function buildKey(?string $queue = 'default', ?string $postfix = null) + { + return "$this->collection:$queue" . ($postfix ? ":$postfix" : ''); + } + /** * Push a new job onto the queue. * @@ -132,6 +155,53 @@ public function pushRaw(string $payload, ?string $queue = null, array $options = return $this->pushToDatabase(0, $queue, $payload); } + /** + * Push job to database + * + * @param DateInterval|int $delay + * @param string|null $queue + * @param string $payload + * @param int $attempts + * + * @throws \Exception + */ + private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0) + { + $id = $this->getRandomId(); + + $pipeline = $this->getClient()->pipeline(['atomic' => true]) + ->hset( + $this->buildKey($queue, 'payload'), + $id, + $payload + ) + ->zadd($this->buildKey($queue), [ + $id => $this->getAvailableAt($delay), + ]); + + if ($attempts > 0) { + $pipeline->zadd($this->buildKey($queue, 'attempted'), [ + $id => $attempts, + ]); + } + + $pipeline->execute(); + } + + /** + * Get the "available at" UNIX timestamp. + * + * @param DateInterval|int $delay + * + * @return int + */ + private function getAvailableAt($delay = 0) + { + return $delay instanceof DateInterval + ? (new DateTime())->add($delay)->getTimestamp() + : $this->currentTime() + $delay; + } + /** * Push a new job onto the queue after a delay. * @@ -158,21 +228,71 @@ public function later($delay, string $job, array $data = [], ?string $queue = nu */ public function pop(?string $queue = null): ?JobContractInterface { - $id = $this->getClient()->zrangebyscore($this->buildKey($queue), 0, $this->currentTime(), ['LIMIT' => [0, 1]]); + $ids = $this->getClient()->zrangebyscore($this->buildKey($queue), 0, $this->currentTime(), ['LIMIT' => [0, $this->limit]]); - if (empty($id)) { + if (empty($ids)) { return null; } - if (is_array($id)) { - $id = array_shift($id); + foreach ($ids as $id) { + $reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id); + $isAvailable = null === $reservedAt; + $isReservedButExpired = false === ($reservedAt > ($this->currentTime() - $this->expire)); + // Take first available or reserved but expired job + if ($isAvailable || $isReservedButExpired) { + return $this->getJobById($queue, $id); + } } - $job = $this->getJobById($queue, $id); + return null; + } + + /** + * Get job by its id + * + * @param string $queue + * @param string $id + * + * @return JobContractInterface|null + */ + public function getJobById(string $queue, string $id): ?JobContractInterface + { + $job = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id); - if ($job->reserved() && $job->reservedAt() > ($this->currentTime() - $this->expire)) { + if (!$job) { return null; + } else { + $reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id); + $attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id); + + return new JobContract( + $this->resolver, + $this, + $this->buildJob($id, $queue, $attempts ?? 0, json_decode($job, true), $reservedAt) + ); } + } + + /** + * Build job from database record + * + * @param string $id + * @param string $queue + * @param int $attempts + * @param array $payload + * @param int|null $reservedAt + * + * @return Job + */ + private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job + { + $job = new Job(); + $job->setId($id); + $job->setAttempts($attempts); + $job->setQueue($queue); + $job->setReserved((bool)$reservedAt); + $job->setReservedAt($reservedAt); + $job->setPayload($payload); return $job; } @@ -216,32 +336,6 @@ public function canRunJob(JobContractInterface $job): bool ) < $this->limit || $job->reserved(); } - /** - * Get job by its id - * - * @param string $queue - * @param string $id - * - * @return JobContractInterface|null - */ - public function getJobById(string $queue, string $id): ?JobContractInterface - { - $job = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id); - - if (!$job) { - return null; - } else { - $reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id); - $attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id); - - return new JobContract( - $this->resolver, - $this, - $this->buildJob($id, $queue, $attempts ?? 0, json_decode($job, true), $reservedAt) - ); - } - } - /** * Mark the given job ID as reserved. * @@ -295,98 +389,4 @@ public function release(JobContractInterface $job, $delay) { return $this->pushToDatabase($delay, $job->getQueue(), $job->getRawBody(), $job->attempts()); } - - /** - * Build collection:queue:postfix key - * - * @param string|null $queue - * @param string|null $postfix - * - * @return string - */ - private function buildKey(?string $queue = 'default', ?string $postfix = null) - { - return "$this->collection:$queue" . ($postfix ? ":$postfix" : ''); - } - - /** - * Get the "available at" UNIX timestamp. - * - * @param DateInterval|int $delay - * - * @return int - */ - private function getAvailableAt($delay = 0) - { - return $delay instanceof DateInterval - ? (new DateTime())->add($delay)->getTimestamp() - : $this->currentTime() + $delay; - } - - /** - * Push job to database - * - * @param DateInterval|int $delay - * @param string|null $queue - * @param string $payload - * @param int $attempts - * - * @throws \Exception - */ - private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0) - { - $id = $this->getRandomId(); - - $pipeline = $this->getClient()->pipeline(['atomic' => true]) - ->hset( - $this->buildKey($queue, 'payload'), - $id, - $payload - ) - ->zadd($this->buildKey($queue), [ - $id => $this->getAvailableAt($delay), - ]); - - if ($attempts > 0) { - $pipeline->zadd($this->buildKey($queue, 'attempted'), [ - $id => $attempts, - ]); - } - - $pipeline->execute(); - } - - /** - * Build job from database record - * - * @param string $id - * @param string $queue - * @param int $attempts - * @param array $payload - * @param int|null $reservedAt - * - * @return Job - */ - private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job - { - $job = new Job(); - $job->setId($id); - $job->setAttempts($attempts); - $job->setQueue($queue); - $job->setReserved((bool)$reservedAt); - $job->setReservedAt($reservedAt); - $job->setPayload($payload); - - return $job; - } - - /** - * Get redis client - * - * @return Client - */ - private function getClient(): Client - { - return $this->redis->getClient(); - } } diff --git a/Service/JobResolver.php b/Service/JobResolver.php index ce1cbdb..b911d27 100644 --- a/Service/JobResolver.php +++ b/Service/JobResolver.php @@ -37,7 +37,7 @@ public function resolve(string $id): JobInterface } /** - * @inheritDoc + * {@inheritDoc} * * @param string $id * @param JobInterface $job