Skip to content

Commit

Permalink
Hopefully fix persistent message storage bug
Browse files Browse the repository at this point in the history
  • Loading branch information
randomnetcat committed Oct 31, 2023
1 parent 6495fae commit 0f148ce
Showing 1 changed file with 59 additions and 33 deletions.
92 changes: 59 additions & 33 deletions src/main/kotlin/org/randomcat/agorabot/features/PeriodicMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,33 @@ package org.randomcat.agorabot.features
import kotlinx.collections.immutable.PersistentMap
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.collections.immutable.toPersistentMap
import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.randomcat.agorabot.*
import org.randomcat.agorabot.config.persist.feature.ConfigPersistServiceTag
import org.randomcat.agorabot.setup.features.featureConfigDir
import org.randomcat.agorabot.util.await
import org.randomcat.agorabot.util.insecureRandom
import org.randomcat.agorabot.util.userFacingRandom
import org.randomcat.agorabot.util.withTempFile
import org.slf4j.LoggerFactory
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.temporal.ChronoField
import java.util.concurrent.atomic.AtomicReference
import kotlin.io.path.readText
import kotlin.io.path.writeText
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import java.nio.file.NoSuchFileException as NioNoSuchFileException

private enum class PeriodicMessageInterval(val configName: String) {
WEEKLY("weekly"),
Expand Down Expand Up @@ -147,6 +152,7 @@ private fun randomNextInterval(baseTime: Instant, interval: PeriodicMessageInter

private val coroutineScopeDep = FeatureDependency.Single(CoroutineScopeTag)
private val jdaDep = FeatureDependency.Single(JdaTag)
private val persistServiceDep = FeatureDependency.Single(ConfigPersistServiceTag)

@FeatureSourceFactory
fun periodicMessageSource(): FeatureSource<*> = object : FeatureSource<PeriodicMessageFeatureConfig> {
Expand All @@ -161,37 +167,55 @@ fun periodicMessageSource(): FeatureSource<*> = object : FeatureSource<PeriodicM
}

override val dependencies: List<FeatureDependency<*>>
get() = listOf(coroutineScopeDep, jdaDep)
get() = listOf(coroutineScopeDep, jdaDep, persistServiceDep)

override val provides: List<FeatureElementTag<*>>
get() = listOf(StartupBlockTag)

override fun createFeature(config: PeriodicMessageFeatureConfig, context: FeatureSourceContext): Feature {
val coroutineScope = context[coroutineScopeDep]
val jda = context[jdaDep]
val persistService = context[persistServiceDep]

return object : Feature {
override fun <T> query(tag: FeatureElementTag<T>): List<T> {
if (tag is StartupBlockTag) return tag.values({
coroutineScope.launch {
var currentState = try {
val storageText = config.storagePath.readText()
logger.info("Loaded periodic message storage: $storageText")

PeriodicMessageFeatureState.from(
Json.decodeFromString<PeriodicMessageFeatureStateDto>(storageText)
)
} catch (e: NioNoSuchFileException) {
PeriodicMessageFeatureState(messages = persistentMapOf())
}
val currentState = AtomicReference(
try {
val storageText = config.storagePath.readText()
logger.info("Loaded periodic message storage: $storageText")

PeriodicMessageFeatureState.from(
Json.decodeFromString<PeriodicMessageFeatureStateDto>(storageText)
)
} catch (e: Exception) {
logger.error("Error loading periodic message state, using empty state", e)
PeriodicMessageFeatureState(messages = persistentMapOf())
}
)

val persistHandle = persistService.schedulePersistence(
readState = { currentState.get() },
persist = { state ->
withTempFile { tempFile ->
Files.writeString(
tempFile,
Json.encodeToString<PeriodicMessageFeatureStateDto>(state.toDto()),
)

Files.move(tempFile, config.storagePath, StandardCopyOption.REPLACE_EXISTING)
}
}
)

while (true) {
ensureActive()
try {
while (true) {
ensureActive()

try {
for ((id, messageConfig) in config.list.messages) {
val checkTime = Instant.now()
val previousScheduled = currentState.messages[id]?.scheduledTime
val previousScheduled = currentState.get().messages[id]?.scheduledTime

if (previousScheduled == null || checkTime >= previousScheduled) {
try {
Expand All @@ -200,16 +224,20 @@ fun periodicMessageSource(): FeatureSource<*> = object : FeatureSource<PeriodicM
?.sendMessage(messageConfig.options.random(userFacingRandom()))
?.await()

currentState = currentState.copy(
messages = currentState.messages.put(
id,
PeriodicMessageState(
scheduledTime = randomNextInterval(
checkTime,
messageConfig.randomInterval,
// Don't need to use an update method because there is only one writer.

currentState.set(
currentState.get().copy(
messages = currentState.get().messages.put(
id,
PeriodicMessageState(
scheduledTime = randomNextInterval(
checkTime,
messageConfig.randomInterval,
),
),
),
),
)
)
} catch (e: Exception) {
logger.error("Error handling periodic message", e)
Expand All @@ -218,14 +246,12 @@ fun periodicMessageSource(): FeatureSource<*> = object : FeatureSource<PeriodicM

ensureActive()
}
} finally {
withContext(NonCancellable) {
config.storagePath.writeText(Json.encodeToString(currentState.toDto()))
}
}

@OptIn(ExperimentalTime::class)
delay(10.seconds)
@OptIn(ExperimentalTime::class)
delay(10.seconds)
}
} finally {
persistHandle.stopPersistence()
}
}
})
Expand Down

0 comments on commit 0f148ce

Please sign in to comment.