A Symfony bundle for running ETL (Extract, Transform, Load) tasks:
- Define custom Extractors, Transformers and Loaders.
- Create pipelines that run a specific data import/export process.
Tasks can be run:
- from CLI (bin/console etl:run <task_name>)
- from services or controllers
- from the frontend, using the Server-Sent Events (SSE) API.
Requirements:
- PHP 8.1+
- Symfony 6.2+
Installation:
composer req "whitedigital-eu/etl-bundle"
Example task (HorizonDataExtractor and HorizonCustomerTransformer must be created separately):
<?php
declare(strict_types=1);
namespace App\ETL\Task;
use App\ETL\Extractor\HorizonDataExtractor;
use App\ETL\Transformer\HorizonCustomerTransformer;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Attribute\AsTask;
use WhiteDigital\EtlBundle\Exception\EtlException;
use WhiteDigital\EtlBundle\Loader\DoctrineDbalLoader;
use WhiteDigital\EtlBundle\Task\AbstractTask;
#[AsTask(name: 'horizon_customer_import')]
class HorizonCustomerImportTask extends AbstractTask
{
    /**
     * Extractor options used when the caller does not pass any (e.g. a plain `etl:run` call).
     */
    private const DEFAULT_EXTRACTOR_ARGS = [
        'path' => '/rest/TDdmNorSar/query?columns=K.KODS,K.NOSAUK&orderby=K.NOSAUK asc',
    ];

    /**
     * Builds and runs the customer import pipeline.
     *
     * The pipeline extracts rows with HorizonDataExtractor, maps them through
     * HorizonCustomerTransformer and hands the result to DoctrineDbalLoader.
     *
     * @param OutputInterface           $output        Output the pipeline reports progress to.
     * @param array<string, mixed>|null $extractorArgs Options for the extractor; when null,
     *                                                 DEFAULT_EXTRACTOR_ARGS is used.
     *
     * @throws EtlException
     * @throws TransportExceptionInterface
     */
    public function runTask(OutputInterface $output, ?array $extractorArgs = null): void
    {
        // Explicit `?array` instead of implicit nullability via `= null` (deprecated in PHP 8.4);
        // the resulting signature is identical, so it stays compatible with AbstractTask.
        $this->etlPipeline
            ->setOutput($output)
            ->addExtractor(HorizonDataExtractor::class, $extractorArgs ?? self::DEFAULT_EXTRACTOR_ARGS)
            ->addTransformer(HorizonCustomerTransformer::class)
            ->addLoader(DoctrineDbalLoader::class)
            ->run();
    }
}
Example Extractor:
<?php
declare(strict_types=1);
namespace App\ETL\Extractor;
use App\Service\HorizonRestApiService;
use Psr\Cache\InvalidArgumentException;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Exception\ExtractorException;
use WhiteDigital\EtlBundle\Extractor\AbstractExtractor;
use WhiteDigital\EtlBundle\Helper\Queue;
final class HorizonDataExtractor extends AbstractExtractor
{
    public function __construct(
        private readonly HorizonRestApiService $horizonRestApiService,
    ) {
    }

    /**
     * Fetches rows from the Horizon REST endpoint named by the `path` option.
     *
     * Only full (non-batch) extraction is supported. Passing a batch processor throws.
     * A null response, or a response without `collection->row`, yields an empty queue
     * instead of triggering a PHP warning.
     *
     * @param \Closure|null $batchProcessor Must be null; batch mode is not supported.
     *
     * @return Queue<\stdClass> The entries of `collection->row` from the decoded response.
     *
     * @throws ClientExceptionInterface
     * @throws ExtractorException When a batch processor is supplied.
     * @throws InvalidArgumentException
     * @throws RedirectionExceptionInterface
     * @throws ServerExceptionInterface
     * @throws TransportExceptionInterface
     * @throws \JsonException
     */
    public function run(?\Closure $batchProcessor = null): Queue
    {
        if (null !== $batchProcessor) {
            throw new ExtractorException(sprintf('Batch mode not supported by %s', self::class));
        }

        $data = new Queue();

        $path = $this->getOption('path');
        $this->output->writeln(sprintf('Datu iegūšana uzsākta no avota: [%s]', $path));

        $rawJsonData = $this->horizonRestApiService->makeGetRequest($path);

        // `foreach` over null emits a warning, so fall back to an empty list when the
        // response (or its collection/row part) is absent.
        foreach (($rawJsonData?->collection?->row ?? []) as $row) {
            $data->push($row);
        }

        $this->output->writeln(sprintf('Iegūti %s ieraksti.', count($data)));

        return $data;
    }
}
Example Transformer:
Example Loader:
- Run a task by its name:
bin/console etl:run horizon_customer_import
or pass an extra custom argument:
bin/console etl:run horizon_customer_import random_path.txt
- List available tasks:
bin/console etl:list