Symfony 2 : Using Beanstalk with “Command”

Synopsis
Installation, configuration end use (through a command line) of beanstalkd with Symfony 2.5+

1) Introduction

Beanstalk is a simple, fast work queue.

Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.

What I like with beanstalk : simple and highly efficient queue, easy to make a cluster of “processors”.

Where I use it: when I need to “transmit” information to a specific set “processors” which I don’t know if it is online or not, and when I want the information to be processed only once (“Point-to-Point” processing).

For “Point-to-MultiPoint” processing, I use message broker, such as “Mosquitto” or RabbitMQ, or “Redis” (even if I prefer Redis in its core mode : fast storage of key/value data

2) Installation

I assume beanstalk is installed. Otherwise, see http://kr.github.io/beanstalkd/

With Symfony2.5+, I use leezy/pheanstalk. To install it, as always, use composer.
In composer.json, add

{ ...
  "require": { 
    ...
    "leezy/pheanstalk-bundle": "2.*",
  }
}

Then,

composer update

it will install pheanstalk then leezy/pheanstalk service for Symfony2.
3) Configuration

In app/config.php, configure the service. Here, for example, “server1” has a beanstalkd instance running.

# leezy pheanstalk
leezy_pheanstalk:
    enabled: true
    pheanstalks:
        server1:
            server: localhost
            port: 11300
            timeout: 60

See Leezy/Pheanstalk for more informations on installation and configuration

4) Example within a “Command”

<?php
namespace AB3\GlobalBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;


class BeanstalkCommand extends ContainerAwareCommand {

    const TUBE_NAME = "test"; // Tube to Watch
    const WATCH_TIMEOUT = 60; // Watch Tube expires after 60s
    // OutputInterface
    protected $output;
    

    protected function configure() {
        $this   ->setName('ab3:command:beanstalk')
                ->setDescription('Pull Job from Beanstalk Beanstalk');
        ;
    }
    
    protected function execute(InputInterface $input, OutputInterface $output) {
        $this->output = $output;
        $bsClient = $this->getContainer()->get("leezy.pheanstalk.server1");
        while(true) { // loop until end
            $job = $bsClient
                ->watch(self::TUBE_NAME)
                ->ignore('default')
                ->reserve(self::WATCH_TIMEOUT); 
            /** I do recommend to setup a timeout above. It avoids a tricky bug **/
           if ($job) { // something to do
             $this->processJobData($job->getData()); // process job data
             $bsClient->delete($job); // remove job
           }
        }
   }
    
   protected function processJobData($data) {
     $this->output->writeln($data);
   }
 }

Just couple of notes:
*) If job processing should take long, then you should remove the job before processing. So the

if (job) { ...}

becomes

if ($job) {
  $jobData = $job->getData();
  $bsClient->delete($job); // remove job
  this->processJobData($jobData);
}

Why? Mostly to remove the lock and the job. See “What does DEADLINE_SOON mean?”for more details.
*) I always put a “Watch” timeout to avoid a possible freeze bug (I got it a year ago).

THANKS TO…. All contributors of Beanstalkd, Pheanstalk, Leezy/Pheanstalk