Skip to content

Commit

Permalink
healthcheck test watchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
aynsix committed Jan 13, 2023
1 parent b4c6712 commit 202958a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ public function __construct()
protected function doExecute(InputInterface $input, OutputInterface $output)
{
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->container['alchemy_worker.amqp.connection'];
// $serverConnection = $this->container['alchemy_worker.amqp.connection'];

$connection = $serverConnection->getConnection();
// $connection = $serverConnection->getConnection();

$interval = $input->getOption('heartbeat');
if (empty($interval)) {
$interval = self::DEFAULT_INTERVAL;
}

$heartbeatHandler = new HeartbeatHandler($connection);
$heartbeatHandler = new HeartbeatHandler($this->container['alchemy_worker.message.publisher']);
$heartbeatHandler->run($interval);

return 0;
Expand Down
40 changes: 29 additions & 11 deletions lib/Alchemy/Phrasea/WorkerManager/Queue/HeartbeatHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,47 @@

class HeartbeatHandler
{
/**
* @var AbstractConnection
*/
private $connection;
private $messagePublisher;

public function __construct(AbstractConnection $connection)
public function __construct(MessagePublisher $messagePublisher)
{
$this->connection = $connection;
$this->messagePublisher = $messagePublisher;
}

/**
* @param int $interval
*/
public function run($interval)
{
$fileDir = $_SERVER['PWD'] .'/tmp/watchdog';

while (true) {
if (!$this->connection->isConnected()) {
return;
$filename = $_SERVER['PWD'] .'/tmp/watchdog/edit.watchdog';
if (!file_exists($filename)) {
if (!is_dir($fileDir)) {
mkdir($fileDir, 0775, true);
}

file_put_contents($filename, 'watchdog_edit_ping');

$payload = [
'message_type' => MessagePublisher::MAIN_QUEUE_TYPE,
'payload' => [
'type' => MessagePublisher::EDIT_RECORD_TYPE, // used to specify the final Q to publish message
'dataType' => 'watchdog',
'data' => 'watchdog_edit_ping'
]
];

$this->messagePublisher->publishMessage($payload, MessagePublisher::MAIN_QUEUE_TYPE);
} else {
$this->messagePublisher->pushLog("Edit record worker do not consume message! check if consumer is running ,busy , or to be restart!", "warning");

@unlink($_SERVER['PWD'] . '/tmp/watchdog/edit.watchdog'); // unlink, so to be able to re-check after
}

sleep((int) $interval / 2);

$this->connection->checkHeartBeat();
// each 5 minutes
sleep(300);
}
}
}
7 changes: 7 additions & 0 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public function __construct(WorkerRunningJobRepository $repoWorker, EventDispatc

public function process(array $payload)
{
if ($payload['dataType'] == 'watchdog') {
@unlink($_SERVER['PWD'] . '/tmp/watchdog/edit.watchdog');
$this->messagePublisher->pushLog("watchdog edit message processed!", 'info');

return 0;
}

try {
$databox = $this->findDataboxById($payload['databoxId']);
} catch(\Exception $e) {
Expand Down
4 changes: 3 additions & 1 deletion lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function process(array $payload)

return $singleMessage;
}, $payload['data']);
} else {
} elseif ($payload['dataType'] == RecordEditInWorkerEvent::JSON_TYPE) {
$data = json_decode($payload['data'], true);

$payloadData = array_map(function($singleMessage) use ($payload, $data) {
Expand All @@ -81,6 +81,8 @@ public function process(array $payload)

return $singleMessage;
}, $data['records']);
} elseif ($payload['dataType'] == 'watchdog') {
$payloadData[0] = $payload;
}

$childMessageCount = count($payloadData);
Expand Down

0 comments on commit 202958a

Please sign in to comment.