Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix AWS Limits Race-Condition #113

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 72 additions & 144 deletions src/Handler/CloudWatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,133 +13,79 @@ class CloudWatch extends AbstractProcessingHandler
/**
* Requests per second limit (https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html)
*/
const RPS_LIMIT = 5;
public const RPS_LIMIT = 5;

/**
* Event size limit (https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html)
*
* @var int
*/
const EVENT_SIZE_LIMIT = 262118; // 262144 - reserved 26
public const EVENT_SIZE_LIMIT = 262118; // 262144 - reserved 26

/**
* The batch of log events in a single PutLogEvents request cannot span more than 24 hours.
*
* @var int
*/
const TIMESPAN_LIMIT = 86400000;
public const TIMESPAN_LIMIT = 86400000;

/**
* @var CloudWatchLogsClient
*/
private $client;
private CloudWatchLogsClient $client;

/**
* @var string
*/
private $group;
private string $group;

/**
* @var string
*/
private $stream;

/**
* @var integer
*/
private $retention;

/**
* @var bool
*/
private $initialized = false;
private string $stream;

/**
* @var string
*/
private $sequenceToken;
private int $retention;

/**
* @var int
*/
private $batchSize;
private int $batchSize;

/**
* @var array
*/
private $buffer = [];
private array $buffer = [];

/**
* @var array
*/
private $tags = [];
private array $tags = [];

/**
* @var bool
*/
private $createGroup;
private bool $createGroup;

/**
* Data amount limit (http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)
*
* @var int
*/
private $dataAmountLimit = 1048576;
private int $dataAmountLimit = 1048576;

/**
* @var int
*/
private $currentDataAmount = 0;
private int $currentDataAmount = 0;

/**
* @var int
*/
private $remainingRequests = self::RPS_LIMIT;
private int $remainingRequests = self::RPS_LIMIT;

/**
* @var \DateTime
*/
private $savedTime;
private \DateTime $savedTime;

/**
* @var int|null
*/
private $earliestTimestamp = null;
private ?int $earliestTimestamp = null;

private bool $initialized = false;
private bool $initializing = false;

/**
* CloudWatchLogs constructor.
* @param CloudWatchLogsClient $client
*
* Log group names must be unique within a region for an AWS account.
* Log group names can be between 1 and 512 characters long.
* Log group names consist of the following characters: a-z, A-Z, 0-9, '_' (underscore), '-' (hyphen),
* '/' (forward slash), and '.' (period).
*
* @param string $group
*
* Log stream names must be unique within the log group.
* Log stream names can be between 1 and 512 characters long.
* The ':' (colon) and '*' (asterisk) characters are not allowed.
* @param string $stream
*
* @param int $retention
* @param int $batchSize
* @param array $tags
* @param int $level
* @param bool $bubble
* @param bool $createGroup
*
* @throws \Exception
*/
public function __construct(
CloudWatchLogsClient $client,
$group,
$stream,
$retention = 14,
$batchSize = 10000,
string $group,
string $stream,
int $retention = 14,
int $batchSize = 10000,
array $tags = [],
$level = Logger::DEBUG,
$bubble = true,
$createGroup = true
bool $bubble = true,
bool $createGroup = true
) {
if ($batchSize > 10000) {
throw new \InvalidArgumentException('Batch size can not be greater than 10000');
Expand All @@ -152,20 +98,42 @@ public function __construct(
$this->batchSize = $batchSize;
$this->tags = $tags;
$this->createGroup = $createGroup;
$this->savedTime = new \DateTime();

parent::__construct($level, $bubble);
}

private function initialize(): void
{
$this->initializing = true;

$this->savedTime = new \DateTime;
if ($this->createGroup) {
$this->initializeGroup();
}
$this->initializeStream();

$this->initializing = false;
$this->initialized = true;
}

/**
* {@inheritdoc}
*/
protected function write(array $record): void
{
// initialize exactly once (messages during initialization are sent to buffer
if (!$this->initialized && !$this->initializing) {
$this->initialize();
}

$records = $this->formatRecords($record);

foreach ($records as $record) {
if ($this->initializing) {
$this->addToBuffer($record);
continue;
}

if ($this->willMessageSizeExceedLimit($record) || $this->willMessageTimestampExceedLimit($record)) {
$this->flushBuffer();
}
Expand Down Expand Up @@ -197,15 +165,11 @@ private function addToBuffer(array $record): void
private function flushBuffer(): void
{
if (!empty($this->buffer)) {
if (false === $this->initialized) {
$this->initialize();
}

// send items, retry once with a fresh sequence token
try {
$this->send($this->buffer);
} catch (\Aws\CloudWatchLogs\Exception\CloudWatchLogsException $e) {
$this->refreshSequenceToken();
$this->addToBuffer(['level' => 'EXCEPTION', 'message' => $e->getMessage()]);
$this->send($this->buffer);
}

Expand Down Expand Up @@ -239,22 +203,16 @@ private function checkThrottle(): void
}

/**
* http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
*
* @param array $record
* @return int
* @see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
*/
private function getMessageSize($record): int
private function getMessageSize(array $record): int
{
return strlen($record['message']) + 26;
}

/**
* Determine whether the specified record's message size in addition to the
* size of the current queued messages will exceed AWS CloudWatch's limit.
*
* @param array $record
* @return bool
*/
protected function willMessageSizeExceedLimit(array $record): bool
{
Expand All @@ -264,9 +222,6 @@ protected function willMessageSizeExceedLimit(array $record): bool
/**
* Determine whether the specified record's timestamp exceeds the 24 hour timespan limit
* for all batched messages written in a single call to PutLogEvents.
*
* @param array $record
* @return bool
*/
protected function willMessageTimestampExceedLimit(array $record): bool
{
Expand All @@ -276,9 +231,6 @@ protected function willMessageTimestampExceedLimit(array $record): bool
/**
* Event size in the batch can not be bigger than 256 KB
* https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
*
* @param array $entry
* @return array
*/
private function formatRecords(array $entry): array
{
Expand Down Expand Up @@ -307,49 +259,43 @@ private function formatRecords(array $entry): array
* - The maximum number of log events in a batch is 10,000.
* - A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails.
*
* @param array $entries
*
* @throws \Aws\CloudWatchLogs\Exception\CloudWatchLogsException Thrown by putLogEvents for example in case of an
* invalid sequence token
*/
private function send(array $entries): void
{
// AWS expects to receive entries in chronological order...
usort($entries, static function (array $a, array $b) {
if ($a['timestamp'] < $b['timestamp']) {
return -1;
} elseif ($a['timestamp'] > $b['timestamp']) {
return 1;
}
usort(
$entries,
static function (array $a, array $b) {
if ($a['timestamp'] < $b['timestamp']) {
return -1;
} elseif ($a['timestamp'] > $b['timestamp']) {
return 1;
}

return 0;
});
return 0;
}
);

$data = [
'logGroupName' => $this->group,
'logStreamName' => $this->stream,
'logEvents' => $entries
];

if (!empty($this->sequenceToken)) {
$data['sequenceToken'] = $this->sequenceToken;
}

$this->checkThrottle();

$response = $this->client->putLogEvents($data);

$this->sequenceToken = $response->get('nextSequenceToken');
$this->client->putLogEvents($data);
}

private function initializeGroup(): void
{
// fetch existing groups
$existingGroups =
$this
->client
->describeLogGroups(['logGroupNamePrefix' => $this->group])
->get('logGroups');
->client
->describeLogGroups(['logGroupNamePrefix' => $this->group])
->get('logGroups');

// extract existing groups names
$existingGroupsNames = array_map(
Expand Down Expand Up @@ -384,37 +330,21 @@ function ($group) {
}
}

private function initialize(): void
private function initializeStream(): void
{
if ($this->createGroup) {
$this->initializeGroup();
}

$this->refreshSequenceToken();
}

private function refreshSequenceToken(): void
{
// fetch existing streams
$existingStreams =
$this
->client
->describeLogStreams(
[
->client
->describeLogStreams(
[
'logGroupName' => $this->group,
'logStreamNamePrefix' => $this->stream,
]
)->get('logStreams');
)->get('logStreams');

// extract existing streams names
$existingStreamsNames = array_map(
function ($stream) {

// set sequence token
if ($stream['logStreamName'] === $this->stream && isset($stream['uploadSequenceToken'])) {
$this->sequenceToken = $stream['uploadSequenceToken'];
}

return $stream['logStreamName'];
},
$existingStreams
Expand All @@ -431,8 +361,6 @@ function ($stream) {
]
);
}

$this->initialized = true;
}

/**
Expand Down