Skip to content

Commit

Permalink
SinkClient fix logging on unsubscribe (#144)
Browse files Browse the repository at this point in the history
* Add logging to SinkClient to track uncleaned connections

* Only count resubmission from worker failures

* SinkClient fix logging on unsubscribe

Co-authored-by: Calvin Cheung <[email protected]>
  • Loading branch information
calvin681 and calvin681 authored Mar 13, 2022
1 parent de88e88 commit 9f67db4
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ public Observable<T> call(EndpointChange endpointChange) {
})
.doOnUnsubscribe(() -> {
try {
logger.warn("Closing connections to sink of job " + jobId);
logger.warn("Closing connections to sink of job {}", jobId);
closeAllConnections();
} catch (Exception e) {
Observable.error(e);
logger.warn("Error closing all connections to sink of job {}", jobId, e);
}
})
.share()
Expand Down Expand Up @@ -212,7 +212,7 @@ public void call(Boolean flag) {
}
}
return ((SinkConnection<T>) sinkConnection).call()
//.flatMap(o -> o)
.takeWhile(e -> !nowClosed.get())
;
}

Expand Down

0 comments on commit 9f67db4

Please sign in to comment.