Skip to content

Commit

Permalink
Consume cache results as a flow rather than channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Chelombitko committed Dec 17, 2024
1 parent aea9fb2 commit ba84450
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package com.malinskiy.marathon.cache.test

import com.malinskiy.marathon.device.DevicePoolId
import com.malinskiy.marathon.execution.TestResult
import com.malinskiy.marathon.execution.TestShard
import com.malinskiy.marathon.test.Test

sealed class CacheResult {

class Hit(
data class Hit(
val pool: DevicePoolId,
val testResult: TestResult
) : CacheResult()

class Miss(
data class Miss(
val pool: DevicePoolId,
val testShard: TestShard
val test: Test
) : CacheResult()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,78 +12,69 @@ import com.malinskiy.marathon.log.MarathonLogging
import com.malinskiy.marathon.test.Test
import com.malinskiy.marathon.test.toSimpleSafeTestName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlin.time.measureTimedValue

class TestCacheLoader(
private val configuration: Configuration,
private val cache: TestResultsCache,
private val cacheKeyFactory: TestCacheKeyFactory
) {
) : AutoCloseable {

private val logger = MarathonLogging.getLogger(TestCacheLoader::class.java)

private val _results: Channel<CacheResult> = unboundedChannel()
val results: ReceiveChannel<CacheResult>
get() = _results

private val testsToCheck: Channel<TestToCheck> = unboundedChannel()
private var job: Job? = null

private lateinit var cacheCheckCompleted: Deferred<Unit>

fun initialize(scope: CoroutineScope) {
cacheCheckCompleted = scope.async {
// TODO: check concurrently
for (test in testsToCheck) {
var result: CacheResult? = null
val timeMillis = measureTimeMillis {
val cacheKey = cacheKeyFactory.getCacheKey(test.poolId, test.test)

result = cache.load(cacheKey, test.test)?.let {
Hit(test.poolId, it)
} ?: Miss(test.poolId, TestShard(listOf(test.test)))

_results.send(result!!)
}

val hitOrMiss = when (result!!) {
is Hit -> "hit"
is Miss -> "miss"
fun start(scope: CoroutineScope, consumer: suspend (CacheResult) -> Unit) {
job = scope.launch {
testsToCheck.consumeAsFlow()
.map { test ->
val (result, duration) = measureTimedValue {
check(test)
}
val hitOrMiss = when (result) {
is Hit -> "hit"
is Miss -> "miss"
}
logger.debug("Cache {} for {} took {}ms", hitOrMiss, test.test.toSimpleSafeTestName(), duration.inWholeMilliseconds)
result
}
logger.debug("Cache {} for {} took {}ms", hitOrMiss, test.test.toSimpleSafeTestName(), timeMillis)
}
.collect(consumer)
}
}

suspend fun addTests(poolId: DevicePoolId, tests: TestShard) {
if (configuration.cache.isEnabled) {
val testCacheBlackList: MutableList<Test> = arrayListOf()
tests.tests.forEach { test ->
if (configuration.strictRunConfiguration.filter.matches(test)) {
testCacheBlackList.add(test)
} else {
testsToCheck.send(TestToCheck(poolId, test))
}
}
private suspend fun check(test: TestToCheck): CacheResult {
val strictRun = configuration.strictRunConfiguration.filter.matches(test.test)
if (strictRun) {
logger.debug("Cache miss for test in blacklist: {}", test.test.toSimpleSafeTestName())
return Miss(test.poolId, test.test)
}

val cacheKey = cacheKeyFactory.getCacheKey(test.poolId, test.test)
return cache.load(cacheKey, test.test)
?.let { Hit(test.poolId, it) }
?: Miss(test.poolId, test.test)
}

if (testCacheBlackList.isNotEmpty()) {
logger.debug("Cache miss for tests in blacklist: {}", testCacheBlackList.map { it.toSimpleSafeTestName() })
_results.send(Miss(poolId, TestShard(testCacheBlackList)))
}
} else {
_results.send(Miss(poolId, tests))
suspend fun addTests(poolId: DevicePoolId, testShard: TestShard) {
testShard.tests.forEach { test ->
testsToCheck.send(TestToCheck(poolId, test))
}
}

suspend fun stop() {
testsToCheck.close()
cacheCheckCompleted.await()
_results.close()
job?.join()
logger.debug("Cache loader is terminated")
}

private class TestToCheck(val poolId: DevicePoolId, val test: Test)
override fun close() {
testsToCheck.close()
}

private data class TestToCheck(val poolId: DevicePoolId, val test: Test)
}
32 changes: 17 additions & 15 deletions core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,15 @@ class Scheduler(

override fun close() {
deviceProvider.close()
cacheLoader.close()
cacheService.close()
}

private fun initializeCache(scope: CoroutineScope) {
logger.debug("Test cache is ${if (configuration.cache.isEnabled) "enabled" else "disabled"}")

if (configuration.cache.isEnabled) {
cacheLoader.initialize(scope)
scope.launch {
for (cacheResult in cacheLoader.results) {
when (cacheResult) {
is CacheResult.Miss -> pools.getValue(cacheResult.pool).send(FromScheduler.AddTests(cacheResult.testShard))
is CacheResult.Hit -> cachedTestsReporter.onCachedTest(cacheResult.pool, cacheResult.testResult)
}
}
}
cacheLoader.start(scope, ::onCacheResult)
}
if (configuration.cache.isPushEnabled) {
cacheSaver.initialize(scope)
Expand All @@ -147,12 +140,21 @@ class Scheduler(
scope.launch {
deviceProvider.deviceEvents
.filter { isAllowedByConfiguration(it.device) }
.collect { event ->
when (event) {
is DeviceEvent.DeviceConnected -> onDeviceConnected(event.device, coroutineContext)
is DeviceEvent.DeviceDisconnected -> onDeviceDisconnected(event.device)
}
}
.collect(::onDeviceEvent)
}
}

private suspend fun onCacheResult(cacheResult: CacheResult) {
when (cacheResult) {
is CacheResult.Miss -> pools.getValue(cacheResult.pool).send(FromScheduler.AddTests(TestShard(listOf(cacheResult.test))))
is CacheResult.Hit -> cachedTestsReporter.onCachedTest(cacheResult.pool, cacheResult.testResult)
}
}

private suspend fun onDeviceEvent(event: DeviceEvent) {
when (event) {
is DeviceEvent.DeviceConnected -> onDeviceConnected(event.device, coroutineContext)
is DeviceEvent.DeviceDisconnected -> onDeviceDisconnected(event.device)
}
}

Expand Down

0 comments on commit ba84450

Please sign in to comment.