Synopsis
Installation, configuration and use (through a command line) of beanstalkd with Symfony 2.5+
1) Introduction
Beanstalk is a simple, fast work queue.
Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.
What I like about beanstalk: it is a simple and highly efficient queue, and it is easy to build a cluster of “processors”.
Where I use it: when I need to “transmit” information to a specific set of “processors” that may or may not be online, and when I want the information to be processed only once (“Point-to-Point” processing).
For “Point-to-MultiPoint” processing, I use a message broker such as “Mosquitto” or RabbitMQ, or “Redis” (even if I prefer Redis in its core role: fast storage of key/value data).
2) Installation
I assume beanstalk is installed. Otherwise, see http://kr.github.io/beanstalkd/
With Symfony 2.5+, I use leezy/pheanstalk-bundle. To install it, as always, use composer.
In composer.json, add
{ ...
    "require": {
        ...
        "leezy/pheanstalk-bundle": "2.*"
    }
}
Then,
composer update
This installs pheanstalk, then the leezy/pheanstalk-bundle service for Symfony2.
3) Configuration
In app/config/config.yml, configure the service. Here, for example, “server1” has a beanstalkd instance running.
# leezy pheanstalk
leezy_pheanstalk:
    enabled: true
    pheanstalks:
        server1:
            server: localhost
            port: 11300
            timeout: 60
See Leezy/Pheanstalk for more information on installation and configuration.
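To check that the configured connection works, it helps to put a job on a tube first. The following is a minimal sketch, not the bundle's official recipe: `enqueueJob` is a hypothetical helper name, and it assumes the object returned by the "leezy.pheanstalk.server1" service exposes Pheanstalk's `useTube()` and `put()` methods.

```php
<?php
/**
 * Put a string payload on a tube and return the job id.
 *
 * $client is assumed to be the connection obtained from the
 * "leezy.pheanstalk.server1" service; only useTube() and put() are called.
 * enqueueJob() itself is a hypothetical helper, not part of any library.
 */
function enqueueJob($client, $tube, $data)
{
    // useTube() selects the tube used by the next put();
    // put() sends the payload and returns the id assigned by beanstalkd.
    return $client->useTube($tube)->put($data);
}
```

Inside a command, it could be called as `enqueueJob($this->getContainer()->get("leezy.pheanstalk.server1"), "test", "hello");`, using the same tube name as the worker that reads the jobs.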
4) Example within a “Command”
<?php
namespace AB3\GlobalBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class BeanstalkCommand extends ContainerAwareCommand {
    const TUBE_NAME = "test"; // Tube to watch
    const WATCH_TIMEOUT = 60; // reserve() gives up after 60s if no job arrives

    /** @var OutputInterface */
    protected $output;

    protected function configure() {
        $this->setName('ab3:command:beanstalk')
            ->setDescription('Pull jobs from Beanstalk');
    }

    protected function execute(InputInterface $input, OutputInterface $output) {
        $this->output = $output;
        $bsClient = $this->getContainer()->get("leezy.pheanstalk.server1");
        while (true) { // loop forever
            $job = $bsClient
                ->watch(self::TUBE_NAME)
                ->ignore('default')
                ->reserve(self::WATCH_TIMEOUT);
            /** I do recommend setting a timeout above. It avoids a tricky bug **/
            if ($job) { // something to do
                $this->processJobData($job->getData()); // process job data
                $bsClient->delete($job); // remove job
            }
        }
    }

    protected function processJobData($data) {
        $this->output->writeln($data);
    }
}
Just a couple of notes:
*) If processing a job may take a long time, remove the job before processing it. So the
if ($job) { ... }
becomes
if ($job) {
    $jobData = $job->getData();
    $bsClient->delete($job); // remove job
    $this->processJobData($jobData);
}
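Why? Mostly to release the reservation and remove the job: a reserved job that is not deleted before its time-to-run (TTR) expires is put back in the ready queue by beanstalkd and may be processed again. The trade-off is that a job deleted up front is lost if processing fails. See “What does DEADLINE_SOON mean?” for more details.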
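A different technique from the one above is to keep the job reserved and ask beanstalkd for more time while working. The sketch below is only an illustration under assumptions: `processWithTouch` and the `$steps` array are hypothetical, and the client is assumed to expose Pheanstalk's `touch()` and `delete()` methods.

```php
<?php
/**
 * Run a long job as a list of steps, touching the job before each one.
 *
 * $steps is a hypothetical array of callables, each receiving the job data.
 * $client is assumed to offer touch() and delete() as Pheanstalk does.
 */
function processWithTouch($client, $job, array $steps)
{
    $data = $job->getData();
    foreach ($steps as $step) {
        $client->touch($job);         // restart the TTR countdown for this job
        call_user_func($step, $data); // run one slice of the work
    }
    $client->delete($job);            // all steps done: remove the job
}
```

With this variant the job is deleted only after every step has run. If a step throws, the job stays reserved and returns to the ready queue once its TTR expires, so it can be retried.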
*) I always pass a timeout to reserve() (the “watch” timeout above) to avoid a possible freeze bug, which I ran into a year ago. Without a timeout, reserve() blocks until a job arrives.
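The reserve timeout also makes it possible to write a worker that stops when the tube is empty. This is a minimal sketch, assuming reserve() returns a falsy value when the timeout expires without a job (as the `if ($job)` check in the command implies); `drainTube` and `$handler` are hypothetical names.

```php
<?php
/**
 * Process jobs from one tube until reserve() times out; return the count.
 *
 * Assumption: $client->reserve($timeout) returns a falsy value when no
 * job arrives within $timeout seconds. $handler is a hypothetical
 * callable that receives the job data.
 */
function drainTube($client, $tube, $timeout, $handler)
{
    // Watch only the requested tube (this assumes $tube is not 'default').
    $client->watch($tube)->ignore('default');

    $count = 0;
    while ($job = $client->reserve($timeout)) {
        call_user_func($handler, $job->getData()); // process job data
        $client->delete($job);                     // remove job
        $count++;
    }
    return $count; // the timeout expired with no job left to reserve
}
```

A short timeout suits a one-off drain such as a cron run; the long-running command keeps its `while (true)` loop and simply reserves again after each timeout.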
THANKS TO: all contributors of Beanstalkd, Pheanstalk and Leezy/Pheanstalk.