diff --git a/.idea/encodings.xml b/.idea/encodings.xml
new file mode 100644
index 0000000..97626ba
--- /dev/null
+++ b/.idea/encodings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/jobqueue.iml b/.idea/jobqueue.iml
new file mode 100644
index 0000000..b6308d1
--- /dev/null
+++ b/.idea/jobqueue.iml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..7769073
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/php.xml b/.idea/php.xml
new file mode 100644
index 0000000..4f0611e
--- /dev/null
+++ b/.idea/php.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
new file mode 100644
index 0000000..c6e14ad
--- /dev/null
+++ b/.idea/workspace.xml
@@ -0,0 +1,519 @@
+
+
+
+
+
+
+
+
+
+
+
+
+ $PROJECT_DIR$/composer.json
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ JobFailedEvent
+
+
+ $PROJECT_DIR$
+
+
+
+
+
+
+
+
+
+
+
+ true
+ DEFINITION_ORDER
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 1525786104504
+
+
+ 1525786104504
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Base/InteractWithTimeTrait.php b/Base/InteractWithTimeTrait.php
new file mode 100644
index 0000000..05aa06b
--- /dev/null
+++ b/Base/InteractWithTimeTrait.php
@@ -0,0 +1,75 @@
+
+ *
+ * @package SfCod\QueueBundle\Job
+ */
+trait InteractWithTimeTrait
+{
+ /**
+ * Get the number of seconds until the given DateTime.
+ *
+ * @param DateTimeInterface|DateInterval|int $delay
+ *
+ * @return int
+ */
+ protected function secondsUntil($delay): int
+ {
+ $delay = $this->parseDateInterval($delay);
+
+ return $delay instanceof DateTimeInterface
+ ? max(0, $delay->getTimestamp() - $this->currentTime())
+ : (int)$delay;
+ }
+
+ /**
+ * Get the "available at" UNIX timestamp.
+ *
+ * @param DateTimeInterface|DateInterval|int $delay
+ *
+ * @return int
+ */
+ protected function availableAt($delay = 0): int
+ {
+ $delay = $this->parseDateInterval($delay);
+
+ return $delay instanceof DateTimeInterface
+ ? $delay->getTimestamp()
+ : (new DateTime())->add(new DateInterval(sprintf('PT%dS', $delay)))->getTimestamp();
+ }
+
+ /**
+ * If the given value is an interval, convert it to a DateTime instance.
+ *
+ * @param \DateTimeInterface|\DateInterval|int $delay
+ *
+ * @return \DateTimeInterface|int
+ */
+ protected function parseDateInterval($delay)
+ {
+ if ($delay instanceof DateInterval) {
+ $delay = (new DateTime())->add($delay);
+ }
+
+ return $delay;
+ }
+
+ /**
+ * Get the current system time as a UNIX timestamp.
+ *
+ * @return int
+ */
+ protected function currentTime(): int
+ {
+ return time();
+ }
+}
diff --git a/Base/Job.php b/Base/Job.php
deleted file mode 100644
index 9501f58..0000000
--- a/Base/Job.php
+++ /dev/null
@@ -1,49 +0,0 @@
-
- * @author Virchenko Maksim
- *
- * @package SfCod\QueueBundle\Base
- */
-abstract class Job extends \Illuminate\Queue\Jobs\Job
-{
- /**
- * The IoC container instance.
- *
- * @var ContainerInterface
- */
- protected $container;
-
- /**
- * Resolve the given class.
- *
- * @param string $class
- *
- * @return mixed
- */
- protected function resolve($class)
- {
- return $this->container->get($class);
- }
-
- /**
- * Get job attempts
- *
- * @return int
- */
- abstract public function attempts(): int;
-
- /**
- * Get is job reserved
- *
- * @return bool
- */
- abstract public function reserved(): bool;
-}
diff --git a/Base/JobAbstract.php b/Base/JobAbstract.php
deleted file mode 100644
index 72d83a3..0000000
--- a/Base/JobAbstract.php
+++ /dev/null
@@ -1,30 +0,0 @@
-
- */
-abstract class JobAbstract implements JobInterface
-{
- /**
- * Run job with restarting connection
- *
- * @param Job $job
- * @param array $data
- *
- * @throws \Illuminate\Container\EntryNotFoundException
- */
- public function fire(Job $job, array $data)
- {
- /** @var Connection $connection */
- $connection = $job->getContainer()->get('doctrine')->getConnection();
- $connection->close();
- $connection->connect();
- }
-}
diff --git a/Base/JobInterface.php b/Base/JobInterface.php
index b4a946c..ba30588 100644
--- a/Base/JobInterface.php
+++ b/Base/JobInterface.php
@@ -2,7 +2,7 @@
namespace SfCod\QueueBundle\Base;
-use Illuminate\Queue\Jobs\Job;
+use SfCod\QueueBundle\Job\JobContract;
/**
* Base interface for handlers
@@ -14,8 +14,8 @@ interface JobInterface
/**
* Run command from queue
*
- * @param Job $job
+ * @param JobContract $job
* @param array $data
*/
- public function fire(Job $job, array $data);
+ public function fire(JobContract $job, array $data);
}
diff --git a/Base/JobQueueInterface.php b/Base/JobQueueInterface.php
index 180479e..1ca9187 100644
--- a/Base/JobQueueInterface.php
+++ b/Base/JobQueueInterface.php
@@ -2,9 +2,6 @@
namespace SfCod\QueueBundle\Base;
-use Illuminate\Queue\Capsule\Manager;
-use Illuminate\Queue\QueueManager;
-
/**
* Job queue interface
*
@@ -12,13 +9,6 @@
*/
interface JobQueueInterface
{
- /**
- * Get queue manager instance
- *
- * @return QueueManager
- */
- public function getQueueManager(): QueueManager;
-
/**
* Push new job to queue
*
diff --git a/Base/JobResolverInterface.php b/Base/JobResolverInterface.php
new file mode 100644
index 0000000..3c04c56
--- /dev/null
+++ b/Base/JobResolverInterface.php
@@ -0,0 +1,20 @@
+queue = $queue;
$this->failer = $failer;
@@ -87,7 +87,7 @@ public function execute(InputInterface $input, OutputInterface $output)
/**
* Retry job
*
- * @param Job $job
+ * @param JobContract $job
*
* @return bool
*/
diff --git a/Command/RunJobCommand.php b/Command/RunJobCommand.php
index ea5cc56..b5566d7 100644
--- a/Command/RunJobCommand.php
+++ b/Command/RunJobCommand.php
@@ -3,8 +3,8 @@
namespace SfCod\QueueBundle\Command;
use Psr\Log\LoggerInterface;
-use SfCod\QueueBundle\Options;
-use SfCod\QueueBundle\Worker;
+use SfCod\QueueBundle\Worker\Options;
+use SfCod\QueueBundle\Worker\Worker;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -44,7 +44,7 @@ protected function configure()
->setDescription('Runs single job by id.')
->addArgument('id', InputArgument::REQUIRED, 'The id of the job.')
->addOption('connection', null, InputArgument::OPTIONAL, 'The name of the connection.', 'default')
- ->addOption('queue', null, InputArgument::OPTIONAL, 'The name of the queue.', null)
+ ->addOption('queue', null, InputArgument::OPTIONAL, 'The name of the queue.', 'default')
->addOption('delay', null, InputArgument::OPTIONAL, 'Delay before getting jobs.', 0)
->addOption('memory', null, InputArgument::OPTIONAL, 'Maximum memory usage limit.', 128)
->addOption('sleep', null, InputArgument::OPTIONAL, 'Sleep time before getting new job.', 3)
diff --git a/Command/WorkCommand.php b/Command/WorkCommand.php
index 2e8d00e..e6831d7 100644
--- a/Command/WorkCommand.php
+++ b/Command/WorkCommand.php
@@ -2,12 +2,13 @@
namespace SfCod\QueueBundle\Command;
-use SfCod\QueueBundle\Options;
-use SfCod\QueueBundle\Worker;
+use SfCod\QueueBundle\Worker\Options;
+use SfCod\QueueBundle\Worker\Worker;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Style\SymfonyStyle;
/**
* Class WorkCommand
@@ -48,7 +49,7 @@ protected function configure()
->addOption('maxTries', null, InputArgument::OPTIONAL, 'Max tries to run job.', 1)
->addOption('timeout', null, InputArgument::OPTIONAL, 'Daemon timeout.', 60)
->addOption('connection', null, InputArgument::OPTIONAL, 'The name of the connection.', 'default')
- ->addOption('queue', null, InputArgument::OPTIONAL, 'The name of the queue.', null)
+ ->addOption('queue', null, InputArgument::OPTIONAL, 'The name of the queue.', 'default')
->setDescription('Run worker.');
}
@@ -62,6 +63,8 @@ protected function configure()
*/
public function execute(InputInterface $input, OutputInterface $output)
{
+ $io = new SymfonyStyle($input, $output);
+
$workerOptions = new Options(
$input->getOption('delay'),
$input->getOption('memory'),
@@ -72,6 +75,8 @@ public function execute(InputInterface $input, OutputInterface $output)
$connection = $input->getOption('connection');
$queue = $input->getOption('queue');
+ $io->success(sprintf('Worker daemon has started.'));
+
$this->worker->daemon($connection, $queue, $workerOptions);
}
}
diff --git a/Connector/ConnectorInterface.php b/Connector/ConnectorInterface.php
new file mode 100644
index 0000000..883a043
--- /dev/null
+++ b/Connector/ConnectorInterface.php
@@ -0,0 +1,22 @@
+container = $container;
+ $this->jobResolver = $jobResolver;
+ $this->mongoDriver = $mongoDriver;
}
/**
@@ -35,17 +42,22 @@ public function __construct(ContainerInterface $container)
*
* @param array $config
*
- * @return MongoQueue
+ * @return QueueInterface
*/
- public function connect(array $config)
+ public function connect(array $config): QueueInterface
{
$config = array_merge([
'limit' => 15,
- 'connection' => MongoDriverInterface::class,
], $config);
- $mongoQueue = new MongoQueue($this->container->get($config['connection']), $config['collection'], $config['queue'], $config['expire'], $config['limit']);
- $mongoQueue->putContainer($this->container);
+ $mongoQueue = new MongoQueue(
+ $this->jobResolver,
+ $this->mongoDriver,
+ $config['collection'],
+ $config['queue'],
+ $config['expire'],
+ $config['limit']
+ );
return $mongoQueue;
}
diff --git a/DependencyInjection/QueueExtension.php b/DependencyInjection/QueueExtension.php
index 5b20ea2..bf79d89 100644
--- a/DependencyInjection/QueueExtension.php
+++ b/DependencyInjection/QueueExtension.php
@@ -3,17 +3,23 @@
namespace SfCod\QueueBundle\DependencyInjection;
use Psr\Log\LoggerInterface;
+use SfCod\QueueBundle\Base\JobResolverInterface;
+use SfCod\QueueBundle\Base\MongoDriverInterface;
use SfCod\QueueBundle\Command\RetryCommand;
use SfCod\QueueBundle\Command\RunJobCommand;
use SfCod\QueueBundle\Command\WorkCommand;
+use SfCod\QueueBundle\Connector\ConnectorInterface;
+use SfCod\QueueBundle\Connector\MongoConnector;
+use SfCod\QueueBundle\Failer\FailedJobProviderInterface;
use SfCod\QueueBundle\Failer\MongoFailedJobProvider;
use SfCod\QueueBundle\Handler\ExceptionHandler;
use SfCod\QueueBundle\Handler\ExceptionHandlerInterface;
-use SfCod\QueueBundle\JobProcess;
+use SfCod\QueueBundle\Service\JobProcess;
use SfCod\QueueBundle\Service\JobQueue;
+use SfCod\QueueBundle\Service\JobResolver;
use SfCod\QueueBundle\Service\MongoDriver;
-use SfCod\QueueBundle\Service\MongoDriverInterface;
-use SfCod\QueueBundle\Worker;
+use SfCod\QueueBundle\Service\QueueManager;
+use SfCod\QueueBundle\Worker\Worker;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Definition;
@@ -55,11 +61,13 @@ public function load(array $config, ContainerBuilder $container)
->setPublic(true);
$container->setDefinition($job, $definition);
}
+
$this->createDriver($config, $container);
$this->createJobQueue($config, $container);
$this->createWorker($config, $container);
$this->createJobProcess($config, $container);
$this->createCommands($config, $container);
+ $this->createManager($config, $container);
}
/**
@@ -111,7 +119,7 @@ private function createCommands(array $config, ContainerBuilder $container)
$retry = new Definition(RetryCommand::class);
$retry->setArguments([
new Reference(JobQueue::class),
- new Reference(MongoFailedJobProvider::class),
+ new Reference(FailedJobProviderInterface::class),
]);
$retry->addTag('console.command');
@@ -128,6 +136,47 @@ private function createCommands(array $config, ContainerBuilder $container)
]);
}
+ /**
+ * Create queue manager
+ *
+ * @param array $config
+ * @param ContainerBuilder $container
+ */
+ private function createManager(array $config, ContainerBuilder $container)
+ {
+ $resolver = new Definition(JobResolverInterface::class);
+ $resolver->setClass(JobResolver::class);
+ $resolver->setArguments([
+ new Reference(ContainerInterface::class),
+ ]);
+
+ $connector = new Definition(ConnectorInterface::class);
+ $connector->setClass(MongoConnector::class);
+ $connector->setArguments([
+ new Reference(JobResolverInterface::class),
+ new Reference(MongoDriverInterface::class),
+ ]);
+
+ $manager = new Definition(QueueManager::class);
+ $manager->addMethodCall('addConnector', [
+ 'mongo-thread',
+ new Reference(ConnectorInterface::class),
+ ]);
+
+ foreach ($config['connections'] as $name => $params) {
+ $manager->addMethodCall('addConnection', [
+ $params,
+ $name,
+ ]);
+ }
+
+ $container->addDefinitions([
+ JobResolverInterface::class => $resolver,
+ ConnectorInterface::class => $connector,
+ QueueManager::class => $manager,
+ ]);
+ }
+
/**
* Create driver
*
@@ -136,8 +185,8 @@ private function createCommands(array $config, ContainerBuilder $container)
*/
private function createDriver(array $config, ContainerBuilder $container)
{
- $mongo = new Definition(MongoDriver::class);
- $mongo->setPublic(true);
+ $mongo = new Definition(MongoDriverInterface::class);
+ $mongo->setClass(MongoDriver::class);
$mongo->addMethodCall('setCredentials', [
getenv('MONGODB_URL'),
]);
@@ -159,8 +208,7 @@ private function createJobQueue(array $config, ContainerBuilder $container)
$jobQueue = new Definition(JobQueue::class);
$jobQueue->setPublic(true);
$jobQueue->setArguments([
- new Reference(ContainerInterface::class),
- $config['connections'],
+ new Reference(QueueManager::class),
]);
$container->setDefinition(JobQueue::class, $jobQueue);
@@ -177,15 +225,16 @@ private function createWorker(array $config, ContainerBuilder $container)
$worker = new Definition(Worker::class);
$worker
->setArguments([
- new Reference(JobQueue::class),
+ new Reference(QueueManager::class),
new Reference(JobProcess::class),
- new Reference(MongoFailedJobProvider::class),
+ new Reference(FailedJobProviderInterface::class),
new Reference(ExceptionHandlerInterface::class),
new Reference(EventDispatcherInterface::class),
]);
- $failedProvider = new Definition(MongoFailedJobProvider::class);
+ $failedProvider = new Definition(FailedJobProviderInterface::class);
$failedProvider
+ ->setClass(MongoFailedJobProvider::class)
->setArguments([
new Reference(MongoDriverInterface::class),
'queue_jobs_failed',
@@ -200,7 +249,7 @@ private function createWorker(array $config, ContainerBuilder $container)
$container->addDefinitions([
Worker::class => $worker,
- MongoFailedJobProvider::class => $failedProvider,
+ FailedJobProviderInterface::class => $failedProvider,
ExceptionHandlerInterface::class => $exceptionHandler,
]);
}
@@ -214,7 +263,6 @@ private function createWorker(array $config, ContainerBuilder $container)
private function createJobProcess(array $config, ContainerBuilder $container)
{
$jobProcess = new Definition(JobProcess::class);
- $jobProcess->setPublic(true);
$jobProcess->setArguments([
'console',
sprintf('%s/bin', $container->getParameter('kernel.project_dir')),
diff --git a/Event/JobExceptionOccurredEvent.php b/Event/JobExceptionOccurredEvent.php
index c42fae7..949730c 100644
--- a/Event/JobExceptionOccurredEvent.php
+++ b/Event/JobExceptionOccurredEvent.php
@@ -2,6 +2,8 @@
namespace SfCod\QueueBundle\Event;
+use Exception;
+use SfCod\QueueBundle\Job\JobContractInterface;
use Symfony\Component\EventDispatcher\Event;
/**
@@ -24,7 +26,7 @@ class JobExceptionOccurredEvent extends Event
/**
* The job instance.
*
- * @var \Illuminate\Contracts\Queue\Job
+ * @var JobContractInterface
*/
public $job;
@@ -39,11 +41,11 @@ class JobExceptionOccurredEvent extends Event
* Create a new event instance.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
- * @param \Exception $exception
+ * @param JobContractInterface $job
+ * @param Exception $exception
* @param array $config
*/
- public function __construct($connectionName, $job, $exception, array $config = [])
+ public function __construct(string $connectionName, JobContractInterface $job, Exception $exception, array $config = [])
{
$this->job = $job;
$this->exception = $exception;
diff --git a/Event/JobFailedEvent.php b/Event/JobFailedEvent.php
index 60aa753..caf0f97 100644
--- a/Event/JobFailedEvent.php
+++ b/Event/JobFailedEvent.php
@@ -2,6 +2,8 @@
namespace SfCod\QueueBundle\Event;
+use Exception;
+use SfCod\QueueBundle\Job\JobContractInterface;
use Symfony\Component\EventDispatcher\Event;
/**
@@ -24,7 +26,7 @@ class JobFailedEvent extends Event
/**
* The job instance.
*
- * @var \Illuminate\Contracts\Queue\Job
+ * @var JobContractInterface
*/
public $job;
@@ -39,11 +41,11 @@ class JobFailedEvent extends Event
* Create a new event instance.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
- * @param \Exception $exception
+ * @param JobContractInterface $job
+ * @param Exception $exception
* @param array $config
*/
- public function __construct($connectionName, $job, $exception, array $config = [])
+ public function __construct(string $connectionName, JobContractInterface $job, Exception $exception, array $config = [])
{
$this->job = $job;
$this->exception = $exception;
diff --git a/Event/JobProcessedEvent.php b/Event/JobProcessedEvent.php
index 6427c66..ae98fce 100644
--- a/Event/JobProcessedEvent.php
+++ b/Event/JobProcessedEvent.php
@@ -2,6 +2,7 @@
namespace SfCod\QueueBundle\Event;
+use SfCod\QueueBundle\Job\JobContractInterface;
use Symfony\Component\EventDispatcher\Event;
/**
@@ -24,7 +25,7 @@ class JobProcessedEvent extends Event
/**
* The job instance.
*
- * @var \Illuminate\Contracts\Queue\Job
+ * @var JobContractInterface
*/
public $job;
@@ -32,10 +33,10 @@ class JobProcessedEvent extends Event
* Create a new event instance.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
* @param array $config
*/
- public function __construct($connectionName, $job, array $config = [])
+ public function __construct(string $connectionName, JobContractInterface $job, array $config = [])
{
$this->job = $job;
$this->connectionName = $connectionName;
diff --git a/Event/JobProcessingEvent.php b/Event/JobProcessingEvent.php
index e34f6d8..88515de 100644
--- a/Event/JobProcessingEvent.php
+++ b/Event/JobProcessingEvent.php
@@ -2,6 +2,7 @@
namespace SfCod\QueueBundle\Event;
+use SfCod\QueueBundle\Job\JobContractInterface;
use Symfony\Component\EventDispatcher\Event;
/**
@@ -24,7 +25,7 @@ class JobProcessingEvent extends Event
/**
* The job instance.
*
- * @var \Illuminate\Contracts\Queue\Job
+ * @var JobContractInterface
*/
public $job;
@@ -32,10 +33,10 @@ class JobProcessingEvent extends Event
* Create a new event instance.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
* @param array $config
*/
- public function __construct($connectionName, $job, array $config = [])
+ public function __construct(string $connectionName, JobContractInterface $job, array $config = [])
{
$this->job = $job;
$this->connectionName = $connectionName;
diff --git a/Base/FatalThrowableError.php b/Exception/FatalThrowableException.php
similarity index 76%
rename from Base/FatalThrowableError.php
rename to Exception/FatalThrowableException.php
index 59d43b3..129c158 100644
--- a/Base/FatalThrowableError.php
+++ b/Exception/FatalThrowableException.php
@@ -1,6 +1,6 @@
*/
-class FatalThrowableError extends Exception
+class FatalThrowableException extends Exception
{
/**
- * FatalThrowableError constructor.
+ * FatalThrowableException constructor.
*
* @param Exception $e
* @param int $code
diff --git a/Exception/InvalidPayloadException.php b/Exception/InvalidPayloadException.php
new file mode 100644
index 0000000..983fc81
--- /dev/null
+++ b/Exception/InvalidPayloadException.php
@@ -0,0 +1,27 @@
+
+ *
+ * @package SfCod\QueueBundle\Exception
+ */
+class InvalidPayloadException extends InvalidArgumentException
+{
+ /**
+ * Create a new exception instance.
+ *
+ * @param string|null $message
+ *
+ * @return void
+ */
+ public function __construct($message = null)
+ {
+ parent::__construct($message ?: json_last_error());
+ }
+}
diff --git a/Exception/MaxAttemptsExceededException.php b/Exception/MaxAttemptsExceededException.php
new file mode 100644
index 0000000..69b56d4
--- /dev/null
+++ b/Exception/MaxAttemptsExceededException.php
@@ -0,0 +1,16 @@
+
+ *
+ * @package SfCod\JobQueue\Exception
+ */
+class MaxAttemptsExceededException extends RuntimeException
+{
+}
diff --git a/Failer/FailedJobProviderInterface.php b/Failer/FailedJobProviderInterface.php
new file mode 100644
index 0000000..2d80786
--- /dev/null
+++ b/Failer/FailedJobProviderInterface.php
@@ -0,0 +1,57 @@
+getCollection()->insertOne([
'connection' => $connection,
'queue' => $queue,
'payload' => $payload,
'exception' => $exception->getMessage(),
- 'failed_at' => Carbon::now(),
+ 'failed_at' => time(),
]);
}
@@ -66,7 +65,7 @@ public function log($connection, $queue, $payload, $exception)
*
* @return array
*/
- public function all()
+ public function all(): array
{
$result = [];
$data = $this->getCollection()->find([], [
@@ -83,7 +82,7 @@ public function all()
/**
* Get a single failed job.
*
- * @param mixed $id
+ * @param string $id
*
* @return array
*/
@@ -95,11 +94,11 @@ public function find($id)
/**
* Delete a single failed job from storage.
*
- * @param mixed $id
+ * @param string $id
*
* @return bool
*/
- public function forget($id)
+ public function forget($id): bool
{
return (bool)$this->getCollection()->deleteOne(['_id' => new \MongoDB\BSON\ObjectID($id)])->getDeletedCount();
}
diff --git a/Handler/ExceptionHandlerInterface.php b/Handler/ExceptionHandlerInterface.php
index 758b74f..f0a71f7 100644
--- a/Handler/ExceptionHandlerInterface.php
+++ b/Handler/ExceptionHandlerInterface.php
@@ -14,7 +14,7 @@ interface ExceptionHandlerInterface
/**
* Report or log an exception.
*
- * @param \Exception $e
+ * @param Exception $e
*
* @return void
*/
@@ -23,8 +23,8 @@ public function report(Exception $e);
/**
* Render an exception into an HTTP response.
*
- * @param \Illuminate\Http\Request $request
- * @param \Exception $e
+ * @param $request
+ * @param Exception $e
*
* @return \Symfony\Component\HttpFoundation\Response
*/
@@ -34,7 +34,7 @@ public function render($request, Exception $e);
* Render an exception to the console.
*
* @param \Symfony\Component\Console\Output\OutputInterface $output
- * @param \Exception $e
+ * @param Exception $e
*
* @return void
*/
diff --git a/Job/JobContract.php b/Job/JobContract.php
new file mode 100644
index 0000000..8a2b230
--- /dev/null
+++ b/Job/JobContract.php
@@ -0,0 +1,265 @@
+
+ * @author Virchenko Maksim
+ *
+ * @package SfCod\QueueBundle\Base
+ */
+abstract class JobContract implements JobContractInterface
+{
+ use InteractWithTimeTrait;
+
+ /**
+ * The job handler instance.
+ *
+ * @var JobInterface
+ */
+ protected $instance;
+
+ /**
+ * Indicates if the job has been deleted.
+ *
+ * @var bool
+ */
+ protected $deleted = false;
+
+ /**
+ * Indicates if the job has been released.
+ *
+ * @var bool
+ */
+ protected $released = false;
+
+ /**
+ * Indicates if the job has failed.
+ *
+ * @var bool
+ */
+ protected $failed = false;
+
+ /**
+ * The name of the connection the job belongs to.
+ *
+ * @var string
+ */
+ protected $connectionName;
+
+ /**
+ * The name of the queue the job belongs to.
+ *
+ * @var string
+ */
+ protected $queue;
+
+ /**
+ * Fire the job.
+ *
+ * @return void
+ */
+ public function fire()
+ {
+ $handler = $this->resolve($this->getName());
+
+ $this->instance = $handler->fire($this, $this->getData());
+ }
+
+ /**
+ * Delete the job from the queue.
+ *
+ * @return void
+ */
+ public function delete()
+ {
+ $this->deleted = true;
+ }
+
+ /**
+ * Determine if the job has been deleted.
+ *
+ * @return bool
+ */
+ public function isDeleted(): bool
+ {
+ return $this->deleted;
+ }
+
+ /**
+ * Release the job back into the queue.
+ *
+ * @param int $delay
+ *
+ * @return void
+ */
+ public function release(int $delay = 0)
+ {
+ $this->released = true;
+ }
+
+ /**
+ * Determine if the job was released back into the queue.
+ *
+ * @return bool
+ */
+ public function isReleased(): bool
+ {
+ return $this->released;
+ }
+
+ /**
+ * Determine if the job has been deleted or released.
+ *
+ * @return bool
+ */
+ public function isDeletedOrReleased(): bool
+ {
+ return $this->isDeleted() || $this->isReleased();
+ }
+
+ /**
+ * Determine if the job has been marked as a failure.
+ *
+ * @return bool
+ */
+ public function hasFailed(): bool
+ {
+ return $this->failed;
+ }
+
+ /**
+ * Mark the job as "failed".
+ *
+ * @return void
+ */
+ public function markAsFailed()
+ {
+ $this->failed = true;
+ }
+
+ /**
+ * Process an exception that caused the job to fail.
+ *
+ * @param Exception $e
+ *
+ * @return void
+ */
+ public function failed($e)
+ {
+ $this->markAsFailed();
+
+ if (method_exists($this->instance = $this->resolve($this->getName()), 'failed')) {
+ $this->instance->failed($this->getData(), $e);
+ }
+ }
+
+ /**
+ * Get the decoded body of the job.
+ *
+ * @return array
+ */
+ public function payload(): array
+ {
+ return json_decode($this->getRawBody(), true);
+ }
+
+ /**
+ * Get the number of times to attempt a job.
+ *
+ * @return int|null
+ */
+ public function maxTries(): ?int
+ {
+ return $this->payload()['maxTries'] ?? null;
+ }
+
+ /**
+ * Get the number of seconds the job can run.
+ *
+ * @return int|null
+ */
+ public function timeout(): ?int
+ {
+ return $this->payload()['timeout'] ?? null;
+ }
+
+ /**
+ * Get the timestamp indicating when the job should timeout.
+ *
+ * @return int|null
+ */
+ public function timeoutAt(): ?int
+ {
+ return $this->payload()['timeoutAt'] ?? null;
+ }
+
+ /**
+ * Get the name of the queued job class.
+ *
+ * @return string
+ */
+ public function getName(): string
+ {
+ return $this->payload()['job'];
+ }
+
+ /**
+ * Get data of queued job.
+ *
+ * @return array
+ */
+ public function getData(): array
+ {
+ return $this->payload()['data'];
+ }
+
+ /**
+ * Get the name of the connection the job belongs to.
+ *
+ * @return string
+ */
+ public function getConnectionName(): ?string
+ {
+ return $this->connectionName;
+ }
+
+ /**
+ * Get the name of the queue the job belongs to.
+ *
+ * @return string
+ */
+ public function getQueue(): ?string
+ {
+ return $this->queue;
+ }
+
+ /**
+ * Get the job identifier.
+ *
+ * @return string
+ */
+ abstract public function getJobId(): string;
+
+ /**
+ * Get the raw body of the job.
+ *
+ * @return string
+ */
+ abstract public function getRawBody(): string;
+
+ /**
+ * Resolve the given class
+ *
+ * @param string $class
+ *
+ * @return JobInterface
+ */
+ abstract protected function resolve(string $class): JobInterface;
+}
diff --git a/Job/JobContractInterface.php b/Job/JobContractInterface.php
new file mode 100644
index 0000000..ca66901
--- /dev/null
+++ b/Job/JobContractInterface.php
@@ -0,0 +1,134 @@
+
* @author Virchenko Maksim
*/
-class MongoJob extends Job implements JobContract
+class MongoJobContract extends JobContract implements JobContractInterface
{
+ /**
+ * Job resolver
+ *
+ * @var JobResolverInterface
+ */
+ protected $resolver;
+
/**
* The database queue instance.
*
- * @var MongoQueue
+ * @var QueueInterface
*/
protected $database;
@@ -33,17 +39,17 @@ class MongoJob extends Job implements JobContract
/**
* Create a new job instance.
*
- * @param ContainerInterface $container
- * @param MongoQueue $database
- * @param StdClass $job
+ * @param JobResolverInterface $resolver
+ * @param QueueInterface $database
+ * @param StdClass|MongoDB\Model\BSONDocument $job
* @param string $queue
*/
- public function __construct(ContainerInterface $container, MongoQueue $database, $job, $queue)
+ public function __construct(JobResolverInterface $resolver, QueueInterface $database, $job, string $queue)
{
+ $this->resolver = $resolver;
+ $this->database = $database;
$this->job = $job;
$this->queue = $queue;
- $this->database = $database;
- $this->container = $container;
}
/**
@@ -103,8 +109,20 @@ public function getJobId(): string
*
* @return string
*/
- public function getRawBody()
+ public function getRawBody(): string
{
return $this->job->payload;
}
+
+ /**
+ * Resolve job
+ *
+ * @param string $class
+ *
+ * @return JobInterface
+ */
+ protected function resolve(string $class): JobInterface
+ {
+ return $this->resolver->resolve($class);
+ }
}
diff --git a/Queue/MongoQueue.php b/Queue/MongoQueue.php
index d136c13..716c0bb 100644
--- a/Queue/MongoQueue.php
+++ b/Queue/MongoQueue.php
@@ -2,16 +2,13 @@
namespace SfCod\QueueBundle\Queue;
-use Carbon\Carbon;
+use DateInterval;
use DateTime;
-use Exception;
-use Illuminate\Container\Container;
-use Illuminate\Queue\Queue;
use MongoDB\Collection;
-use SfCod\QueueBundle\Base\Job;
-use SfCod\QueueBundle\Job\MongoJob;
-use SfCod\QueueBundle\Service\MongoDriverInterface;
-use Symfony\Component\DependencyInjection\ContainerInterface;
+use SfCod\QueueBundle\Base\JobResolverInterface;
+use SfCod\QueueBundle\Base\MongoDriverInterface;
+use SfCod\QueueBundle\Job\JobContractInterface;
+use SfCod\QueueBundle\Job\MongoJobContract;
/**
* Class MongoQueue
@@ -22,10 +19,17 @@
*/
class MongoQueue extends Queue
{
+ /**
+ * Job resolver
+ *
+ * @var JobResolverInterface
+ */
+ protected $resolver;
+
/**
* The mongo connection instance.
*
- * @var \Illuminate\Database\Connection
+ * @var MongoDriverInterface
*/
protected $mongo;
@@ -41,7 +45,7 @@ class MongoQueue extends Queue
*
* @var string
*/
- protected $queue;
+ protected $queue = 'default';
/**
* The expiration time of a job.
@@ -64,16 +68,19 @@ class MongoQueue extends Queue
* @param int $expire
* @param int $limit
*/
- public function __construct(MongoDriverInterface $mongo,
- string $collection,
- string $queue = 'default',
- int $expire = 60,
- int $limit = 15
+ public function __construct(
+ JobResolverInterface $resolver,
+ MongoDriverInterface $mongo,
+ string $collection,
+ string $queue = 'default',
+ int $expire = 60,
+ int $limit = 15
) {
+ $this->resolver = $resolver;
+ $this->mongo = $mongo;
$this->collection = $collection;
$this->expire = $expire;
$this->queue = $queue;
- $this->mongo = $mongo;
$this->limit = $limit;
}
@@ -86,7 +93,7 @@ public function __construct(MongoDriverInterface $mongo,
*
* @return mixed
*/
- public function push($job, $data = '', $queue = null)
+ public function push(string $job, array $data = [], ?string $queue = null)
{
return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
}
@@ -96,9 +103,9 @@ public function push($job, $data = '', $queue = null)
*
* @param string $queue
*
- * @return null|Job
+ * @return null|JobContractInterface
*/
- public function pop($queue = null)
+ public function pop(?string $queue = null): ?JobContractInterface
{
$queue = $this->getQueue($queue);
@@ -113,12 +120,12 @@ public function pop($queue = null)
* Push a new job onto the queue.
*
* @param string $job
- * @param mixed $data
- * @param string $queue
+ * @param array $data
+ * @param string|null $queue
*
- * @return mixed
+ * @return bool
*/
- public function exists($job, $data = '', $queue = null)
+ public function exists(string $job, array $data = [], ?string $queue = null): bool
{
return null !== $this->getCollection()->findOne([
'queue' => $queue,
@@ -130,12 +137,12 @@ public function exists($job, $data = '', $queue = null)
* Push a raw payload onto the queue.
*
* @param string $payload
- * @param string $queue
+ * @param string|null $queue
* @param array $options
*
* @return mixed
*/
- public function pushRaw($payload, $queue = null, array $options = [])
+ public function pushRaw(string $payload, ?string $queue = null, array $options = [])
{
return $this->pushToDatabase(0, $queue, $payload);
}
@@ -143,14 +150,14 @@ public function pushRaw($payload, $queue = null, array $options = [])
/**
* Push a new job onto the queue after a delay.
*
- * @param DateTime|int $delay
+ * @param DateInterval|int $delay
* @param string $job
- * @param mixed $data
+ * @param array $data
* @param string $queue
*
* @return mixed
*/
- public function later($delay, $job, $data = '', $queue = null)
+ public function later($delay, string $job, array $data = [], ?string $queue = null)
{
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
}
@@ -164,7 +171,7 @@ public function later($delay, $job, $data = '', $queue = null)
*
* @return mixed
*/
- public function bulk($jobs, $data = '', $queue = null)
+ public function bulk(array $jobs, array $data = [], ?string $queue = null)
{
$queue = $this->getQueue($queue);
@@ -182,11 +189,11 @@ public function bulk($jobs, $data = '', $queue = null)
*
* @param string $queue
* @param \StdClass $job
- * @param int $delay
+ * @param DateInterval|int $delay
*
* @return mixed
*/
- public function release($queue, $job, $delay)
+ public function release(string $queue, StdClass $job, $delay)
{
return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
}
@@ -196,9 +203,9 @@ public function release($queue, $job, $delay)
*
* @param $id
*
- * @return null|Job
+ * @return null|JobContractInterface
*/
- public function getJobById($id)
+ public function getJobById($id): ?JobContractInterface
{
$job = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);
@@ -207,7 +214,7 @@ public function getJobById($id)
} else {
$job = (object)$job;
- return new MongoJob($this->container, $this, $job, $job->queue);
+ return new MongoJobContract($this->resolver, $this, $job, $job->queue);
}
}
@@ -219,7 +226,7 @@ public function getJobById($id)
*
* @return int
*/
- public function deleteReserved($queue, $id): int
+ public function deleteReserved(string $queue, $id): int
{
$query = [
'_id' => new \MongoDB\BSON\ObjectID($id),
@@ -242,9 +249,9 @@ public function getExpire()
/**
* Set the expiration time in seconds.
*
- * @param int|null $seconds
+ * @param int $seconds
*/
- public function setExpire($seconds)
+ public function setExpire(int $seconds)
{
$this->expire = $seconds;
}
@@ -256,7 +263,7 @@ public function setExpire($seconds)
*
* @return int
*/
- public function size($queue = null)
+ public function size(?string $queue = null): int
{
if ($queue) {
return $this->getCollection()->count(['queue' => $queue]);
@@ -268,11 +275,11 @@ public function size($queue = null)
/**
* Check if can run process depend on limits
*
- * @param Job $job
+ * @param JobContractInterface $job
*
* @return bool
*/
- public function canRunJob(Job $job)
+ public function canRunJob(JobContractInterface $job): bool
{
return $this->getCollection()->count([
'reserved' => 1,
@@ -283,9 +290,9 @@ public function canRunJob(Job $job)
/**
* Mark the given job ID as reserved.
*
- * @param Job $job
+ * @param JobContractInterface $job
*/
- public function markJobAsReserved($job)
+ public function markJobAsReserved(JobContractInterface $job)
{
$attempts = $job->attempts() + 1;
$reserved_at = $this->currentTime();
@@ -302,7 +309,7 @@ public function markJobAsReserved($job)
/**
* Push a raw payload to the mongo with a given delay.
*
- * @param DateTime|int $delay
+ * @param DateInterval|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
@@ -319,15 +326,15 @@ protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
/**
* Get the "available at" UNIX timestamp.
*
- * @param DateTime|int $delay
+ * @param DateInterval|int $delay
*
* @return int
*/
- protected function getAvailableAt($delay)
+ protected function getAvailableAt($delay = 0)
{
- $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
-
- return $availableAt->getTimestamp();
+ return $delay instanceof DateInterval
+ ? (new DateTime())->add($delay)->getTimestamp()
+ : time() + $delay;
}
/**
@@ -347,7 +354,7 @@ protected function getQueue($queue)
*
* @param string|null $queue
*
- * @return null|Job
+ * @return null|JobContractInterface
*/
protected function getNextAvailableJob($queue)
{
@@ -362,7 +369,7 @@ protected function getNextAvailableJob($queue)
'sort' => ['_id' => 1],
]);
- return $job ? new MongoJob($this->container, $this, (object)$job, ((object)$job)->queue) : null;
+ return $job ? new MongoJobContract($this->resolver, $this, (object)$job, ((object)$job)->queue) : null;
}
/**
@@ -409,7 +416,7 @@ protected function isAvailable()
protected function isReservedButExpired()
{
return [
- 'reserved_at' => ['$lte' => Carbon::now()->subSeconds($this->expire)->getTimestamp()],
+ 'reserved_at' => ['$lte' => time() - $this->expire],
];
}
@@ -422,22 +429,4 @@ protected function getCollection(): Collection
{
return $this->mongo->getDatabase()->selectCollection($this->collection);
}
-
- /**
- * @param ContainerInterface $container
- */
- public function putContainer(ContainerInterface $container)
- {
- $this->container = $container;
- }
-
- /**
- * @param Container $container
- *
- * @throws Exception
- */
- public function setContainer(Container $container)
- {
- // Nothing
- }
}
diff --git a/Queue/Queue.php b/Queue/Queue.php
new file mode 100644
index 0000000..e1f7ba2
--- /dev/null
+++ b/Queue/Queue.php
@@ -0,0 +1,136 @@
+
+ *
+ * @package SfCod\QueueBundle\Queue
+ */
+abstract class Queue implements QueueInterface
+{
+ use InteractWithTimeTrait;
+
+ /**
+ * The connection name for the queue.
+ *
+ * @var string
+ */
+ protected $connectionName;
+
+ /**
+ * Push a new job onto the queue.
+ *
+ * @param string $queue
+ * @param string $job
+ * @param array $data
+ *
+ * @return mixed
+ */
+ public function pushOn(string $queue, string $job, array $data = [])
+ {
+ return $this->push($job, $data, $queue);
+ }
+
+ /**
+ * Push a new job onto the queue after a delay.
+ *
+ * @param string $queue
+ * @param \DateTimeInterface|\DateInterval|int $delay
+ * @param string $job
+ * @param array $data
+ *
+ * @return mixed
+ */
+ public function laterOn(string $queue, $delay, string $job, array $data = [])
+ {
+ return $this->later($delay, $job, $data, $queue);
+ }
+
+ /**
+ * Push an array of jobs onto the queue.
+ *
+ * @param array $jobs
+ * @param array $data
+ * @param string $queue
+ *
+ * @return mixed
+ */
+ public function bulk(array $jobs, array $data = [], ?string $queue = null)
+ {
+ foreach ((array)$jobs as $job) {
+ $this->push($job, $data, $queue);
+ }
+ }
+
+ /**
+ * Create a payload string from the given job and data.
+ *
+ * @param string $job
+ * @param mixed $data
+ *
+ * @return string
+ *
+ * @throws InvalidPayloadException
+ */
+ protected function createPayload(string $job, array $data = [])
+ {
+ $payload = json_encode($this->createPayloadArray($job, $data));
+
+ if (JSON_ERROR_NONE !== json_last_error()) {
+ throw new InvalidPayloadException(
+ 'Unable to JSON encode payload. Error code: ' . json_last_error()
+ );
+ }
+
+ return $payload;
+ }
+
+ /**
+ * Create a payload array from the given job and data.
+ *
+ * @param string $job
+ * @param array $data
+ *
+ * @return array
+ */
+ protected function createPayloadArray(string $job, array $data = []): array
+ {
+ return [
+ 'displayName' => explode('@', $job)[0],
+ 'job' => $job,
+ 'maxTries' => null,
+ 'timeout' => null,
+ 'data' => $data,
+ ];
+ }
+
+ /**
+ * Get the connection name for the queue.
+ *
+ * @return string
+ */
+ public function getConnectionName(): string
+ {
+ return $this->connectionName;
+ }
+
+ /**
+ * Set the connection name for the queue.
+ *
+ * @param string $name
+ *
+ * @return $this
+ */
+ public function setConnectionName(string $name)
+ {
+ $this->connectionName = $name;
+
+ return $this;
+ }
+}
diff --git a/Queue/QueueInterface.php b/Queue/QueueInterface.php
new file mode 100644
index 0000000..c1d10c6
--- /dev/null
+++ b/Queue/QueueInterface.php
@@ -0,0 +1,163 @@
+push(<--YOUR JOB SERVICE NAME->>, $data);
+$jobQueue->push(your_job_handler_service, $data);
```
$data - additional data for your job
@@ -74,4 +47,113 @@ Where:
'job_queue_worker.raise_exception_occurred_job': SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
'job_queue_worker.raise_failed_job': SfCod\QueueBundle\Event\JobFailedEvent;
'job_queue_worker.stop': SfCod\QueueBundle\Event\WorkerStoppingEvent;
+```
+
+#### Configurable services list (with default parameters):
+
+##### Main services:
+```yaml
+SfCod\QueueBundle\Service\JobQueue:
+ public: true
+ arguments:
+ - '@SfCod\QueueBundle\Service\QueueManager'
+```
+SfCod\QueueBundle\Service\JobQueue: main job queue service.
+
+```yaml
+SfCod\QueueBundle\Service\QueueManager:
+ calls:
+ - [addConnector, ['mongo-thread', '@SfCod\QueueBundle\Connector\ConnectorInterface']]
+```
+SfCod\QueueBundle\Service\QueueManager: queue manager which holds connectors and connections.
+
+```yaml
+SfCod\QueueBundle\Worker\Worker:
+ arguments:
+ - '@SfCod\QueueBundle\Service\QueueManager'
+ - '@SfCod\QueueBundle\Service\JobProcess'
+ - '@SfCod\QueueBundle\Failer\FailedJobProviderInterface'
+ - '@SfCod\QueueBundle\Handler\ExceptionHandlerInterface'
+ - '@Symfony\Component\EventDispatcher\EventDispatcherInterface'
+```
+SfCod\QueueBundle\Worker\Worker: main worker service.
+
+```yaml
+SfCod\QueueBundle\Service\JobProcess:
+ arguments:
+ - 'console'
+ - '%kernel.project_dir%/bin'
+ - 'php'
+ - ''
+```
+SfCod\QueueBundle\Service\JobProcess: default config for jobs command processor in async queues, where:
+- 'console' - name of console command
+- '%kernel.project_dir%/bin' - path for console command
+- 'php' - binary script
+- '' - binary script arguments
+
+##### Connector
+```yaml
+SfCod\QueueBundle\Connector\ConnectorInterface:
+ class: SfCod\QueueBundle\Connector\MongoConnector
+ arguments:
+ - '@SfCod\QueueBundle\Base\JobResolverInterface'
+ - '@SfCod\QueueBundle\Base\MongoDriverInterface'
+```
+SfCod\QueueBundle\Connector\ConnectorInterface: connector for queues' database.
+
+##### Job resolver
+```yaml
+SfCod\QueueBundle\Base\JobResolverInterface:
+ class: SfCod\QueueBundle\Service\JobResolver
+ arguments:
+ - '@Symfony\Component\DependencyInjection\ContainerInterface'
+```
+SfCod\QueueBundle\Base\JobResolverInterface: resolver for jobs, it builds job using job's display name, for default jobs fetches from container as a public services.
+
+##### Failed jobs provider
+```yaml
+SfCod\QueueBundle\Failer\FailedJobProviderInterface:
+ class: SfCod\QueueBundle\Failer\MongoFailedJobProvider
+ arguments:
+ - '@SfCod\QueueBundle\Base\MongoDriverInterface'
+ - 'queue_jobs_failed'
+```
+SfCod\QueueBundle\Failer\FailedJobProviderInterface: failer service for failed jobs processing, where:
+- SfCod\QueueBundle\Base\MongoDriverInterface - mongo driver
+- 'queue_jobs_failed' - name of mongo collection
+
+##### Exception handler
+```yaml
+SfCod\QueueBundle\Handler\ExceptionHandlerInterface:
+ class: SfCod\QueueBundle\Handler\ExceptionHandler
+ arguments:
+ - '@Psr\Log\LoggerInterface'
+```
+SfCod\QueueBundle\Handler\ExceptionHandlerInterface: main exception handler, used for logging issues
+
+##### Mongo driver config:
+
+```yaml
+SfCod\QueueBundle\Base\MongoDriverInterface:
+ class: SfCod\QueueBundle\Service\MongoDriver
+ calls:
+ - [setCredentials, ['%env(MONGODB_URL)%']]
+ - [setDbname, ['%env(MONGODB_NAME)%']]
+```
+SfCod\QueueBundle\Base\MongoDriverInterface: default config for mongo driver connection
+
+##### New connector:
+
+If you want to change default connector, you can override SfCod\QueueBundle\Connector\ConnectorInterface or add method call:
+```yaml
+SfCod\QueueBundle\Service\QueueManager:
+ calls:
+ - [addConnector, ['your-connector', '@your.service']]
+```
+where 'your.service' must implement SfCod\QueueBundle\Connector\ConnectorInterface and then all your connections with driver 'your-connector' will be processed using new connector, for example:
+```yaml
+sfcod_queue:
+ connections:
+ default: { driver: 'your-connector', collection: 'queue_jobs' queue: 'default', expire: 60, limit: 2 }
```
\ No newline at end of file
diff --git a/JobProcess.php b/Service/JobProcess.php
similarity index 83%
rename from JobProcess.php
rename to Service/JobProcess.php
index 538bb28..06ea361 100644
--- a/JobProcess.php
+++ b/Service/JobProcess.php
@@ -1,9 +1,8 @@
getBackgroundCommand($cmd);
diff --git a/Service/JobQueue.php b/Service/JobQueue.php
index f568ecf..b9af986 100644
--- a/Service/JobQueue.php
+++ b/Service/JobQueue.php
@@ -2,44 +2,21 @@
namespace SfCod\QueueBundle\Service;
-use Illuminate\Queue\Capsule\Manager;
-use Illuminate\Queue\Connectors\ConnectorInterface;
-use Illuminate\Queue\QueueManager;
use SfCod\QueueBundle\Base\JobQueueInterface;
-use SfCod\QueueBundle\Connector\MongoConnector;
-use Symfony\Component\DependencyInjection\ContainerAwareTrait;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
- * Service for illuminate queues to work with mongodb
+ * JobQueue service
*
* @author Virchenko Maksim
* @author Orlov Alexey
*/
class JobQueue implements JobQueueInterface
{
- use ContainerAwareTrait;
-
- /**
- * Available connections
- *
- * @var array
- */
- protected $connections = [
- 'default' => [
- 'driver' => 'mongo-thread',
- 'collection' => 'queue_jobs',
- 'queue' => 'default',
- 'expire' => 60,
- 'limit' => 2,
- 'connection' => MongoDriverInterface::class, // Default mongo connection
- ],
- ];
-
/**
- * Manager instance
+ * QueueManager instance
*
- * @var Manager
+ * @var QueueManager
*/
protected $manager;
@@ -51,22 +28,9 @@ class JobQueue implements JobQueueInterface
*
* @internal param array $config
*/
- public function __construct(ContainerInterface $container, array $connections = [])
- {
- $this->connections = array_merge($this->connections, $connections);
- $this->container = $container;
-
- $this->connect();
- }
-
- /**
- * Get queue manager instance
- *
- * @return QueueManager
- */
- public function getQueueManager(): QueueManager
+ public function __construct(QueueManager $manager)
{
- return $this->manager->getQueueManager();
+ $this->manager = $manager;
}
/**
@@ -83,7 +47,7 @@ public function getQueueManager(): QueueManager
*/
public function push(string $job, array $data = [], string $queue = 'default', string $connection = 'default')
{
- return Manager::push($job, $data, $queue, $connection);
+ return $this->manager->push($job, $data, $queue, $connection);
}
/**
@@ -100,8 +64,8 @@ public function push(string $job, array $data = [], string $queue = 'default', s
*/
public function pushUnique(string $job, array $data = [], string $queue = 'default', string $connection = 'default')
{
- if (false === Manager::connection($connection)->exists($job, $data, $queue)) {
- return Manager::push($job, $data, $queue, $connection);
+ if (false === $this->manager->connection($connection)->exists($job, $data, $queue)) {
+ return $this->manager->push($job, $data, $queue, $connection);
}
return null;
@@ -119,7 +83,7 @@ public function pushUnique(string $job, array $data = [], string $queue = 'defau
*/
public function bulk(array $jobs, array $data = [], string $queue = 'default', string $connection = 'default')
{
- return Manager::bulk($jobs, $data, $queue, $connection);
+ return $this->manager->bulk($jobs, $data, $queue, $connection);
}
/**
@@ -135,7 +99,7 @@ public function bulk(array $jobs, array $data = [], string $queue = 'default', s
*/
public function later(int $delay, string $job, array $data = [], string $queue = 'default', string $connection = 'default')
{
- return Manager::later($delay, $job, $data, $queue, $connection);
+ return $this->manager->later($delay, $job, $data, $queue, $connection);
}
/**
@@ -151,47 +115,10 @@ public function later(int $delay, string $job, array $data = [], string $queue =
*/
public function laterUnique(int $delay, string $job, array $data = [], string $queue = 'default', string $connection = 'default')
{
- if (false === Manager::connection($connection)->exists($job, $data, $queue)) {
- return Manager::later($delay, $job, $data, $queue, $connection);
+ if (false === $this->manager->connection($connection)->exists($job, $data, $queue)) {
+ return $this->manager->later($delay, $job, $data, $queue, $connection);
}
return null;
}
-
- /**
- * Add connector
- *
- * @param string $name
- * @param Closure $resolver
- */
- public function addConnector(string $name, ConnectorInterface $connector)
- {
- $this->manager->addConnector($name, function () use ($connector) {
- return $connector;
- });
- }
-
- /**
- * Connect queue manager for mongo database
- *
- * @author Virchenko Maksim
- *
- * @return Manager
- */
- protected function connect()
- {
- if (is_null($this->manager)) {
- $this->manager = new Manager();
-
- $this->addConnector('mongo-thread', new MongoConnector($this->container));
-
- foreach ($this->connections as $name => $params) {
- $this->manager->addConnection($params, $name);
- }
-
- $this->manager->setAsGlobal();
- }
-
- return $this->manager;
- }
}
diff --git a/Service/JobResolver.php b/Service/JobResolver.php
new file mode 100644
index 0000000..97aab87
--- /dev/null
+++ b/Service/JobResolver.php
@@ -0,0 +1,44 @@
+
+ *
+ * @package SfCod\QueueBundle\Service
+ */
+class JobResolver implements JobResolverInterface
+{
+ /**
+ * @var ContainerInterface
+ */
+ protected $container;
+
+ /**
+ * JobResolver constructor.
+ *
+ * @param ContainerInterface $container
+ */
+ public function __construct(ContainerInterface $container)
+ {
+ $this->container = $container;
+ }
+
+ /**
+ * Resolve the given class.
+ *
+ * @param string $class
+ *
+ * @return JobInterface
+ */
+ public function resolve(string $class): JobInterface
+ {
+ return $this->container->get($class);
+ }
+}
diff --git a/Service/MongoDriver.php b/Service/MongoDriver.php
index 1b5ccd4..195b13f 100644
--- a/Service/MongoDriver.php
+++ b/Service/MongoDriver.php
@@ -4,6 +4,7 @@
use MongoDB\Client;
use MongoDB\Database;
+use SfCod\QueueBundle\Base\MongoDriverInterface;
/**
* Class MongoDriver
diff --git a/Service/QueueManager.php b/Service/QueueManager.php
new file mode 100644
index 0000000..029c4e1
--- /dev/null
+++ b/Service/QueueManager.php
@@ -0,0 +1,198 @@
+
+ *
+ * @package SfCod\QueueBundle\Service
+ */
+class QueueManager
+{
+ /**
+ * Queue config
+ *
+ * @var array
+ */
+ protected $config = [
+ 'queue.default' => 'default',
+ ];
+
+ /**
+ * The array of resolved queue connections.
+ *
+ * @var array
+ */
+ protected $connections = [];
+
+ /**
+ * The array of resolved queue connectors.
+ *
+ * @var array
+ */
+ protected $connectors = [];
+
+ /**
+ * Determine if the driver is connected.
+ *
+ * @param string $name
+ *
+ * @return bool
+ */
+ public function connected(?string $name = null): bool
+ {
+ return isset($this->connections[$name ?: $this->getDefaultDriver()]);
+ }
+
+ /**
+ * Resolve a queue connection instance.
+ *
+ * @param string $name
+ *
+ * @return QueueInterface
+ */
+ public function connection(?string $name = null): QueueInterface
+ {
+ $name = $name ?? $this->getDefaultDriver();
+
+ // If the connection has not been resolved yet we will resolve it now as all
+ // of the connections are resolved when they are actually needed so we do
+ // not make any unnecessary connection to the various queue end-points.
+ if (!isset($this->connections[$name])) {
+ $this->connections[$name] = $this->resolve($name);
+ }
+
+ return $this->connections[$name];
+ }
+
+ /**
+ * Add a queue connection resolver.
+ *
+ * @param string $driver
+ * @param ConnectorInterface $resolver
+ *
+ * @return void
+ */
+ public function addConnector(string $driver, $resolver)
+ {
+ $this->connectors[$driver] = $resolver;
+ }
+
+ /**
+ * Register a connection with the manager.
+ *
+ * @param array $config
+ * @param string $name
+ *
+ * @return void
+ */
+ public function addConnection(array $config, ?string $name = null)
+ {
+ $name = $name ?? $this->getDefaultDriver();
+
+ $this->config["queue.connections.{$name}"] = $config;
+ }
+
+ /**
+ * Get the name of the default queue connection.
+ *
+ * @return string
+ */
+ public function getDefaultDriver(): string
+ {
+ return $this->config['queue.default'];
+ }
+
+ /**
+ * Set the name of the default queue connection.
+ *
+ * @param string $name
+ *
+ * @return void
+ */
+ public function setDefaultDriver(string $name)
+ {
+ $this->config['queue.default'] = $name;
+ }
+
+ /**
+ * Get the full name for the given connection.
+ *
+ * @param string $connection
+ *
+ * @return string
+ */
+ public function getName(?string $connection = null): string
+ {
+ return $connection ?? $this->getDefaultDriver();
+ }
+
+ /**
+ * Dynamically pass calls to the default connection.
+ *
+ * @param string $method
+ * @param array $parameters
+ *
+ * @return mixed
+ */
+ public function __call($method, $parameters)
+ {
+ return $this->connection()->$method(...$parameters);
+ }
+
+ /**
+ * Get the queue connection configuration.
+ *
+ * @param string $name
+ *
+ * @return array
+ */
+ protected function getConfig(string $name): array
+ {
+ if (!is_null($name) && 'null' !== $name) {
+ return $this->config["queue.connections.{$name}"];
+ }
+
+ return ['driver' => 'null'];
+ }
+
+ /**
+ * Resolve a queue connection.
+ *
+ * @param string $name
+ *
+ * @return QueueInterface
+ */
+ protected function resolve(string $name): QueueInterface
+ {
+ $config = $this->getConfig($name);
+
+ return $this->getConnector($config['driver'])
+ ->connect($config)
+ ->setConnectionName($name);
+ }
+
+ /**
+ * Get the connector for a given driver.
+ *
+ * @param string $driver
+ *
+ * @return ConnectorInterface
+ *
+ * @throws InvalidArgumentException
+ */
+ protected function getConnector(string $driver): ConnectorInterface
+ {
+ if (!isset($this->connectors[$driver])) {
+ throw new InvalidArgumentException("No connector for [$driver]");
+ }
+
+ return $this->connectors[$driver];
+ }
+}
diff --git a/Tests/Data/LoadTrait.php b/Tests/Data/LoadTrait.php
index ac37c5d..9bf2111 100644
--- a/Tests/Data/LoadTrait.php
+++ b/Tests/Data/LoadTrait.php
@@ -4,8 +4,8 @@
use Monolog\Logger;
use Psr\Log\LoggerInterface;
+use SfCod\QueueBundle\Base\MongoDriverInterface;
use SfCod\QueueBundle\DependencyInjection\QueueExtension;
-use SfCod\QueueBundle\Service\MongoDriverInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\Dotenv\Dotenv;
use Symfony\Component\Dotenv\Exception\PathException;
diff --git a/Tests/Data/TestJob.php b/Tests/Data/TestJobContract.php
similarity index 90%
rename from Tests/Data/TestJob.php
rename to Tests/Data/TestJobContract.php
index 782fc99..e98d8c4 100644
--- a/Tests/Data/TestJob.php
+++ b/Tests/Data/TestJobContract.php
@@ -2,7 +2,7 @@
namespace SfCod\QueueBundle\Tests\Data;
-use Illuminate\Queue\Jobs\Job;
+use SfCod\QueueBundle\Job\JobContract;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
@@ -14,7 +14,7 @@
*
* @package SfCod\QueueBundle\Tests
*/
-class TestJob extends Job
+class TestJobContract extends JobContract
{
/**
* @var ContainerInterface
@@ -44,7 +44,7 @@ public function __construct(ContainerInterface $container)
/**
* Get container instance
*
- * @return \Illuminate\Container\Container|ContainerInterface
+ * @return ContainerInterface
*/
public function getContainer()
{
diff --git a/Options.php b/Worker/Options.php
similarity index 98%
rename from Options.php
rename to Worker/Options.php
index 062e577..9ce5837 100644
--- a/Options.php
+++ b/Worker/Options.php
@@ -1,6 +1,6 @@
manager = $queue->getQueueManager();
+ $this->queueManager = $queueManager;
$this->process = $process;
$this->failer = $failer;
$this->exceptions = $exceptions;
@@ -94,7 +96,7 @@ public function __construct(JobQueue $queue,
* @param string $queue
* @param Options $options
*/
- public function daemon($connectionName, $queue, Options $options)
+ public function daemon(string $connectionName, string $queue, Options $options)
{
while (true) {
if (false === $this->runNextJob($connectionName, $queue, $options)) {
@@ -116,17 +118,15 @@ public function daemon($connectionName, $queue, Options $options)
*
* @return bool
*/
- public function runNextJob($connectionName, $queue, Options $options)
+ public function runNextJob(string $connectionName, string $queue, Options $options)
{
- /** @var MongoQueue|Queue $connection */
- $connection = $this->manager->connection($connectionName);
-
+ $connection = $this->queueManager->connection($connectionName);
$job = $this->getNextJob($connection, $queue);
// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
- if ($job instanceof Job && $connection->canRunJob($job)) {
+ if ($job instanceof JobContractInterface && $connection->canRunJob($job)) {
$connection->markJobAsReserved($job);
$this->runInBackground($job, $connectionName);
@@ -143,18 +143,16 @@ public function runNextJob($connectionName, $queue, Options $options)
* @param $id
* @param Options $options
*/
- public function runJobById($connectionName, $id, Options $options)
+ public function runJobById(string $connectionName, $id, Options $options)
{
- /** @var MongoQueue|Queue $connection */
- $connection = $this->manager->connection($connectionName);
-
try {
+ $connection = $this->queueManager->connection($connectionName);
$job = $connection->getJobById($id);
// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
- if ($job instanceof Job) {
+ if ($job instanceof JobContractInterface) {
if (false === $job->reserved()) {
$connection->markJobAsReserved($job);
}
@@ -166,7 +164,7 @@ public function runJobById($connectionName, $id, Options $options)
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
- $this->exceptions->report(new FatalThrowableError($e));
+ $this->exceptions->report(new FatalThrowableException($e));
}
$this->sleep($options->sleep);
@@ -175,10 +173,10 @@ public function runJobById($connectionName, $id, Options $options)
/**
* Make a Process for the Artisan command for the job id.
*
- * @param Job $job
+ * @param JobContractInterface $job
* @param string $connectionName
*/
- public function runInBackground(Job $job, string $connectionName)
+ public function runInBackground(JobContractInterface $job, string $connectionName)
{
$process = $this->process->getProcess($job, $connectionName);
@@ -188,14 +186,14 @@ public function runInBackground(Job $job, string $connectionName)
/** Process the given job from the queue.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
* @param Options $options
*
* @return void
*
* @throws \Throwable
*/
- public function process($connectionName, $job, Options $options)
+ public function process(string $connectionName, JobContractInterface $job, Options $options)
{
try {
// First we will raise the before job event and determine if the job has already ran
@@ -217,7 +215,7 @@ public function process($connectionName, $job, Options $options)
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException(
- $connectionName, $job, $options, new FatalThrowableError($e)
+ $connectionName, $job, $options, new FatalThrowableException($e)
);
}
}
@@ -229,7 +227,7 @@ public function process($connectionName, $job, Options $options)
*
* @return void
*/
- public function sleep($seconds)
+ public function sleep(int $seconds)
{
sleep($seconds);
}
@@ -241,7 +239,7 @@ public function sleep($seconds)
*
* @return bool
*/
- public function memoryExceeded($memoryLimit)
+ public function memoryExceeded(int $memoryLimit)
{
return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
}
@@ -251,7 +249,7 @@ public function memoryExceeded($memoryLimit)
*
* @param int $status
*/
- public function stop($status = 0)
+ public function stop(int $status = 0)
{
$this->dispatcher->dispatch(self::EVENT_STOP, new WorkerStoppingEvent());
@@ -264,18 +262,18 @@ public function stop($status = 0)
* This will likely be because the job previously exceeded a timeout.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
* @param int $maxTries
*
* @return void
*/
- protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
+ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries)
{
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$timeoutAt = $job->timeoutAt();
- if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
+ if ($timeoutAt && time() <= $timeoutAt) {
return;
}
@@ -294,10 +292,10 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
* Mark the given job as failed and raise the relevant event.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
- * @param \Exception $e
+ * @param JobContractInterface $job
+ * @param Exception $e
*/
- protected function failJob($connectionName, $job, $e)
+ protected function failJob(string $connectionName, JobContractInterface $job, Exception $e)
{
if ($job->isDeleted()) {
return;
@@ -320,15 +318,15 @@ protected function failJob($connectionName, $job, $e)
* Handle an exception that occurred while the job was running.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
* @param Options $options
- * @param \Exception $e
+ * @param Exception $e
*
* @return void
*
- * @throws \Exception
+ * @throws Exception
*/
- protected function handleJobException($connectionName, $job, Options $options, $e)
+ protected function handleJobException(string $connectionName, JobContractInterface $job, Options $options, Exception $e)
{
try {
// First, we will go ahead and mark the job as failed if it will exceed the maximum
@@ -359,17 +357,17 @@ protected function handleJobException($connectionName, $job, Options $options, $
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
* @param int $maxTries
- * @param \Exception $e
+ * @param Exception $e
*
* @return void
*/
- protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
+ protected function markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries, Exception $e)
{
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
- if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
+ if ($job->timeoutAt() && $job->timeoutAt() <= time()) {
$this->failJob($connectionName, $job, $e);
}
@@ -381,12 +379,12 @@ protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job,
/**
* Get the next job from the queue connection.
*
- * @param \Illuminate\Contracts\Queue\Queue $connection
+ * @param QueueInterface $connection
* @param string $queue
*
- * @return \Illuminate\Contracts\Queue\Job|null
+ * @return JobContractInterface|null
*/
- protected function getNextJob($connection, $queue)
+ protected function getNextJob(QueueInterface $connection, string $queue): ?JobContractInterface
{
try {
foreach (explode(',', $queue) as $queue) {
@@ -397,17 +395,19 @@ protected function getNextJob($connection, $queue)
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
- $this->exceptions->report($e = new FatalThrowableError($e));
+ $this->exceptions->report($e = new FatalThrowableException($e));
}
+
+ return null;
}
/**
* Raise the before queue job event.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
*/
- protected function raiseBeforeJobEvent($connectionName, $job)
+ protected function raiseBeforeJobEvent(string $connectionName, JobContractInterface $job)
{
$this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessingEvent($connectionName, $job));
}
@@ -416,9 +416,9 @@ protected function raiseBeforeJobEvent($connectionName, $job)
* Raise the after queue job event.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
+ * @param JobContractInterface $job
*/
- protected function raiseAfterJobEvent($connectionName, $job)
+ protected function raiseAfterJobEvent(string $connectionName, JobContractInterface $job)
{
$this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessedEvent($connectionName, $job));
}
@@ -427,10 +427,10 @@ protected function raiseAfterJobEvent($connectionName, $job)
* Raise the exception occurred queue job event.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
- * @param \Exception $e
+ * @param JobContractInterface $job
+ * @param Exception $e
*/
- protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e)
+ protected function raiseExceptionOccurredJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
{
$this->dispatcher->dispatch(self::EVENT_RAISE_EXCEPTION_OCCURED_JOB, new JobExceptionOccurredEvent($connectionName, $job, $e));
}
@@ -439,10 +439,10 @@ protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e)
* Raise the failed queue job event.
*
* @param string $connectionName
- * @param \Illuminate\Contracts\Queue\Job $job
- * @param \Exception $e
+ * @param JobContractInterface $job
+ * @param Exception $e
*/
- protected function raiseFailedJobEvent($connectionName, $job, $e)
+ protected function raiseFailedJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
{
$this->dispatcher->dispatch(self::EVENT_RAISE_FAILED_JOB, new JobFailedEvent($connectionName, $job, $e));
}
diff --git a/composer.json b/composer.json
index e28954d..52333d5 100644
--- a/composer.json
+++ b/composer.json
@@ -1,7 +1,7 @@
{
"name": "sfcod/jobqueue",
"type": "symfony-bundle",
- "description": "Illuminate queue adapter for Symfony",
+ "description": "Async queues for Symfony",
"license": "MIT",
"authors": [
{
@@ -9,19 +9,18 @@
"email": "aaorlov88@gmail.com"
},
{
- "name": "Virchenko Maksim",
- "email": "muslim1992@gmail.com",
- "role": "Developer"
+ "name": "Virchenko Maksim",
+ "email": "muslim1992@gmail.com",
+ "role": "Developer"
}
],
"require": {
"php": "^7.0",
"symfony/framework-bundle": "^4.0",
- "predis/predis": "^1.1",
+ "mongodb/mongodb": "^1.1",
"symfony/monolog-bundle": "^3.1",
"symfony/process": "^4.0",
- "symfony/dotenv": "^4.0",
- "illuminate/queue": "^5.6"
+ "symfony/dotenv": "^4.0"
},
"require-dev": {
"symfony/phpunit-bridge": "^4.0",