Skip to content

Commit

Permalink
[beam-core] Add more logging when creating observers
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Mar 3, 2025
1 parent 0f2d91b commit 05ac176
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ private void startObserve(@Nullable String name, Partition partition, OffsetRang
handle = observeBulkPartitions(name, restriction, reader, observer);
}
runningObserves.put(partition.getId(), handle);
log.info("Started observer for partition {}", partition);
}

abstract ObserveHandle observeBulkOffsets(
Expand All @@ -483,7 +484,10 @@ public void setup() {
.<Integer, ObserveHandle>removalListener(
notification -> {
if (notification.wasEvicted()) {
log.info("Closing observer {} due to expiry", notification.getKey());
log.info(
"Closing observer {} due to expiry {}",
notification.getKey(),
notification.getCause());
Optional.ofNullable(observers.remove(notification.getKey()))
.ifPresent(o -> o.stop(true));
notification.getValue().close();
Expand Down

0 comments on commit 05ac176

Please sign in to comment.