Skip to content

Commit

Permalink
Fix multy-jobs flow
Browse files Browse the repository at this point in the history
  • Loading branch information
lexxorlov committed May 24, 2021
1 parent c6517bb commit c80e22e
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 127 deletions.
252 changes: 126 additions & 126 deletions Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,29 @@ public function size(?string $queue = null): int
return (int)$this->getClient()->zcount($this->buildKey($queue), '-inf', '+inf');
}

/**
* Get redis client
*
* @return Client
*/
private function getClient(): Client
{
return $this->redis->getClient();
}

/**
* Build collection:queue:postfix key
*
* @param string|null $queue
* @param string|null $postfix
*
* @return string
*/
private function buildKey(?string $queue = 'default', ?string $postfix = null)
{
return "$this->collection:$queue" . ($postfix ? ":$postfix" : '');
}

/**
* Push a new job onto the queue.
*
Expand Down Expand Up @@ -132,6 +155,53 @@ public function pushRaw(string $payload, ?string $queue = null, array $options =
return $this->pushToDatabase(0, $queue, $payload);
}

/**
* Push job to database
*
* @param DateInterval|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
*
* @throws \Exception
*/
private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0)
{
$id = $this->getRandomId();

$pipeline = $this->getClient()->pipeline(['atomic' => true])
->hset(
$this->buildKey($queue, 'payload'),
$id,
$payload
)
->zadd($this->buildKey($queue), [
$id => $this->getAvailableAt($delay),
]);

if ($attempts > 0) {
$pipeline->zadd($this->buildKey($queue, 'attempted'), [
$id => $attempts,
]);
}

$pipeline->execute();
}

/**
* Get the "available at" UNIX timestamp.
*
* @param DateInterval|int $delay
*
* @return int
*/
private function getAvailableAt($delay = 0)
{
return $delay instanceof DateInterval
? (new DateTime())->add($delay)->getTimestamp()
: $this->currentTime() + $delay;
}

/**
* Push a new job onto the queue after a delay.
*
Expand All @@ -158,21 +228,71 @@ public function later($delay, string $job, array $data = [], ?string $queue = nu
*/
public function pop(?string $queue = null): ?JobContractInterface
{
$id = $this->getClient()->zrangebyscore($this->buildKey($queue), 0, $this->currentTime(), ['LIMIT' => [0, 1]]);
$ids = $this->getClient()->zrangebyscore($this->buildKey($queue), 0, $this->currentTime(), ['LIMIT' => [0, $this->limit]]);

if (empty($id)) {
if (empty($ids)) {
return null;
}

if (is_array($id)) {
$id = array_shift($id);
foreach ($ids as $id) {
$reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id);
$isAvailable = null === $reservedAt;
$isReservedButExpired = false === ($reservedAt > ($this->currentTime() - $this->expire));
// Take first available or reserved but expired job
if ($isAvailable || $isReservedButExpired) {
return $this->getJobById($queue, $id);
}
}

$job = $this->getJobById($queue, $id);
return null;
}

/**
* Get job by its id
*
* @param string $queue
* @param string $id
*
* @return JobContractInterface|null
*/
public function getJobById(string $queue, string $id): ?JobContractInterface
{
$job = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id);

if ($job->reserved() && $job->reservedAt() > ($this->currentTime() - $this->expire)) {
if (!$job) {
return null;
} else {
$reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id);
$attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id);

return new JobContract(
$this->resolver,
$this,
$this->buildJob($id, $queue, $attempts ?? 0, json_decode($job, true), $reservedAt)
);
}
}

/**
* Build job from database record
*
* @param string $id
* @param string $queue
* @param int $attempts
* @param array $payload
* @param int|null $reservedAt
*
* @return Job
*/
private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job
{
$job = new Job();
$job->setId($id);
$job->setAttempts($attempts);
$job->setQueue($queue);
$job->setReserved((bool)$reservedAt);
$job->setReservedAt($reservedAt);
$job->setPayload($payload);

return $job;
}
Expand Down Expand Up @@ -216,32 +336,6 @@ public function canRunJob(JobContractInterface $job): bool
) < $this->limit || $job->reserved();
}

/**
* Get job by its id
*
* @param string $queue
* @param string $id
*
* @return JobContractInterface|null
*/
public function getJobById(string $queue, string $id): ?JobContractInterface
{
$job = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id);

if (!$job) {
return null;
} else {
$reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id);
$attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id);

return new JobContract(
$this->resolver,
$this,
$this->buildJob($id, $queue, $attempts ?? 0, json_decode($job, true), $reservedAt)
);
}
}

/**
* Mark the given job ID as reserved.
*
Expand Down Expand Up @@ -295,98 +389,4 @@ public function release(JobContractInterface $job, $delay)
{
return $this->pushToDatabase($delay, $job->getQueue(), $job->getRawBody(), $job->attempts());
}

/**
* Build collection:queue:postfix key
*
* @param string|null $queue
* @param string|null $postfix
*
* @return string
*/
private function buildKey(?string $queue = 'default', ?string $postfix = null)
{
return "$this->collection:$queue" . ($postfix ? ":$postfix" : '');
}

/**
* Get the "available at" UNIX timestamp.
*
* @param DateInterval|int $delay
*
* @return int
*/
private function getAvailableAt($delay = 0)
{
return $delay instanceof DateInterval
? (new DateTime())->add($delay)->getTimestamp()
: $this->currentTime() + $delay;
}

/**
* Push job to database
*
* @param DateInterval|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
*
* @throws \Exception
*/
private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0)
{
$id = $this->getRandomId();

$pipeline = $this->getClient()->pipeline(['atomic' => true])
->hset(
$this->buildKey($queue, 'payload'),
$id,
$payload
)
->zadd($this->buildKey($queue), [
$id => $this->getAvailableAt($delay),
]);

if ($attempts > 0) {
$pipeline->zadd($this->buildKey($queue, 'attempted'), [
$id => $attempts,
]);
}

$pipeline->execute();
}

/**
* Build job from database record
*
* @param string $id
* @param string $queue
* @param int $attempts
* @param array $payload
* @param int|null $reservedAt
*
* @return Job
*/
private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job
{
$job = new Job();
$job->setId($id);
$job->setAttempts($attempts);
$job->setQueue($queue);
$job->setReserved((bool)$reservedAt);
$job->setReservedAt($reservedAt);
$job->setPayload($payload);

return $job;
}

/**
* Get redis client
*
* @return Client
*/
private function getClient(): Client
{
return $this->redis->getClient();
}
}
2 changes: 1 addition & 1 deletion Service/JobResolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function resolve(string $id): JobInterface
}

/**
* @inheritDoc
* {@inheritDoc}
*
* @param string $id
* @param JobInterface $job
Expand Down

0 comments on commit c80e22e

Please sign in to comment.