This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[ASTS] Telemetry part 2: Add placeholders to logs (#7381)
mdaudali authored Oct 30, 2024
1 parent 4f05be4 commit 4314423
Showing 3 changed files with 29 additions and 22 deletions.
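Every change below follows the same pattern: each log message gains one {} placeholder per SafeArg already passed to the call, so the flat, human-readable log line shows the values inline while structured sinks still receive the named key/value pairs. A minimal before/after sketch of that pattern, assuming an slf4j-backed logger and com.palantir.logsafe.SafeArg (which formats as its underlying value when substituted); the class and variable names are illustrative, not taken from the changed files:

    import com.palantir.logsafe.SafeArg;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class PlaceholderLoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(PlaceholderLoggingSketch.class);

        void warnAlreadyStarted(String taskName) {
            // Before: the argument is attached to the log event, but the rendered message never shows it.
            log.warn("Attempted to start an already started task", SafeArg.of("task", taskName));

            // After: one {} per SafeArg, so slf4j substitutes the argument's string form into the message
            // (e.g. "Attempted to start an already started task: my-sweep-task") and the named SafeArg
            // remains available to structured appenders.
            log.warn("Attempted to start an already started task: {}", SafeArg.of("task", taskName));
        }
    }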
Changed file 1 of 3:

@@ -81,18 +81,18 @@ public void start() {
if (isStarted.compareAndSet(false, true)) {
scheduleNextIteration(automaticSweepRefreshDelay.get());
} else {
log.warn("Attempted to start an already started task", SafeArg.of("task", safeLoggableTaskName));
log.warn("Attempted to start an already started task: {}", SafeArg.of("task", safeLoggableTaskName));
}
}

private void runOneIteration() {
Duration delay = automaticSweepRefreshDelay.get();
try {
log.info("Running task", SafeArg.of("task", safeLoggableTaskName));
log.info("Running task: {}", SafeArg.of("task", safeLoggableTaskName));
task.run();
} catch (Exception e) {
log.warn(
"Failed to run task. Will retry in the next interval",
"Failed to run task {}. Will retry after delay {}",
SafeArg.of("task", safeLoggableTaskName),
SafeArg.of("delay", delay),
e);
@@ -101,7 +101,10 @@ private void runOneIteration() {
}

private void scheduleNextIteration(Duration delay) {
log.info("Scheduling next iteration", SafeArg.of("task", safeLoggableTaskName), SafeArg.of("delay", delay));
log.info(
"Scheduling next iteration for {} with delay {}",
SafeArg.of("task", safeLoggableTaskName),
SafeArg.of("delay", delay));
scheduledExecutorService.schedule(this::runOneIteration, delay.toMillis(), TimeUnit.MILLISECONDS);
}
}
Changed file 2 of 3:

@@ -150,7 +150,8 @@ public void updateStartingBucketForShardAndStrategy(Bucket newStartingBucket) {
if (currentStartingBucket.isEmpty()) {
keyValueService.checkAndSet(CheckAndSetRequest.newCell(TABLE_REF, cell, serializedBucketProgress));
if (log.isDebugEnabled()) {
log.debug("Persisted new starting bucket", SafeArg.of("newStartingBucket", newStartingBucket));
log.debug(
"Persisted new starting bucket {}", SafeArg.of("newStartingBucket", newStartingBucket));
}
} else {
if (newStartingBucket.bucketIdentifier() > currentStartingBucket.get()) {
@@ -161,14 +162,15 @@ public void updateStartingBucketForShardAndStrategy(Bucket newStartingBucket) {
bucketIdentifierPersister.trySerialize(newStartingBucket.bucketIdentifier())));
if (log.isDebugEnabled()) {
log.debug(
"Updated sweep bucket progress",
"Updated sweep bucket progress from {} to {}",
SafeArg.of("previousStartingBucket", currentStartingBucket),
SafeArg.of("newStartingBucket", newStartingBucket));
}
} else {
log.info(
"Attempted to update starting bucket, but the existing starting bucket in the database"
+ " was already ahead of us (possible timelock lost lock?)",
"Attempted to update starting bucket from {} to {}, but the existing starting"
+ " bucket in the database was already ahead of us"
+ " (possible timelock lost lock?)",
SafeArg.of("previousStartingBucket", currentStartingBucket),
SafeArg.of("newStartingBucket", newStartingBucket));
}
@@ -177,15 +179,16 @@ public void updateStartingBucketForShardAndStrategy(Bucket newStartingBucket) {
} catch (RuntimeException e) {
if (attempt == CAS_ATTEMPT_LIMIT) {
log.warn(
"Repeatedly failed to update starting bucket as part of sweep; throwing, some work may"
+ " need to be re-done.",
"Repeatedly failed to update starting bucket to {} as part of sweep; throwing"
+ " after {} attempts, some work may need to be re-done",
SafeArg.of("attemptedNewStartingBucket", newStartingBucket),
SafeArg.of("numAttempts", CAS_ATTEMPT_LIMIT),
e);
throw e;
} else {
log.info(
"Failed to read or update starting bucket as part of sweep. Retrying",
"Failed to read or update starting bucket to {} as part of sweep after attempt {}."
+ " Retrying",
SafeArg.of("newStartingBucket", newStartingBucket),
SafeArg.of("attemptNumber", attempt),
e);
Changed file 3 of 3:

@@ -72,7 +72,7 @@ public void updateBucketProgressToAtLeast(Bucket bucket, BucketProgress minimum)
CheckAndSetRequest.newCell(TABLE_REF, bucketCell, serializedBucketProgress));
if (log.isDebugEnabled()) {
log.debug(
"Persisted new sweep bucket progress",
"Persisted new sweep bucket {} progress to {}",
SafeArg.of("bucket", bucket),
SafeArg.of("minimumProgress", minimum));
}
@@ -88,34 +88,35 @@ public void updateBucketProgressToAtLeast(Bucket bucket, BucketProgress minimum)
bucketProgressPersister.serializeProgress(minimum)));
if (log.isDebugEnabled()) {
log.debug(
"Updated sweep bucket progress",
"Updated sweep bucket {} progress from {} to {}",
SafeArg.of("bucket", bucket),
SafeArg.of("minimumProgress", minimum),
SafeArg.of("previousPersistedProgress", extantCurrentProgress));
SafeArg.of("previousPersistedProgress", extantCurrentProgress),
SafeArg.of("minimumProgress", minimum));
}
} else {
log.info(
"Attempted to update sweep bucket progress, but the existing progress in the database"
+ " was already ahead of us (possible timelock lost lock?)",
"Attempted to update sweep bucket {} progress, but the existing progress {}"
+ " in the database was already ahead of us {} (possible timelock lost lock?)",
SafeArg.of("bucket", bucket),
SafeArg.of("minimumProgress", minimum),
SafeArg.of("persistedProgress", extantCurrentProgress));
SafeArg.of("persistedProgress", extantCurrentProgress),
SafeArg.of("minimumProgress", minimum));
}
}
return;
} catch (RuntimeException e) {
if (attempt == CAS_ATTEMPT_LIMIT - 1) {
log.warn(
"Repeatedly failed to update bucket progress as part of sweep; throwing, some work may"
+ " need to be re-done.",
"Repeatedly failed to update bucket {} progress to {} as part of sweep; throwing"
+ " after {} attempts, some work may need to be re-done",
SafeArg.of("bucket", bucket),
SafeArg.of("minimumProgress", minimum),
SafeArg.of("numAttempts", CAS_ATTEMPT_LIMIT),
e);
throw e;
} else {
log.info(
"Failed to read or update bucket progress as part of sweep. Retrying",
"Failed to read or update bucket {} progress {} as part of sweep after attempt {}."
+ " Retrying",
SafeArg.of("bucket", bucket),
SafeArg.of("minimumProgress", minimum),
SafeArg.of("attemptNumber", attempt + 1),
