diff --git a/src/Oliverde8/Component/PhpEtl/ChainProcessor.php b/src/Oliverde8/Component/PhpEtl/ChainProcessor.php index 183a9f3..96ddf22 100644 --- a/src/Oliverde8/Component/PhpEtl/ChainProcessor.php +++ b/src/Oliverde8/Component/PhpEtl/ChainProcessor.php @@ -3,6 +3,7 @@ namespace Oliverde8\Component\PhpEtl; use Oliverde8\Component\PhpEtl\ChainOperation\ChainOperationInterface; +use Oliverde8\Component\PhpEtl\Exception\ChainOperationException; use Oliverde8\Component\PhpEtl\Item\ChainBreakItem; use Oliverde8\Component\PhpEtl\Item\DataItem; use Oliverde8\Component\PhpEtl\Item\GroupedItemInterface; @@ -40,9 +41,14 @@ public function __construct(array $chainLinks) * * @param \Iterator $items * @param $context + * @throws ChainOperationException */ public function process(\Iterator $items, $context) { + if (!isset($context['etl']['identifier'])) { + $context['etl']['identifier'] = ''; + } + $this->processItems($items, 0, $context); } @@ -54,15 +60,22 @@ public function process(\Iterator $items, $context) * @param array $context * * @return ItemInterface + * @throws ChainOperationException */ protected function processItems(\Iterator $items, $startAt, &$context) { + $identifierPrefix = $context['etl']['identifier']; + + $count = 1; foreach ($items as $item) { + $context['etl']['identifier'] = $identifierPrefix . $count++; + $dataItem = new DataItem($item); $this->processItem($dataItem, $startAt, $context); } $stopItem = new StopItem(); + $context['etl']['identifier'] = $identifierPrefix . 'STOP'; while ($this->processItem($stopItem, $startAt, $context) !== $stopItem); return $stopItem; @@ -72,17 +85,19 @@ protected function processItems(\Iterator $items, $startAt, &$context) * Process an item, with chains starting at. * * @param ItemInterface $item - * @param int $startAt - * @param array $context + * @param $startAt + * @param $context * - * @return ItemInterface + * @return mixed|ItemInterface|StopItem + * @throws ChainOperationException */ protected function processItem(ItemInterface $item, $startAt, &$context) { for ($chainNumber = $startAt; $chainNumber < count($this->chainLinks); $chainNumber++) { - $item = $this->chainLinks[$chainNumber]->process($item, $context); + $item = $this->processItemWithOperation($item, $chainNumber, $context); if ($item instanceof GroupedItemInterface) { + $context['etl']['identifier'] .= "chain link:" . $this->chainLinkNames[$chainNumber] . "-"; $this->processItems($item->getIterator(), $chainNumber + 1, $context); return new StopItem(); @@ -93,4 +108,31 @@ protected function processItem(ItemInterface $item, $startAt, &$context) return $item; } + + /** + * Process an item and handle errors during the process. + * + * @param $item + * @param $chainNumber + * @param $context + * + * + * @return ItemInterface + * @throws ChainOperationException + */ + protected function processItemWithOperation($item, $chainNumber, &$context) + { + try { + return $this->chainLinks[$chainNumber]->process($item, $context); + } catch (\Exception $exception) { + throw new ChainOperationException( + "An exception was thrown during the handling of the chain link : " + . "{$this->chainLinkNames[$chainNumber]} " + . "with the item {$context['etl']['identifier']}.", + 0, + $exception, + $this->chainLinkNames[$chainNumber] + ); + } + } } diff --git a/src/Oliverde8/Component/PhpEtl/Exception/ChainOperationException.php b/src/Oliverde8/Component/PhpEtl/Exception/ChainOperationException.php new file mode 100644 index 0000000..d411d7c --- /dev/null +++ b/src/Oliverde8/Component/PhpEtl/Exception/ChainOperationException.php @@ -0,0 +1,40 @@ + + * @copyright 2018 Oliverde8 + * @package Oliverde8\Component\PhpEtl + */ +class ChainOperationException extends \Exception +{ + /** @var string */ + protected $chainOperationName; + + /** + * ChainOperationException constructor. + * + * @param string $message + * @param int $code + * @param Throwable|null $previous + * @param string $chainOperationName + */ + public function __construct($message = "", $code = 0, \Exception $previous = null, $chainOperationName = '') + { + $this->chainOperationName = $chainOperationName; + + parent::__construct($message, $code, $previous); + } + + /** + * @return string + */ + public function getChainOperationName() + { + return $this->chainOperationName; + } +} \ No newline at end of file diff --git a/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php b/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php index b959ffe..ed8e441 100644 --- a/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php +++ b/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php @@ -11,6 +11,7 @@ use Oliverde8\Component\PhpEtl\ChainOperation\Grouping\SimpleGroupingOperation; use Oliverde8\Component\PhpEtl\ChainOperation\Transformer\CallbackTransformerOperation; use Oliverde8\Component\PhpEtl\ChainProcessor; +use Oliverde8\Component\PhpEtl\Exception\ChainOperationException; use Oliverde8\Component\PhpEtl\Item\ChainBreakItem; use Oliverde8\Component\PhpEtl\Item\DataItem; use Oliverde8\Component\PhpEtl\Item\ItemInterface; @@ -154,4 +155,20 @@ public function testDoubleGrouping() $this->assertEquals(1, $count3); } + public function testException() + { + $mock1 = new CallbackTransformerOperation(function (ItemInterface $item) use (&$count1) { + throw new \Exception('Test exception'); + }); + + try { + $chainProcessor = new ChainProcessor(["op1" => $mock1]); + $chainProcessor->process(new \ArrayIterator(['test']), ['toto']); + } catch (ChainOperationException $exception) { + $this->assertEquals('op1', $exception->getChainOperationName()); + $this->assertContains('1', $exception->getMessage()); + } + } + + } diff --git a/src/Oliverde8/Component/PhpEtl/Tests/Item/Test.php b/src/Oliverde8/Component/PhpEtl/Tests/Item/Test.php new file mode 100644 index 0000000..d1f68e1 --- /dev/null +++ b/src/Oliverde8/Component/PhpEtl/Tests/Item/Test.php @@ -0,0 +1,30 @@ + + * @copyright 2018 Smile + */ + +namespace Oliverde8\Component\PhpEtl\Tests\Item; + + +use Oliverde8\Component\PhpEtl\Item\ChainBreakItem; +use Oliverde8\Component\PhpEtl\Item\DataItemInterface; +use Oliverde8\Component\PhpEtl\Item\GroupedItem; + +class GetMethodTest extends \PHPUnit_Framework_TestCase +{ + + public function testGroupedItem() + { + $groupedItem = new GroupedItem(new \ArrayIterator([])); + $this->assertEquals(DataItemInterface::SIGNAL_DATA, $groupedItem->getMethod()); + } + + public function testChainBreakItem() + { + $groupedItem = new ChainBreakItem(); + $this->assertEquals('chainBreak', $groupedItem->getMethod()); + } +}