Skip to content

Commit

Permalink
Perserve correleation_id when not empty in received message
Browse files Browse the repository at this point in the history
  • Loading branch information
ktogias committed Mar 16, 2022
1 parent 98575b2 commit 96c4d92
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 5 deletions.
2 changes: 1 addition & 1 deletion dev/Application/Messaging/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public function getHeader(string $name, mixed $default = null):mixed;

public function getProperties():array;

public function getPropery(string $name, mixed $default = null):mixed;
public function getProperty(string $name, mixed $default = null):mixed;

public function getKey():?string;

Expand Down
2 changes: 1 addition & 1 deletion dev/Application/Messaging/Plugin/ExampleFilter2.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ public function matches(Message $message): bool
{
return
$message->getHeader('name') == 'MySecondEventName'
&& $message->getPropery('type') == 'MySecondEventType';
&& $message->getProperty('type') == 'MySecondEventType';
}
}
4 changes: 2 additions & 2 deletions dev/Infrastructure/Event/Adapter/Pdo/Mapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ public function map(Message $message, string $channel): array
return [
':name' => $message->getHeader('name'),
':channel' => $channel,
':correlation_id' => $message->getPropery('id'),
':correlation_id' => $message->getProperty('correlation_id')?$message->getProperty('correlation_id'):$message->getProperty('id'),
':aggregate_id' => $message->getHeader('aggregate_id'),
':aggregate_version' => $message->getHeader('aggregate_version'),
':data' => $message->getBody(),
':timestamp' => $message->getPropery('timestamp')
':timestamp' => $message->getProperty('timestamp')
];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function getProperties():array{
return $this->delegate->getProperties();
}

public function getPropery(string $name, mixed $default = null):mixed{
public function getProperty(string $name, mixed $default = null):mixed{
return $this->delegate->getProperty(name: $name, default: $default);
}

Expand Down
5 changes: 5 additions & 0 deletions dev/tests/features/ValidMessages.feature
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ Scenario: Insert a valid message from an unfilterred and untranslated channel
Given The channel is set
When listener encounters an valid message
Then it should insert it in db

Scenario: Insert a valid message from an unfilterred and untranslated channel
Given The channel is set
When listener encounters an valid message with correlation id
Then it should insert it in db with correlation id
42 changes: 42 additions & 0 deletions dev/tests/features/bootstrap/ValidMessageContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,46 @@ public function itShouldInsertItInDb()
Assert::that($event['aggregate_version'])->eq(7);
Assert::that($event['channel'])->eq($this->channelWithNoFilterNoTranslator);
}

/**
* @When listener encounters an valid message with correlation id
*/
public function listenerEncountersAnValidMessageWithCorrelationId()
{
$topic = self::$kafkaContext->createTopic($this->channelWithNoFilterNoTranslator);
$producer = self::$kafkaContext->createProducer();
$producer->send($topic, self::$kafkaContext->createMessage('{"attr1": "val1", "attr2": "val2"}',[
'id' => 123,
'correlation_id' => 456,
'timestamp' => '2022-01-28 12:23:56'
],[
'name' => 'eventName',
'aggregate_id' => 23,
'aggregate_version' => 7
]));
}

/**
* @Then it should insert it in db with correlation id
*/
public function itShouldInsertItInDbWithCorrelationId()
{
$event = null;
$count = 0;
while (!$event && $count<60){
$con = new PDO("pgsql:host=".getenv('STORE_DB_HOST').";dbname=".getenv('STORE_DB_NAME'), getenv('STORE_DB_USER'), getenv('STORE_DB_PASSWORD'));
$stmt = $con->prepare('SELECT * FROM event WHERE "correlation_id" = :correlation_id');
$stmt->execute([':correlation_id' => 456]);
$event = $stmt->fetch();
sleep(1);
$count++;
}
Assert::that($event)->notEmpty();
Assert::that($event['correlation_id'])->eq(456);
Assert::that($event['timestamp'])->eq('2022-01-28 12:23:56');
Assert::that($event['name'])->eq('eventName');
Assert::that($event['aggregate_id'])->eq(23);
Assert::that($event['aggregate_version'])->eq(7);
Assert::that($event['channel'])->eq($this->channelWithNoFilterNoTranslator);
}
}

0 comments on commit 96c4d92

Please sign in to comment.