Phplumber is a very simple pipelining library for PHP which allows a mix of asynchronous and synchronous processes. Use it to break a large process into multiple steps using a queue and multi-processing. Phplumber contains no hard-coded dependencies and can be backed by any queue setup and any storage mechanism.
Requirements:
- PHP 5.3+
- A queue, such as RabbitMQ or Redis
- Storage, such as a relational database table or Redis
Let's take creating and filling a database as an example. It takes multiple steps and some can be done concurrently.
- Create database (one process)
- Create and populate tables (one process per table)
- Create views dependent on multiple tables (one process, dependent on all tables existing)

                       Create table 1
                     /                \
    Create database -> Create table 2 -> Create views
                     \                /
                       Create table 3

First we would define our processes. Sequential steps extend `Process`. A step that can run multiple times with different data extends `MultiProcess`.

    class CreateDatabase extends Process
    {
        public function invoke($payload)
        {
            $database_name = $payload['database_name'];
            echo "Drop database $database_name if it exists...\n";
            echo "Creating database $database_name...\n";
        }
    }

    class CreateTable extends MultiProcess
    {
        // Determine the data we need to queue for async processes
        public function getAsyncPayloads($payload)
        {
            $database_name = $payload['database_name'];
            $table_names = array('first_table', 'second_table', 'third_table');
            $payloads = array();
            foreach ($table_names as $table) {
                $payloads[] = array('database_name' => $database_name, 'table_name' => $table);
            }
            return $payloads;
        }

        // Runs once per payload returned by getAsyncPayloads()
        public function invoke($payload)
        {
            $database_name = $payload['database_name'];
            // The key must match the one set in getAsyncPayloads()
            $table_name = $payload['table_name'];
            echo "Connecting to database $database_name...\n";
            echo "Creating and populating $table_name...\n";
            switch ($table_name) {
                case 'first_table':
                    // Create table and insert rows...
                    break;
                // ...
            }
        }
    }

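The third step in the diagram, `CreateViews`, is not defined in this walkthrough. A minimal sketch, assuming it is an ordinary sequential step shaped like `CreateDatabase`: the `Process` class here is a stand-in so the snippet runs on its own (a real project would use the library's class), and the view name is hypothetical.

```php
<?php
// Stand-in for the library's Process base class so this sketch is runnable
// by itself. ASSUMPTION: a step only needs to provide invoke($payload),
// mirroring the CreateDatabase example.
abstract class Process
{
    abstract public function invoke($payload);
}

class CreateViews extends Process
{
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        // Hypothetical view name, for illustration only.
        $view_name = 'combined_view';
        echo "Connecting to database $database_name...\n";
        echo "Creating view $view_name from all tables...\n";
    }
}

// Usage: the step receives the same kind of payload as the other steps.
$step = new CreateViews();
$step->invoke(array('database_name' => 'test_db'));
```

Because the views depend on every table, this step must not start until all `CreateTable` parts have finished.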
Then we define the sequence of processes.
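
    class CreateAndFillDatabase extends ProcessList
    {
        protected function setup()
        {
            // Steps run in the order they are added.
            // CreateViews is a sequential Process like CreateDatabase (definition omitted).
            $this
                ->add('CreateDatabase')
                ->add('CreateTable')
                ->add('CreateViews');
        }
    }
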
We can now kick off the process sequence.

    $workflow = new CreateAndFillDatabase();
    $workflow->process(array('database_name' => 'test_db'));

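To make the run order concrete, here is a toy, single-process model of the fan-out and fan-in that this workflow implies. It is only a sketch and does not use Phplumber: the array-based queue, the `$remaining` counter, and `run_workflow()` are all hypothetical stand-ins for a real queue, a storage-backed semaphore, and a `ProcessList`. How the library actually coordinates the steps may differ.

```php
<?php
// Toy in-memory model of fan-out/fan-in. None of these names come from
// Phplumber; they are illustrative assumptions only.
function run_workflow($database_name, array $table_names)
{
    $log = array();

    // Step 1: sequential, runs exactly once.
    $log[] = "create database $database_name";

    // Step 2 (fan-out): queue one message per table and record how many
    // parts must finish before the next step may start.
    $queue = array();
    foreach ($table_names as $table_name) {
        $queue[] = array('database_name' => $database_name, 'table_name' => $table_name);
    }
    $remaining = count($queue); // the counter a storage backend would hold

    // Real workers would consume these messages concurrently; here the
    // queue is drained in a loop, so the order is deterministic.
    while ($message = array_shift($queue)) {
        $log[] = 'create table ' . $message['table_name'];
        $remaining--;

        // Step 3 (fan-in): the part that finishes last triggers the next step.
        if ($remaining === 0) {
            $log[] = 'create views';
        }
    }

    return $log;
}

print_r(run_workflow('test_db', array('first_table', 'second_table', 'third_table')));
```

With real workers the three table messages could complete in any order, which is why the counter, rather than the position in the queue, decides when the views step starts.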
See the `examples` directory for complete, working demos.
To use Phplumber in your own project:
- Implement `StorageInterface`. This will hold semaphore data. Appropriate storage engines include relational databases, NoSQL stores, and key-value stores such as Redis.
- Extend the `Queue` class to integrate with a queue system. Appropriate queue engines include Redis and RabbitMQ.
- Write each process as a class that extends `Process` (for synchronous steps) or `MultiProcess` (for asynchronous steps).
- Implement `ProcessFactoryInterface`. This will create each instance of `ProcessInterface`, allowing you to set up your processes with any prerequisites, such as a database connection or configuration options.
- Put the processes together by extending `ProcessList`.
- Implement a worker daemon which instantiates your `Queue` implementation and calls `consume()` to listen for incoming messages. Each message invokes a single part of a multi-process. Run multiple workers to execute processes concurrently.
- Choose a place in your system to start the entire workflow, instantiate your `ProcessList`, and call `process()`, passing it the initial payload.
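The factory step in the list above can be sketched as follows, to show how prerequisites such as configuration might reach each process. This is an illustration under stated assumptions, not the library's API: `Process` is a stand-in base class, and `ExampleProcessFactory` and its `create()` method are hypothetical names. Check `ProcessFactoryInterface` for the method the library actually requires.

```php
<?php
// Stand-in base class so the sketch runs on its own.
abstract class Process
{
    abstract public function invoke($payload);
}

// A step with a prerequisite: it needs connection settings to do its work.
class CreateDatabase extends Process
{
    private $config;

    public function __construct(array $config)
    {
        $this->config = $config;
    }

    public function invoke($payload)
    {
        echo 'Creating ' . $payload['database_name']
            . ' on ' . $this->config['host'] . "...\n";
    }
}

// HYPOTHETICAL factory: the class name and create() are illustrative only.
class ExampleProcessFactory
{
    private $config;

    public function __construct(array $config)
    {
        $this->config = $config;
    }

    public function create($class_name)
    {
        if (!is_subclass_of($class_name, 'Process')) {
            throw new InvalidArgumentException("$class_name is not a Process");
        }
        // Every process receives the shared configuration.
        return new $class_name($this->config);
    }
}

$factory = new ExampleProcessFactory(array('host' => 'localhost'));
$process = $factory->create('CreateDatabase');
$process->invoke(array('database_name' => 'test_db'));
```

Keeping construction in one place means the process classes never need to know where their configuration or connections come from, which also makes them easier to test with fake dependencies.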