diff --git a/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala b/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala index 89f58fb..4cb8a1f 100644 --- a/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala +++ b/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala @@ -1533,7 +1533,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid Subscriptions.ConnectionClosedCallback { connectionClosedData => if (!connectionClosedData.cancelledByClient) { logger.info(s"Server disconnected Nakadi stream, reconnecting in ${kanadiHttpConfig.serverDisconnectRetryDelay - .toString()}. Old StreamId: ${connectionClosedData.oldStreamId.id}, SubscriptionId: ${subscriptionId.id.toString}") + .toString()}. Old StreamId: ${connectionClosedData.oldStreamId.id}, SubscriptionId: ${subscriptionId.id.toString}") reconnect(subscriptionId, eventCallback, @@ -1555,7 +1555,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid streamId }.recoverWith { case _: Subscriptions.Errors.NoEmptySlotsOrCursorReset => logger.info(s"No empty slots/cursor reset, reconnecting in ${kanadiHttpConfig.noEmptySlotsCursorResetRetryDelay - .toString()}, SubscriptionId: ${subscriptionId.id.toString}") + .toString()}, SubscriptionId: ${subscriptionId.id.toString}") reconnect(subscriptionId, eventCallback, @@ -1586,7 +1586,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid nakadiSource }.recoverWith { case _: Subscriptions.Errors.NoEmptySlotsOrCursorReset => logger.info(s"No empty slots/cursor reset, reconnecting in ${kanadiHttpConfig.noEmptySlotsCursorResetRetryDelay - .toString()}, SubscriptionId: ${subscriptionId.id.toString}") + .toString()}, SubscriptionId: ${subscriptionId.id.toString}") org.apache.pekko.pattern.after(kanadiHttpConfig.noEmptySlotsCursorResetRetryDelay, http.system.scheduler)( eventsStreamedSourceManaged[T]( subscriptionId, diff --git a/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala b/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala index c077783..100dd66 100644 --- a/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala +++ b/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala @@ -178,7 +178,7 @@ class BadJsonDecodingSpec val future = for { closed <- closedFuture waitForClose <- waitForCloseFuture - } yield (closed | waitForClose) // either connection has been closed earlier or from our client side + } yield closed | waitForClose // either connection has been closed earlier or from our client side future.map(result => result mustEqual true) }