Skip to content
Michael Caldera edited this page May 11, 2015 · 17 revisions

Info

The bundle provides an interface for adding new adapters.

At the moment the bundle ships with a Pheanstalk adapter (for Beanstalkd).

Installation

In composer.json:

"require": {
    "everlution/tube-bundle": "dev-master"
},
"repositories" : [
        {
            "type" : "vcs",
            "url" : "https://github.com/everlution/tube-bundle.git"
        }
    ],

In app/AppKernel.php:

/**
 * Excerpt of the kernel's bundle registration.
 *
 * The "..." lines stand for the bundles that are already registered in your
 * application; only the EverlutionTubeBundle line needs to be added.
 */
public function registerBundles()
{
    $bundles = array(
        ...
        // Enables the tube bundle.
        new Everlution\TubeBundle\EverlutionTubeBundle(),
        ...
    );
}

Configuration is entirely optional. To override the defaults, add the following to app/config/config.yml:

# Both keys are optional; the values shown here are the bundle defaults.
everlution_tube:
    job_class: Everlution\TubeBundle\Model\Job # OPTIONAL: default value
    job_serializer: Everlution\TubeBundle\Serializer\DefaultJsonJobSerializer # OPTIONAL: default value

Services

// Factory service; its create() method returns a new job.
/** @var \Everlution\TubeBundle\Factory\JobFactory $jobFactory */
$jobFactory = $this->container->get('everlution_tube.factory.job');

// Job serializer service; it is also what the Pheanstalk adapter receives as its first argument.
/** @var \Everlution\TubeBundle\Serializer\JobSerializerInterface $jobSerializer */
$jobSerializer = $this->container->get('everlution_tube.serializer.job');

Usage

How to create a new tube provider

namespace AppBundle\Tube;

use Everlution\TubeBundle\Tube\AbstractTube;
use Everlution\TubeBundle\Model\Interfaces\JobInterface;
use Everlution\TubeBundle\Adapter\AdapterInterface;
use Everlution\TubeBundle\Manager\ManagerInterface;

/**
 * Example tube that sends an email to the address carried in each job's payload.
 *
 * On top of the four arguments required by AbstractTube, the constructor takes a
 * Predis client and a mailer to show how extra collaborators can be injected.
 */
class FirstTube extends AbstractTube
{
    /** @var mixed Predis client; injected but not used by this example. */
    private $predis;

    /** @var mixed Mailer used by consumeOne() to send the message. */
    private $mailer;

    /**
     * @param AdapterInterface $adapter         Forwarded to the parent constructor.
     * @param string           $tubeName        Forwarded to the parent constructor.
     * @param ManagerInterface $manager         Forwarded to the parent constructor.
     * @param mixed            $eventDispatcher Forwarded to the parent constructor.
     * @param mixed            $predis          Predis client kept on the instance.
     * @param mixed            $mailer          Mailer kept on the instance.
     */
    public function __construct(
        AdapterInterface $adapter,
        $tubeName,
        ManagerInterface $manager,
        $eventDispatcher,
        $predis,
        $mailer
    ) {
        parent::__construct($adapter, $tubeName, $manager, $eventDispatcher);
        $this->predis = $predis;
        $this->mailer = $mailer;
    }

    /**
     * Repopulates the queue from scratch.
     *
     * This method is helpful when the whole queue can be rebuilt; it is used by
     * the command everlution_tube:produce_all. This example tube has nothing to
     * produce, so it always throws.
     *
     * @throws \Exception always
     */
    public function produceAll()
    {
        throw new \Exception('Nothing to produce');
    }

    /**
     * Consumes a single job by emailing the address found in its payload.
     *
     * Only the consuming logic lives here; the job's life cycle is managed by
     * the parent class.
     *
     * @param JobInterface $job Job whose payload holds an 'email' entry.
     *
     * @throws MyCustomException when the payload's email address is not valid
     */
    public function consumeOne(JobInterface $job)
    {
        $payload = $job->getPayload();

        // filter_var() returns false when the value fails validation, so this
        // branch rejects invalid addresses only.
        // NOTE(review): MyCustomException is a placeholder for your own
        // exception class; it resolves in the current namespace unless imported.
        if (filter_var($payload['email'], FILTER_VALIDATE_EMAIL) === false) {
            throw new MyCustomException('Invalid email address');
        }

        $this
            ->mailer
            ->setFrom('no-reply@example.com')
            ->addTo($payload['email'])
            ->setSubject('The subject')
            ->setBody('The body')
            ->send()
        ;
    }

    /**
     * Rejects jobs whose payload has no usable 'email' entry.
     *
     * @param JobInterface $job Job to validate.
     *
     * @throws InvalidJobException when the payload's 'email' entry is missing or empty
     */
    public function validateJob(JobInterface $job)
    {
        $payload = $job->getPayload();

        // NOTE(review): InvalidJobException is not among the imported classes;
        // add the matching `use` statement (its namespace is not shown here).
        if (empty($payload['email'])) {
            throw new InvalidJobException($job);
        }
    }
}

Define the manager

namespace AppBundle\Tube;

use Everlution\TubeBundle\Manager\ManagerInterface;

/**
 * ManagerInterface implementation that keeps each tube's enabled flag in Redis.
 *
 * A tube counts as enabled while its status key holds a truthy value;
 * disabling a tube deletes the key.
 */
class MyManager implements ManagerInterface
{
    /** @var \Predis\Client Redis connection holding the status keys. */
    private $redis;

    /**
     * @param \Predis\Client $predis Redis connection used to store tube status.
     */
    public function __construct(\Predis\Client $predis)
    {
        $this->redis = $predis;
    }

    /**
     * Tells whether the given tube is currently enabled.
     *
     * @param string $tubeName
     *
     * @return bool
     */
    public function isEnabled($tubeName)
    {
        $flag = $this->redis->get($this->statusKey($tubeName));

        return (bool) $flag;
    }

    /**
     * Marks the given tube as enabled by storing 1 under its status key.
     *
     * @param string $tubeName
     */
    public function enable($tubeName)
    {
        $key = $this->statusKey($tubeName);
        $this->redis->set($key, 1);
    }

    /**
     * Marks the given tube as disabled by deleting its status key.
     *
     * @param string $tubeName
     */
    public function disable($tubeName)
    {
        $key = $this->statusKey($tubeName);
        $this->redis->del($key);
    }

    /**
     * Builds the Redis key under which a tube's status is stored.
     *
     * @param string $tubeName
     *
     * @return string
     */
    private function statusKey($tubeName)
    {
        $key = sprintf('tube:status:%s', $tubeName);

        return $key;
    }
}

Register the provider

services:
    # Define the tube adapter
    app.pheanstalk:
        class: Pheanstalk\Pheanstalk
        arguments:
            - 127.0.0.1
            - 11300
    app.tube.adapter.pheanstalk:
        class: Everlution\TubeBundle\Adapter\PheanstalkAdapter
        arguments:
            - '@everlution_tube.serializer.job'
            - '@app.pheanstalk'
    # Define the tube manager (the ManagerInterface implementation)
    app.predis:
        class: Predis\Client
        arguments: [...] # your Predis connection parameters
    app.tube.runner:
        class: AppBundle\Tube\MyManager
        arguments:
            - '@app.predis'
    # Define the tube; the arguments follow the FirstTube constructor order
    app.tube.first:
        class: AppBundle\Tube\FirstTube
        arguments:
            - '@app.tube.adapter.pheanstalk'
            - 'first'
            - '@app.tube.runner'
            - '@event_dispatcher'
            - '@app.predis'
            - '@mailer' # replace with the id of your own mailer service
        calls:
            - [ setPriority, [ 1024 ] ]
            - [ setDelay, [ 0 ] ]
            - [ setTtr, [ 0 ] ]
            - [ setMaxRetriesOnFailure, [ 3 ] ]
            - [ setDelayOnRetry, [ 0 ] ]
        tags:
            # Tag that marks the service as a tube provider
            - { name: everlution_tube.tube }

How to produce a new job

// Ask the job factory service for a fresh job.
$job = $this
    ->container
    ->get('everlution_tube.factory.job')
    ->create()
;

// we can override the priority/TTR/delay etc. defined in the tube provider
$job
    ->setPriority(1)
    ->setTtr(60)
    ->setPayload(array(
        // FirstTube::validateJob() rejects payloads without an 'email' entry
        'email' => 'user@example.com',
        'age'   => rand(18, 50),
        'year'  => rand(2015, 2030),
    ))
;

// Fetch the tube service registered as app.tube.first and queue the job on it.
$tube = $this
    ->container
    ->get('app.tube.first')
;

$tube->produce($job);

How to run the tube

app/console everlution_tube:run [--ttr[="..."]] [--extra[="..."]] [--jobs[="..."]] [tube-provider]

While the tube is stopped, the script terminates each time it is launched.

How to start a tube

app/console everlution_tube:start [tube-provider]

How to stop a tube

app/console everlution_tube:stop [tube-provider]

How to get info on a tube provider

app/console everlution_tube:info

Listen to tube and jobs events