diff --git a/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java b/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java index a152ba8a9..0a5aed2d1 100644 --- a/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java +++ b/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java @@ -144,10 +144,10 @@ public Observable<T> call(EndpointChange endpointChange) { }) .doOnUnsubscribe(() -> { try { - logger.warn("Closing connections to sink of job " + jobId); + logger.warn("Closing connections to sink of job {}", jobId); closeAllConnections(); } catch (Exception e) { - Observable.error(e); + logger.warn("Error closing all connections to sink of job {}", jobId, e); } }) .share() @@ -212,7 +212,7 @@ public void call(Boolean flag) { } } return ((SinkConnection<T>) sinkConnection).call() - //.flatMap(o -> o) + .takeWhile(e -> !nowClosed.get()) ; }