From ba8445015601b0968f05ecb1305f561456917905 Mon Sep 17 00:00:00 2001 From: Sergey Chelombitko Date: Tue, 17 Dec 2024 17:04:41 +0000 Subject: [PATCH] Consume cache results as a flow rather than channel --- .../marathon/cache/test/CacheResult.kt | 9 +- .../marathon/cache/test/TestCacheLoader.kt | 93 +++++++++---------- .../malinskiy/marathon/execution/Scheduler.kt | 32 ++++--- 3 files changed, 63 insertions(+), 71 deletions(-) diff --git a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/CacheResult.kt b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/CacheResult.kt index b5baae0b6..d8b7e5844 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/CacheResult.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/CacheResult.kt @@ -2,17 +2,16 @@ package com.malinskiy.marathon.cache.test import com.malinskiy.marathon.device.DevicePoolId import com.malinskiy.marathon.execution.TestResult -import com.malinskiy.marathon.execution.TestShard +import com.malinskiy.marathon.test.Test sealed class CacheResult { - - class Hit( + data class Hit( val pool: DevicePoolId, val testResult: TestResult ) : CacheResult() - class Miss( + data class Miss( val pool: DevicePoolId, - val testShard: TestShard + val test: Test ) : CacheResult() } diff --git a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt index 9e277ccb5..a80520cc4 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt @@ -12,78 +12,69 @@ import com.malinskiy.marathon.log.MarathonLogging import com.malinskiy.marathon.test.Test import com.malinskiy.marathon.test.toSimpleSafeTestName import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.async +import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.ReceiveChannel -import kotlin.system.measureTimeMillis +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import kotlin.time.measureTimedValue class TestCacheLoader( private val configuration: Configuration, private val cache: TestResultsCache, private val cacheKeyFactory: TestCacheKeyFactory -) { +) : AutoCloseable { private val logger = MarathonLogging.getLogger(TestCacheLoader::class.java) - - private val _results: Channel = unboundedChannel() - val results: ReceiveChannel - get() = _results - private val testsToCheck: Channel = unboundedChannel() + private var job: Job? = null - private lateinit var cacheCheckCompleted: Deferred - - fun initialize(scope: CoroutineScope) { - cacheCheckCompleted = scope.async { - // TODO: check concurrently - for (test in testsToCheck) { - var result: CacheResult? = null - val timeMillis = measureTimeMillis { - val cacheKey = cacheKeyFactory.getCacheKey(test.poolId, test.test) - - result = cache.load(cacheKey, test.test)?.let { - Hit(test.poolId, it) - } ?: Miss(test.poolId, TestShard(listOf(test.test))) - - _results.send(result!!) - } - - val hitOrMiss = when (result!!) { - is Hit -> "hit" - is Miss -> "miss" + fun start(scope: CoroutineScope, consumer: suspend (CacheResult) -> Unit) { + job = scope.launch { + testsToCheck.consumeAsFlow() + .map { test -> + val (result, duration) = measureTimedValue { + check(test) + } + val hitOrMiss = when (result) { + is Hit -> "hit" + is Miss -> "miss" + } + logger.debug("Cache {} for {} took {}ms", hitOrMiss, test.test.toSimpleSafeTestName(), duration.inWholeMilliseconds) + result } - logger.debug("Cache {} for {} took {}ms", hitOrMiss, test.test.toSimpleSafeTestName(), timeMillis) - } + .collect(consumer) } } - suspend fun addTests(poolId: DevicePoolId, tests: TestShard) { - if (configuration.cache.isEnabled) { - val testCacheBlackList: MutableList = arrayListOf() - tests.tests.forEach { test -> - if (configuration.strictRunConfiguration.filter.matches(test)) { - testCacheBlackList.add(test) - } else { - testsToCheck.send(TestToCheck(poolId, test)) - } - } + private suspend fun check(test: TestToCheck): CacheResult { + val strictRun = configuration.strictRunConfiguration.filter.matches(test.test) + if (strictRun) { + logger.debug("Cache miss for test in blacklist: {}", test.test.toSimpleSafeTestName()) + return Miss(test.poolId, test.test) + } + + val cacheKey = cacheKeyFactory.getCacheKey(test.poolId, test.test) + return cache.load(cacheKey, test.test) + ?.let { Hit(test.poolId, it) } + ?: Miss(test.poolId, test.test) + } - if (testCacheBlackList.isNotEmpty()) { - logger.debug("Cache miss for tests in blacklist: {}", testCacheBlackList.map { it.toSimpleSafeTestName() }) - _results.send(Miss(poolId, TestShard(testCacheBlackList))) - } - } else { - _results.send(Miss(poolId, tests)) + suspend fun addTests(poolId: DevicePoolId, testShard: TestShard) { + testShard.tests.forEach { test -> + testsToCheck.send(TestToCheck(poolId, test)) } } suspend fun stop() { testsToCheck.close() - cacheCheckCompleted.await() - _results.close() + job?.join() logger.debug("Cache loader is terminated") } - private class TestToCheck(val poolId: DevicePoolId, val test: Test) + override fun close() { + testsToCheck.close() + } + + private data class TestToCheck(val poolId: DevicePoolId, val test: Test) } diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt index 9d88a3f29..e10122d1b 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt @@ -118,6 +118,7 @@ class Scheduler( override fun close() { deviceProvider.close() + cacheLoader.close() cacheService.close() } @@ -125,15 +126,7 @@ class Scheduler( logger.debug("Test cache is ${if (configuration.cache.isEnabled) "enabled" else "disabled"}") if (configuration.cache.isEnabled) { - cacheLoader.initialize(scope) - scope.launch { - for (cacheResult in cacheLoader.results) { - when (cacheResult) { - is CacheResult.Miss -> pools.getValue(cacheResult.pool).send(FromScheduler.AddTests(cacheResult.testShard)) - is CacheResult.Hit -> cachedTestsReporter.onCachedTest(cacheResult.pool, cacheResult.testResult) - } - } - } + cacheLoader.start(scope, ::onCacheResult) } if (configuration.cache.isPushEnabled) { cacheSaver.initialize(scope) @@ -147,12 +140,21 @@ class Scheduler( scope.launch { deviceProvider.deviceEvents .filter { isAllowedByConfiguration(it.device) } - .collect { event -> - when (event) { - is DeviceEvent.DeviceConnected -> onDeviceConnected(event.device, coroutineContext) - is DeviceEvent.DeviceDisconnected -> onDeviceDisconnected(event.device) - } - } + .collect(::onDeviceEvent) + } + } + + private suspend fun onCacheResult(cacheResult: CacheResult) { + when (cacheResult) { + is CacheResult.Miss -> pools.getValue(cacheResult.pool).send(FromScheduler.AddTests(TestShard(listOf(cacheResult.test)))) + is CacheResult.Hit -> cachedTestsReporter.onCachedTest(cacheResult.pool, cacheResult.testResult) + } + } + + private suspend fun onDeviceEvent(event: DeviceEvent) { + when (event) { + is DeviceEvent.DeviceConnected -> onDeviceConnected(event.device, coroutineContext) + is DeviceEvent.DeviceDisconnected -> onDeviceDisconnected(event.device) } }