diff --git a/Commands/Monitor.php b/Commands/Monitor.php index adbd22c..25b633a 100644 --- a/Commands/Monitor.php +++ b/Commands/Monitor.php @@ -19,8 +19,9 @@ class Monitor extends ConsoleCommand protected function configure() { $this->setName('queuedtracking:monitor'); - $this->setDescription('Shows and updates the current state of the queue every 2 seconds.'); + $this->setDescription("Shows and updates the current state of the queue every 2 seconds.\n Key ,=first page, .=last page, 0-9=move to page section, arrow LEFT=prev page, RIGHT=next page, UP=next 10 pages, DOWN=prev 10 pages, q=quit"); $this->addRequiredValueOption('iterations', null, 'If set, will limit the number of monitoring iterations done.'); + $this->addRequiredValueOption('perpage', 'p', 'Number of queue worker displayed per page.', 16); } /** @@ -36,6 +37,9 @@ protected function doExecute(): int $systemCheck->checkRedisIsInstalled(); } + $output->write(str_repeat("\r\n", 100)); + $output->write("\e[".(100)."A"); + $iterations = $this->getIterationsFromArg(); if ($iterations !== null) { $output->writeln("Only running " . $iterations . " iterations."); @@ -58,34 +62,118 @@ protected function doExecute(): int $output->writeln('The command ./console queuedtracking:process has to be executed to process request sets within queue'); } - $output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues())); - $output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue', + $output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues())); + $output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue', $manager->getNumberOfRequestsToProcessAtSameTime())); $iterationCount = 0; + + $qCurrentPage = 1; + $qCount = count($queues); + $qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount); + $qPageCount = ceil($qCount / $qPerPAge); + + readline_callback_handler_install('', function() {}); + stream_set_blocking (STDIN, false); + + $output->writeln(str_repeat("-", 30)); + $output->writeln("".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20).""); + $output->writeln(str_repeat("-", 30)); + + $lastStatsTimer = microtime(true) - 2; + $lastSumInQueue = false; + $diffSumInQueue = 0; + $keyPressed = ""; + + $output->write(str_repeat("\r\n", $qPerPAge + 5)); while (1) { - $memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend + if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "") + { + $output->write("\e[".($qPerPAge + 5)."A"); + + $qCurrentPage = min(max($qCurrentPage, 1), $qPageCount); + $memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend + + $sumInQueue = 0; + foreach ($queues as $sumQ) { + $sumInQueue += $sumQ->getNumberOfRequestSetsInQueue(); + } + + if ($lastSumInQueue !== false) { + $diffSumInQueue = $lastSumInQueue - $sumInQueue; + $diffRps = round($diffSumInQueue / (microtime(true) - $lastStatsTimer), 2); + $diffSumInQueue = $diffSumInQueue < 0 ? "".abs($diffRps)."" : "{$diffRps}"; + } + + $numInQueue = 0; + for ($idxPage = 0; $idxPage < $qPerPAge; $idxPage++) { + $idx = ($qCurrentPage - 1) * $qPerPAge + $idxPage; + if (isset($queues[$idx])) { + $q = $queues[$idx]->getNumberOfRequestSetsInQueue(); + $numInQueue += (int)$q; + $output->writeln(str_pad($idx, 10, " ", STR_PAD_LEFT)." | ".str_pad(number_format($q), 16, " ", STR_PAD_LEFT)); + } else { + $output->writeln(str_pad("", 10)." | ".str_pad("", 16)); + } + } + + $output->writeln(str_repeat("-", 30)); + $output->writeln("".str_pad(" ".($qCount)." Q", 10)." | ".str_pad(number_format($sumInQueue)." R", 16).""); + $output->writeln(str_repeat("-", 30)); + $output->writeln(sprintf( + "Q [%s-%s] | page %s/%s | press (0-9.,q) or arrow(L,R,U,D) | diff/sec %s \n". + "%s used memory (%s peak). %d workers active.".str_repeat(" ", 15), + ($idx - $qPerPAge + 1), + $idx, $qCurrentPage, $qPageCount, $diffSumInQueue, + $memory['used_memory_human'] ?? 'Unknown', + $memory['used_memory_peak_human'] ?? 'Unknown', + $lock->getNumberOfAcquiredLocks() + )); + + if (!is_null($iterations)) { + $iterationCount += 1; + if ($iterationCount >= $iterations) { + break; + } + } - $numInQueue = array(); - foreach ($queues as $queue) { - $numInQueue[] = $queue->getNumberOfRequestSetsInQueue(); + $lastSumInQueue = $sumInQueue; + $lastStatsTimer = microtime(true); } - $message = sprintf('%s (%s) request sets left in queue. %s used memory (%s peak). %d workers active. ', - array_sum($numInQueue), - implode('+', $numInQueue), - $memory['used_memory_human'] ?? 'Unknown', - $memory['used_memory_peak_human'] ?? 'unknown', - $lock->getNumberOfAcquiredLocks()); - $output->write("\x0D"); - $output->write($message); - if (!is_null($iterations)) { - $iterationCount += 1; - if ($iterationCount >= $iterations) { - break; + $keyStroke = stream_get_contents(STDIN, 3); + $keyPressed = strlen($keyStroke) == 3 ? $keyStroke[2] : (strlen($keyStroke) > 0 ? $keyStroke[0] : ""); + if ($keyPressed != "" and in_array($keyPressed, array(".", ",", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "q"))) { + switch ($keyPressed) { + case "0": case "1": case "2": case "3": case "4": + case "5": case "6": case "7": case "8": case "9": + $keyPressed = $keyPressed != "0" ? $keyPressed : "10"; + $qCurrentPage = floor(($qCurrentPage - 0.1) / 10) * 10 + (int)$keyPressed; break; + case "C": + $qCurrentPage++; + break; + case "D": + $qCurrentPage--; + break; + case "A": + $qCurrentPage += 10; + break; + case "B": + $qCurrentPage -= 10; + break; + case ",": + $qCurrentPage = 1; + break; + case ".": + $qCurrentPage = $qPageCount; + break; + case "q": + $output->writeln(''); + die; } } - sleep(2); + + usleep(5000); } return self::SUCCESS; @@ -112,4 +200,22 @@ private function getIterationsFromArg() return $iterations; } + /** + * Loads the `perpage` argument from the commands arguments. + * + * @return int|null + */ + private function getPerPageFromArg() + { + $perPage = $this->getInput()->getOption('perpage'); + if (!is_numeric($perPage)) { + throw new \Exception('perpage needs to be numeric'); + } else { + $perPage = (int)$perPage; + if ($perPage <= 0) { + throw new \Exception('perpage needs to be a non-zero positive number'); + } + } + return $perPage; + } } diff --git a/Commands/Process.php b/Commands/Process.php index 27e5c4d..a913561 100644 --- a/Commands/Process.php +++ b/Commands/Process.php @@ -26,6 +26,9 @@ protected function configure() $this->setName('queuedtracking:process'); $this->addRequiredValueOption('queue-id', null, 'If set, will only work on that specific queue. For example "0" or "1" (if there are multiple queues). Not recommended when only one worker is in use. If for example 4 workers are in use, you may want to use 0, 1, 2, or 3.'); $this->addRequiredValueOption('force-num-requests-process-at-once', null, 'If defined, it overwrites the setting of how many requests will be picked out of the queue and processed at once. Must be a number which is >= 1. By default, the configured value from the settings will be used. This can be useful for example if you want to process every single request within the queue. If otherwise a batch size of say 100 is configured, then there may be otherwise 99 requests left in the queue. It can be also useful for testing purposes.'); + $this->addRequiredValueOption('cycle', 'c', 'The proccess will automatically loop for "n" cycle time(s), set "0" to infinite.', 1); + $this->addRequiredValueOption('sleep', 's', 'Take a nap for "n" second(s) before recycle, minimum is 1 second.', 1); + $this->addRequiredValueOption('delay', 'd', 'Delay before finished', 0); $this->setDescription('Processes all queued tracking requests in case there are enough requests in the queue and in case they are not already in process by another script. To keep track of the queue use the --verbose option or execute the queuedtracking:monitor command.'); } @@ -76,29 +79,84 @@ protected function doExecute(): int throw new \Exception('Number of requests to process must be a number and at least 1'); } - $output->writeln("Starting to process request sets, this can take a while"); - register_shutdown_function(function () use ($queueManager) { $queueManager->unlock(); }); - $startTime = microtime(true); - $processor = new Processor($queueManager); - $processor->setNumberOfMaxBatchesToProcess(500); - $tracker = $processor->process(); - $neededTime = (microtime(true) - $startTime); - $numRequestsTracked = $tracker->getCountOfLoggedRequests(); - $requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime); - Piwik::postEvent('Tracker.end'); + $numberOfProcessCycle = $input->getOption('cycle'); + if (!is_numeric($numberOfProcessCycle)) { + throw new \Exception('"cycle" needs to be numeric'); + } + $numberOfProcessCycle = (int)$numberOfProcessCycle; + $infiniteCycle = $numberOfProcessCycle == 0; + + $delayedBeforeFinish = (int)$input->getOption('delay'); + + $napster = max(1, $input->getOption('sleep')); + if (!is_numeric($napster)) { + throw new \Exception('"nap" needs to be numeric'); + } + $napster = (int)$napster; + + $lastTimeGotMoreThanZeroTrackedReq = microtime(true); + $originalNumberOfRequestsToProcessAtSameTime = $queueManager->getNumberOfRequestsToProcessAtSameTime(); + + while ($numberOfProcessCycle > 0 || $infiniteCycle) { + $wipingOutQueue = false; + if (microtime(true) - $lastTimeGotMoreThanZeroTrackedReq > 10) { + $queueManager->setNumberOfRequestsToProcessAtSameTime(1); + $wipingOutQueue = true; + $lastTimeGotMoreThanZeroTrackedReq = microtime(true); + } + + if ($wipingOutQueue) { + $output->writeln(" TRYING TO WIPE OUT THE QUEUE "); + } + $output->writeln("Starting to process request sets, this can take a while"); + + $startTime = microtime(true); + $processor = new Processor($queueManager); + $processor->setNumberOfMaxBatchesToProcess(500); + $tracker = $processor->process(); + + $neededTime = (microtime(true) - $startTime); + $numRequestsTracked = $tracker->getCountOfLoggedRequests(); + $requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime); + + $this->writeSuccessMessage( + array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime)) + ); + Piwik::postEvent('Tracker.end'); + + if ($numRequestsTracked > 0) { + $lastTimeGotMoreThanZeroTrackedReq = microtime(true); + } + + if (!$infiniteCycle) { + $numberOfProcessCycle--; + } + if ($numberOfProcessCycle > 0 || $infiniteCycle) { + $cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle; + $output->writeln("==========================================================================="); + $output->writeln("Taking a nap for {$napster} second(s), before re-running the process. ({$cTogo}) cyle(s) to go."); + $output->writeln("==========================================================================="); + sleep($napster); + } + + if ($wipingOutQueue) { + $queueManager->setNumberOfRequestsToProcessAtSameTime($originalNumberOfRequestsToProcessAtSameTime); + } + } + // Piwik::postEvent('Tracker.end'); $trackerEnvironment->destroy(); - $this->writeSuccessMessage( - array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime)) - ); - + if ($delayedBeforeFinish > 0) { + sleep($delayedBeforeFinish); + } + return self::SUCCESS; } diff --git a/docs/faq.md b/docs/faq.md index 2b657cf..b624c55 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -32,6 +32,7 @@ requests using the [Piwik console](http://developer.piwik.org/guides/piwik-on-th * Disable the setting "Process during tracking request" in the Piwik UI under "Settings => Plugin Settings" * Setup a cronjob that executes the command `./console queuedtracking:process` for instance every minute * That's it +* Or, if you have __"non WINDOWS OS"__ you can use the [Supervisor](http://supervisord.org/) as a cron alternative. The `queuedtracking:process` command will make sure to process all queued tracking requests whenever possible and the command will exit as soon as there are not enough requests queued anymore. That's why you should setup a cronjob to start @@ -43,6 +44,28 @@ Example crontab entry that starts the processor every minute: `* * * * * cd /piwik && ./console queuedtracking:process >/dev/null 2>&1` +Example Supervisor entry that will start 16 processors/workers with 10 loop cycle times and auto restart: + +```ini +[program:matomo] +directory=/path/to/your/matomo +command=/path/to/your/php /path/to/your/matomo/console queuedtracking:process --queue-id=%(process_num)s -c 10 -s 2 -d 5 +process_name=queuedtracking-%(process_num)s + +#change the number according to how many worker(s) you have +numprocs=16 + +numprocs_start=0 +stopsignal=TERM +autostart=true +autorestart=true +stopwaitsecs=120 +#priority=1000 +stdout_logfile=/dev/null +stdout_logfile_maxbytes=0 +redirect_stderr=true +``` + __Can I keep track of the state of the queue?__ Yes, you can. Just execute the command `./console queuedtracking:monitor`. This will show the current state of the queue. To exit this command you can for example press `CTRL + C` key at the same time.