-
Notifications
You must be signed in to change notification settings - Fork 0
Home
Michael Caldera edited this page May 11, 2015
·
17 revisions
The bundle provides an interface for adding new adapters.
At the moment you can find the pheanstalk adapter (using Beanstalkd).
In composer.json
:
"require": {
"everlution/tube-bundle": "dev-master"
},
"repositories" : [
{
"type" : "vcs",
"url" : "https://github.com/everlution/tube-bundle.git"
}
],
In app/AppKernel.php
:
public function registerBundles()
{
$bundles = array(
...
new Everlution\TubeBundle\EverlutionTubeBundle(),
...
);
}
The configuration is completely optional, in app/config/config.yml
:
everlution_tube:
job_class: Everlution\TubeBundle\Model\Job # OPTIONAL: default value
job_serializer: Everlution\TubeBundle\Serializer\DefaultJsonJobSerializer # OPTIONAL: default value
/* @var $jobFactory \Everlution\TubeBundle\Factory\JobFactory */
$jobFactory = $this->container->get('everlution_tube.factory.job');
/* @var $jobSerializer \Everlution\TubeBundle\Serializer\JobSerializerInterface */
$jobSerializer = $this->container->get('everlution_tube.serializer.job');
namespace AppBundle\Tube;
use Everlution\TubeBundle\Tube\AbstractTube;
use Everlution\TubeBundle\Model\Interfaces\JobInterface;
use Everlution\TubeBundle\Adapter\AdapterInterface;
use Everlution\TubeBundle\Manager\ManagerInterface;
class FirstTube extends AbstractTube
{
private $predis;
private $mailer;
public function __construct(
AdapterInterface $adapter,
$tubeName,
ManagerInterface $manager,
$eventDispatcher,
$predis,
$mailer
) {
parent::__construct($adapter, $tubeName, $manager, $eventDispatcher);
$this->predis = $predis;
$this->mailer = $mailer;
}
public function produceAll()
{
throw new \Exception('Nothing to produce');
// this method is helpful when you can repopulate the queue from scratch
// it's used but the command everlution_tube:produce_all
}
public function consumeOne(JobInterface $job)
{
// Here the logic for consuming the job
// The job's life cycle is managed by the parent class
$payload = $job->getPayload();
if (!filter_var($payload['email'], FILTER_VALIDATE_EMAIL) === false) {
throw new MyCustomException('Invalid email address');
}
$this
->mailer
->setFrom('[email protected]')
->addTo($payload['email'])
->setSubject('The subject')
->setBody('The body')
->send()
;
}
public function validateJob(JobInterface $job)
{
$payload = $job->getPayload();
if (empty($payload['email'])) {
throw new InvalidJobException($job);
}
}
}
Define the manager
namespace AppBundle\Tube;
use Everlution\TubeBundle\Manager\ManagerInterface;
class MyManager implements ManagerInterface
{
private $predis;
public function __construct(\Predis\Client $predis)
{
$this->predis = $predis;
}
private function getKey($tubeName)
{
return sprintf('tube:status:%s', $tubeName);
}
public function isEnabled($tubeName)
{
return (bool) $this
->predis
->get($this->getKey($tubeName))
;
}
public function disable($tubeName)
{
$this
->predis
->del($this->getKey($tubeName))
;
}
public function enable($tubeName)
{
$this
->predis
->set($this->getKey($tubeName), 1)
;
}
}
Register the provider
services:
# Define the tube adapter
app.pheanstalk:
class: Pheanstalk\Pheanstalk
arguments:
- 127.0.0.1
- 11300
app.tube.adapter.pheanstalk:
class: Everlution\TubeBundle\Adapter\PheanstalkAdapter
arguments:
- @everlution_tube.serializer.job
- @app.pheanstalk
# Define the tube adapter
app.predis:
class: Predis\Client
arguments: [...]
app.tube.runner:
class: AppBundle\Tube\MyRunner
arguments:
- @app.predis
app.tube.first:
class: AppBundle\Tube\FirstTube
arguments:
- @app.tube.adapter.pheanstalk
- 'first'
- @app.tube.runner
- @event_dispatcher
calls:
- [ setPriority, [ 1024 ] ]
- [ setDelay, [ 0 ] ]
- [ setTtr, [ 0 ] ]
- [ setMaxRetriesOnFailure, [ 3 ] ]
- [ setDelayOnRetry, [ 0 ] ]
tags:
- { name: everlution_tube.tube }
$job = $this
->container
->get('everlution_tube.factory.job')
->create()
;
// we can override the priority/TTR/delay etc. defined in the tube provider
$job
->setPriority(1)
->setTtr(60)
->setPayload(array(
'age' => rand(18, 50),
'year' => rand(2015, 2030),
))
;
$tube = $this
->getContainer()
->get('app.tube.first')
;
$tube->produce($job);
app/console everlution_tube:run [--ttr[="..."]] [--extra[="..."]] [--jobs[="..."]] [tube-provider]
The script will keep terminating if the tube has been stopped.
app/console everlution_tube:start [tube-provider]
app/console everlution_tube:stop [tube-provider]
app/console everlution_tube:info