Skip to content

Commit

Permalink
Merge pull request #7 from sfcod/redis
Browse files Browse the repository at this point in the history
Redis
  • Loading branch information
lexxorlov authored May 23, 2021
2 parents 959f68c + 0d9cc35 commit c6517bb
Show file tree
Hide file tree
Showing 38 changed files with 1,028 additions and 243 deletions.
1 change: 1 addition & 0 deletions .php_cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ $config = PhpCsFixer\Config::create()
'ordered_imports' => true,
'array_syntax' => ['syntax' => 'short'],
'self_accessor' => false,
'no_superfluous_phpdoc_tags' => false,
])
->setFinder($finder);
return $config;
22 changes: 0 additions & 22 deletions Base/MongoDriverInterface.php

This file was deleted.

23 changes: 23 additions & 0 deletions Base/RandomizeTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace SfCod\QueueBundle\Base;

/**
* Trait RandomizeTrait
*
* @package SfCod\QueueBundle\Base
*/
trait RandomizeTrait
{
/**
* Get a random ID string.
*
* @return string
*
* @throws \Exception
*/
private function getRandomId(): string
{
return bin2hex(random_bytes(12));
}
}
18 changes: 15 additions & 3 deletions Command/RetryCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace SfCod\QueueBundle\Command;

use SfCod\QueueBundle\Base\MongoDriverInterface;
use SfCod\QueueBundle\Entity\Job;
use SfCod\QueueBundle\Failer\FailedJobProviderInterface;
use SfCod\QueueBundle\Service\JobQueue;
Expand Down Expand Up @@ -33,6 +32,9 @@ class RetryCommand extends Command

/**
* RetryCommand constructor.
*
* @param JobQueue $queue
* @param FailedJobProviderInterface $failer
*/
public function __construct(JobQueue $queue, FailedJobProviderInterface $failer)
{
Expand All @@ -55,6 +57,9 @@ protected function configure()
/**
* Execute command
*
* @param InputInterface $input
* @param OutputInterface $output
*
* @return int|void|null
*/
public function execute(InputInterface $input, OutputInterface $output)
Expand All @@ -70,9 +75,12 @@ public function execute(InputInterface $input, OutputInterface $output)
}
} else {
$job = $this->failer->find($input->getOption('id'));
$this->retryJob($job);

++$jobsCount;
if ($job) {
$this->retryJob($job);

++$jobsCount;
}
}

$io->success(sprintf("[%d] job(s) has been released.\n", $jobsCount));
Expand All @@ -82,6 +90,10 @@ public function execute(InputInterface $input, OutputInterface $output)

/**
* Retry job
*
* @param Job $job
*
* @return bool
*/
protected function retryJob(Job $job): bool
{
Expand Down
8 changes: 6 additions & 2 deletions Command/RunJobCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace SfCod\QueueBundle\Command;

use Psr\Log\LoggerInterface;
use SfCod\QueueBundle\Worker\Options;
use SfCod\QueueBundle\Worker\Worker;
use Symfony\Component\Console\Command\Command;
Expand All @@ -24,6 +23,7 @@ class RunJobCommand extends Command

/**
* RunJobCommand constructor.
*
* @param Worker $worker
*/
public function __construct(Worker $worker)
Expand Down Expand Up @@ -53,6 +53,9 @@ protected function configure()
/**
* Execute command
*
* @param InputInterface $input
* @param OutputInterface $output
*
* @return int|void|null
*/
public function execute(InputInterface $input, OutputInterface $output)
Expand All @@ -65,9 +68,10 @@ public function execute(InputInterface $input, OutputInterface $output)
$input->getOption('maxTries')
);
$connection = $input->getOption('connection');
$queue = $input->getOption('queue');
$jobId = $input->getArgument('id');

$this->worker->runJobById($connection, $jobId, $options);
$this->worker->runJobById($connection, $queue, $jobId, $options);

return 0;
}
Expand Down
20 changes: 10 additions & 10 deletions Connector/MongoConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
namespace SfCod\QueueBundle\Connector;

use SfCod\QueueBundle\Base\JobResolverInterface;
use SfCod\QueueBundle\Base\MongoDriverInterface;
use SfCod\QueueBundle\Queue\MongoQueue;
use SfCod\QueueBundle\Queue\QueueInterface;
use SfCod\QueueBundle\Service\MongoDriver;

/**
* Connector for laravel queue to mongodb
* Connector for queue to mongodb
*
* @author Orlov Aleksey <aaorlov88@gmail.com>
* @author Virchenko Maksim <muslim1992@gmail.com>
Expand All @@ -21,20 +21,20 @@ class MongoConnector implements ConnectorInterface
protected $jobResolver;

/**
* @var MongoDriverInterface
* @var MongoDriver
*/
protected $mongoDriver;
protected $mongo;

/**
* MongoConnector constructor.
*
* @param JobResolverInterface $jobResolver
* @param MongoDriverInterface $mongoDriver
* @param MongoDriver $mongo
*/
public function __construct(JobResolverInterface $jobResolver, MongoDriverInterface $mongoDriver)
public function __construct(JobResolverInterface $jobResolver, MongoDriver $mongo)
{
$this->jobResolver = $jobResolver;
$this->mongoDriver = $mongoDriver;
$this->mongo = $mongo;
}

/**
Expand All @@ -50,15 +50,15 @@ public function connect(array $config): QueueInterface
'limit' => 15,
], $config);

$mongoQueue = new MongoQueue(
$queue = new MongoQueue(
$this->jobResolver,
$this->mongoDriver,
$this->mongo,
$config['collection'],
$config['queue'],
$config['expire'],
$config['limit']
);

return $mongoQueue;
return $queue;
}
}
61 changes: 61 additions & 0 deletions Connector/RedisConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

namespace SfCod\QueueBundle\Connector;

use SfCod\QueueBundle\Base\JobResolverInterface;
use SfCod\QueueBundle\Queue\QueueInterface;
use SfCod\QueueBundle\Queue\RedisQueue;
use SfCod\QueueBundle\Service\RedisDriver;

/**
* Connector for queue to redis
*
* @author Virchenko Maksim <muslim1992@gmail.com>
*/
class RedisConnector implements ConnectorInterface
{
/**
* @var JobResolverInterface
*/
protected $jobResolver;

/**
* @var RedisDriver
*/
protected $redis;

/**
* RedisConnector constructor.
*
* @param JobResolverInterface $jobResolver
* @param RedisDriver $redis
*/
public function __construct(JobResolverInterface $jobResolver, RedisDriver $redis)
{
$this->jobResolver = $jobResolver;
$this->redis = $redis;
}

/**
* Establish a queue database.
*
* @param array $config
*
* @return QueueInterface
*/
public function connect(array $config): QueueInterface
{
$config = array_merge([
'limit' => 15,
], $config);

return new RedisQueue(
$this->jobResolver,
$this->redis,
$config['collection'],
$config['queue'],
$config['expire'],
$config['limit']
);
}
}
5 changes: 3 additions & 2 deletions DependencyInjection/Compiler/JobCompilerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
namespace SfCod\QueueBundle\DependencyInjection\Compiler;

use SfCod\QueueBundle\Base\JobResolverInterface;
use SfCod\QueueBundle\Service\JobQueue;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;

/**
* Class JobCompilerPass
*
* @author Virchenko Maksim <muslim1992@gmail.com>
*
* @package SfCod\QueueBundle\DependencyInjection\Compiler
*/
class JobCompilerPass implements CompilerPassInterface
Expand All @@ -33,4 +34,4 @@ public function process(ContainerBuilder $container)
$jobResolver->addMethodCall('addJob', [$id, new Reference($id)]);
}
}
}
}
34 changes: 20 additions & 14 deletions DependencyInjection/QueueConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,30 @@ private function addConnections(ArrayNodeDefinition $rootNode)
{
$rootNode
->children()
->arrayNode('namespaces')
->scalarPrototype()->end()
->arrayNode('namespaces')
->scalarPrototype()->end()
->end()
->end()
->children()
->arrayNode('drivers')
->useAttributeAsKey('name')
->scalarPrototype()->end()
->end()
->end()
->children()
->arrayNode('connections')
->useAttributeAsKey('name')
->arrayPrototype()
->children()
->scalarNode('driver')->end()
->scalarNode('collection')->end()
->scalarNode('connection')->end()
->scalarNode('queue')->end()
->scalarNode('expire')->end()
->scalarNode('limit')->end()
->end()
->end()
->end()
->useAttributeAsKey('name')
->arrayPrototype()
->children()
->scalarNode('driver')->end()
->scalarNode('collection')->end()
->scalarNode('connection')->end()
->scalarNode('queue')->end()
->scalarNode('expire')->end()
->scalarNode('limit')->end()
->end()
->end()
->end()
->end();
}
}
Loading

0 comments on commit c6517bb

Please sign in to comment.