From 9414c99bdacf12df4db1649b523f4421c9e04637 Mon Sep 17 00:00:00 2001 From: Denire Date: Thu, 9 Dec 2021 02:40:42 +0300 Subject: [PATCH] polling: compatibility changes --- .../jaicf/channel/jaicp/JaicpConnector.kt | 11 ++++++++++- .../channel/jaicp/JaicpPollingConnector.kt | 19 +++++++++++++++++++ .../channel/jaicp/JaicpWebhookConnector.kt | 19 +++++++++++++++++++ .../channel/jaicp/polling/RequestPoller.kt | 9 +++++---- 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpConnector.kt b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpConnector.kt index c88becec..6929ee66 100644 --- a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpConnector.kt +++ b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpConnector.kt @@ -33,9 +33,18 @@ abstract class JaicpConnector( val accessToken: String, val url: String, httpClient: HttpClient, - executor: Executor + executor: Executor, ) : WithLogger { + constructor( + botApi: BotApi, + channels: List, + accessToken: String, + url: String, + httpClient: HttpClient, + executorThreadPoolSize: Int, + ) : this(botApi, channels, accessToken, url, httpClient, Executors.newFixedThreadPool(executorThreadPoolSize)) + val jaicpExecutor = JaicpRequestExecutor(executor) private val chatAdapterConnector = ChatAdapterConnector(accessToken, url, httpClient) private var registeredChannels = fetchChannels() diff --git a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpPollingConnector.kt b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpPollingConnector.kt index 6977cec7..c44a9db1 100644 --- a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpPollingConnector.kt +++ b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpPollingConnector.kt @@ -9,6 +9,7 @@ import com.justai.jaicf.helpers.logging.WithLogger import io.ktor.client.* import io.ktor.client.features.logging.* import java.util.concurrent.Executor +import java.util.concurrent.Executors /** * This class is used to create polling coroutines for each channel, polls requests and sends responses. @@ -38,6 +39,24 @@ open class JaicpPollingConnector( ) : JaicpConnector(botApi, channels, accessToken, url, httpClient, executor), WithLogger { + constructor( + botApi: BotApi, + accessToken: String, + url: String = DEFAULT_PROXY_URL, + channels: List, + logLevel: LogLevel = LogLevel.INFO, + httpClient: HttpClient = HttpClientFactory.create(logLevel), + executorThreadPoolSize: Int + ) : this( + botApi, + accessToken, + url, + channels, + logLevel, + httpClient, + Executors.newFixedThreadPool(executorThreadPoolSize) + ) + private val dispatcher = Dispatcher(httpClient, jaicpExecutor) protected val channelMap = mutableMapOf() diff --git a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpWebhookConnector.kt b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpWebhookConnector.kt index 23761b83..d7fbeb96 100644 --- a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpWebhookConnector.kt +++ b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/JaicpWebhookConnector.kt @@ -19,6 +19,7 @@ import com.justai.jaicf.helpers.logging.WithLogger import io.ktor.client.* import io.ktor.client.features.logging.* import java.util.concurrent.Executor +import java.util.concurrent.Executors /** @@ -65,6 +66,24 @@ open class JaicpWebhookConnector( HttpBotChannel, JaicpConnector(botApi, channels, accessToken, url, httpClient, executor) { + constructor( + botApi: BotApi, + accessToken: String, + url: String = DEFAULT_PROXY_URL, + channels: List, + logLevel: LogLevel = LogLevel.INFO, + httpClient: HttpClient = HttpClientFactory.create(logLevel), + executorThreadPoolSize: Int + ) : this( + botApi, + accessToken, + url, + channels, + logLevel, + httpClient, + Executors.newFixedThreadPool(executorThreadPoolSize) + ) + protected val channelMap = mutableMapOf() init { diff --git a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/polling/RequestPoller.kt b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/polling/RequestPoller.kt index f6bb3b54..97dd44cc 100644 --- a/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/polling/RequestPoller.kt +++ b/channels/jaicp/src/main/kotlin/com/justai/jaicf/channel/jaicp/polling/RequestPoller.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.jsonPrimitive import kotlinx.serialization.json.long +import java.util.concurrent.atomic.AtomicBoolean import kotlin.coroutines.CoroutineContext internal class RequestPoller( @@ -27,10 +28,10 @@ internal class RequestPoller( } private var unprocessed: Boolean = false - private var isActive: Boolean = false + private val isActive: AtomicBoolean = AtomicBoolean(false) suspend fun getUpdates(): Flow> = flow { - while (isActive) { + while (isActive.get()) { try { emit(doPoll()) } catch (ex: Exception) { @@ -48,11 +49,11 @@ internal class RequestPoller( } fun stopPolling() { - isActive = false + isActive.set(false) } fun startPolling() { - isActive = true + isActive.set(true) } private fun updateSince(requests: List) {