Skip to content

Commit

Permalink
syncFeeds mutation. (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
ychescale9 authored Aug 24, 2024
1 parent 6a12e04 commit 78662da
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.reactivecircus.kstreamlined.backend.datafetcher

import com.netflix.graphql.dgs.DgsComponent
import com.netflix.graphql.dgs.DgsMutation
import com.netflix.graphql.dgs.DgsQuery
import com.netflix.graphql.dgs.DgsTypeResolver
import com.netflix.graphql.dgs.InputArgument
Expand Down Expand Up @@ -60,6 +61,32 @@ class FeedEntryDataFetcher(
}
}

@DgsMutation(field = DgsConstants.MUTATION.SyncFeeds)
suspend fun syncFeeds(): Boolean = coroutineScope {
FeedSourceKey.entries.map { source ->
async(coroutineDispatcher) {
when (source) {
FeedSourceKey.KOTLIN_BLOG -> {
dataSource.loadKotlinBlogFeed(skipCache = true)
}

FeedSourceKey.KOTLIN_YOUTUBE_CHANNEL -> {
dataSource.loadKotlinYouTubeFeed(skipCache = true)
}

FeedSourceKey.TALKING_KOTLIN_PODCAST -> {
dataSource.loadTalkingKotlinFeed(skipCache = true)
}

FeedSourceKey.KOTLIN_WEEKLY -> {
dataSource.loadKotlinWeeklyFeed(skipCache = true)
}
}
}
}.awaitAll()
true
}

@DgsTypeResolver(name = DgsConstants.FEEDENTRY.TYPE_NAME)
internal fun resolveFeedEntry(feedEntry: FeedEntry): String {
return when (feedEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ class DataLoader<T : Any> private constructor(
@Suppress("ReturnCount")
suspend fun load(
key: String,
sotOnly: Boolean = false,
sot: suspend () -> List<T>
): List<T> {
if (sotOnly) return loadFromSot(key, sot)

// L1 cache - local
val l1Value = localCache.getIfPresent(key)
if (l1Value != null) {
Expand All @@ -40,6 +43,10 @@ class DataLoader<T : Any> private constructor(
}

// Source of truth
return loadFromSot(key, sot)
}

private suspend fun loadFromSot(key: String, sot: suspend () -> List<T>): List<T> {
val sotValue = sot()
localCache.put(key, sotValue)
redisClient.set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import nl.adaptivity.xmlutil.serialization.XmlConfig
import org.apache.commons.text.StringEscapeUtils

interface FeedDataSource {
suspend fun loadKotlinBlogFeed(): List<KotlinBlogItem>
suspend fun loadKotlinYouTubeFeed(): List<KotlinYouTubeItem>
suspend fun loadTalkingKotlinFeed(): List<TalkingKotlinItem>
suspend fun loadKotlinWeeklyFeed(): List<KotlinWeeklyItem>
suspend fun loadKotlinBlogFeed(skipCache: Boolean = false): List<KotlinBlogItem>
suspend fun loadKotlinYouTubeFeed(skipCache: Boolean = false): List<KotlinYouTubeItem>
suspend fun loadTalkingKotlinFeed(skipCache: Boolean = false): List<TalkingKotlinItem>
suspend fun loadKotlinWeeklyFeed(skipCache: Boolean = false): List<KotlinWeeklyItem>
}

class FeedDataSourceConfig(
Expand Down Expand Up @@ -73,8 +73,8 @@ class RealFeedDataSource(
}
}

override suspend fun loadKotlinBlogFeed(): List<KotlinBlogItem> {
return kotlinBlogFeedDataLoader.load("kotlin-blog") {
override suspend fun loadKotlinBlogFeed(skipCache: Boolean): List<KotlinBlogItem> {
return kotlinBlogFeedDataLoader.load("kotlin-blog", sotOnly = skipCache) {
httpClient.get(dataSourceConfig.kotlinBlogFeedUrl).body<KotlinBlogRss>().channel.items.map {
it.copy(
description = StringEscapeUtils.unescapeXml(it.description).trim()
Expand All @@ -83,16 +83,16 @@ class RealFeedDataSource(
}
}

override suspend fun loadKotlinYouTubeFeed(): List<KotlinYouTubeItem> {
return kotlinYouTubeFeedDataLoader.load("kotlin-youtube") {
override suspend fun loadKotlinYouTubeFeed(skipCache: Boolean): List<KotlinYouTubeItem> {
return kotlinYouTubeFeedDataLoader.load("kotlin-youtube", sotOnly = skipCache) {
httpClient.get(dataSourceConfig.kotlinYouTubeFeedUrl).bodyAsText().let {
DefaultXml.decodeFromString<KotlinYouTubeRss>(it.replace("&(?!.{2,4};)".toRegex(), "&amp;")).entries
}
}
}

override suspend fun loadTalkingKotlinFeed(): List<TalkingKotlinItem> {
return talkingKotlinFeedDataLoader.load("talking-kotlin") {
override suspend fun loadTalkingKotlinFeed(skipCache: Boolean): List<TalkingKotlinItem> {
return talkingKotlinFeedDataLoader.load("talking-kotlin", sotOnly = skipCache) {
httpClient.get(dataSourceConfig.talkingKotlinFeedUrl).body<TalkingKotlinRss>().channel.items
.take(TalkingKotlinFeedSize)
.map {
Expand All @@ -103,8 +103,8 @@ class RealFeedDataSource(
}
}

override suspend fun loadKotlinWeeklyFeed(): List<KotlinWeeklyItem> {
return kotlinWeeklyFeedDataLoader.load("kotlin-weekly") {
override suspend fun loadKotlinWeeklyFeed(skipCache: Boolean): List<KotlinWeeklyItem> {
return kotlinWeeklyFeedDataLoader.load("kotlin-weekly", sotOnly = skipCache) {
httpClient.get(dataSourceConfig.kotlinWeeklyFeedUrl).body<KotlinWeeklyRss>().channel.items
}
}
Expand Down
46 changes: 29 additions & 17 deletions src/main/resources/META-INF/native-image/reflect-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@
{
"name":"com.netflix.graphql.dgs.DgsFederationResolver"
},
{
"name":"com.netflix.graphql.dgs.DgsMutation",
"queryAllDeclaredMethods":true
},
{
"name":"com.netflix.graphql.dgs.DgsQuery",
"queryAllDeclaredMethods":true
Expand Down Expand Up @@ -850,7 +854,7 @@
"allDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true,
"methods":[{"name":"<init>","parameterTypes":["io.github.reactivecircus.kstreamlined.backend.datasource.FeedDataSource"] }, {"name":"feedEntries","parameterTypes":["java.util.List","kotlin.coroutines.Continuation"] }, {"name":"resolveFeedEntry$kstreamlined_backend","parameterTypes":["io.github.reactivecircus.kstreamlined.backend.schema.generated.types.FeedEntry"] }]
"methods":[{"name":"<init>","parameterTypes":["io.github.reactivecircus.kstreamlined.backend.datasource.FeedDataSource"] }, {"name":"feedEntries","parameterTypes":["java.util.List","kotlin.coroutines.Continuation"] }, {"name":"resolveFeedEntry$kstreamlined_backend","parameterTypes":["io.github.reactivecircus.kstreamlined.backend.schema.generated.types.FeedEntry"] }, {"name":"syncFeeds","parameterTypes":["kotlin.coroutines.Continuation"] }]
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.datafetcher.FeedSourceDataFetcher",
Expand Down Expand Up @@ -893,7 +897,7 @@
"name":"io.github.reactivecircus.kstreamlined.backend.datasource.RealFeedDataSource",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true,
"methods":[{"name":"close","parameterTypes":[] }, {"name":"loadKotlinBlogFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadKotlinWeeklyFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadKotlinYouTubeFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadTalkingKotlinFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"shutdown","parameterTypes":[] }]
"methods":[{"name":"close","parameterTypes":[] }, {"name":"loadKotlinBlogFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadKotlinBlogFeed","parameterTypes":["boolean","kotlin.coroutines.Continuation"] }, {"name":"loadKotlinWeeklyFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadKotlinWeeklyFeed","parameterTypes":["boolean","kotlin.coroutines.Continuation"] }, {"name":"loadKotlinYouTubeFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadKotlinYouTubeFeed","parameterTypes":["boolean","kotlin.coroutines.Continuation"] }, {"name":"loadTalkingKotlinFeed","parameterTypes":["kotlin.coroutines.Continuation"] }, {"name":"loadTalkingKotlinFeed","parameterTypes":["boolean","kotlin.coroutines.Continuation"] }, {"name":"shutdown","parameterTypes":[] }]
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.datasource.RealKotlinWeeklyIssueDataSource",
Expand Down Expand Up @@ -936,36 +940,36 @@
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.FeedSource",
"allDeclaredFields":true,
"allDeclaredMethods":true
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.FeedSourceKey"
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.KotlinBlog",
"allDeclaredFields":true,
"allDeclaredMethods":true
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.KotlinWeekly",
"allDeclaredFields":true,
"allDeclaredMethods":true
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.KotlinWeeklyIssueEntry",
"allDeclaredFields":true,
"allDeclaredMethods":true
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.KotlinYouTube",
"allDeclaredFields":true,
"allDeclaredMethods":true
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"io.github.reactivecircus.kstreamlined.backend.schema.generated.types.TalkingKotlin",
"allDeclaredFields":true,
"allDeclaredMethods":true
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"io.ktor.client.HttpClient",
Expand Down Expand Up @@ -1298,7 +1302,7 @@
},
{
"name":"java.io.Console",
"methods":[{"name":"isTerminal","parameterTypes":[] }]
"methods":[{"name":"charset","parameterTypes":[] }, {"name":"isTerminal","parameterTypes":[] }]
},
{
"name":"java.io.FileDescriptor"
Expand All @@ -1323,7 +1327,7 @@
{
"name":"java.lang.Class",
"queryAllDeclaredMethods":true,
"methods":[{"name":"getPermittedSubclasses","parameterTypes":[] }, {"name":"getRecordComponents","parameterTypes":[] }, {"name":"isRecord","parameterTypes":[] }, {"name":"isSealed","parameterTypes":[] }]
"methods":[{"name":"accessFlags","parameterTypes":[] }, {"name":"getPermittedSubclasses","parameterTypes":[] }, {"name":"getRecordComponents","parameterTypes":[] }, {"name":"isRecord","parameterTypes":[] }, {"name":"isSealed","parameterTypes":[] }]
},
{
"name":"java.lang.ClassLoader",
Expand Down Expand Up @@ -1472,7 +1476,12 @@
"name":"java.security.interfaces.RSAPublicKey"
},
{
"name":"java.time.Duration"
"name":"java.text.NumberFormat",
"methods":[{"name":"isStrict","parameterTypes":[] }]
},
{
"name":"java.time.Duration",
"methods":[{"name":"isPositive","parameterTypes":[] }]
},
{
"name":"java.time.DurationEditor"
Expand Down Expand Up @@ -1563,6 +1572,9 @@
{
"name":"javax.inject.Named"
},
{
"name":"javax.inject.Qualifier"
},
{
"name":"javax.money.MonetaryAmount"
},
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/META-INF/native-image/resource-config.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
{
"resources":{
"includes":[{
"pattern": "schema/.*\\.graphqls$"}
,{
"pattern":"\\QMETA-INF/build-info.properties\\E"
}, {
"pattern":"\\QMETA-INF/resources/index.html\\E"
Expand Down Expand Up @@ -474,6 +472,8 @@
"pattern":"jdk.internal.le:\\Qjdk/internal/org/jline/utils/capabilities.txt\\E"
}, {
"pattern":"jdk.internal.le:\\Qjdk/internal/org/jline/utils/xterm-256color.caps\\E"
}, {
"pattern":"schema/.*\\.graphqls$"
}]},
"bundles":[{
"name":"i18n.Parsing",
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/schema/kstreamlined.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ type Query {
kotlinWeeklyIssue(url: String!): [KotlinWeeklyIssueEntry!]!
}

type Mutation {
"Syncs feeds from all sources."
syncFeeds: Boolean!
}

type FeedSource {
"Unique identifier of the feed source."
key: FeedSourceKey!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class FeedEntryDataFetcherTest {
}
""".trimIndent()

private val syncFeedsMutation = """
mutation SyncFeeds {
syncFeeds
}
""".trimIndent()

@Test
fun `feedEntries() query returns expected feed entries ordered by publish time when operation succeeds`() {
(feedDataSource as FakeFeedDataSource).nextKotlinBlogFeedResponse = {
Expand Down Expand Up @@ -173,5 +179,45 @@ class FeedEntryDataFetcherTest {
assert(context.read<String>("data.feedEntries[1].description") == dummyKotlinYouTubeEntry.description)
}

@Test
fun `syncFeeds mutation returns true when operation succeeds`() {
(feedDataSource as FakeFeedDataSource).nextKotlinBlogFeedResponse = {
listOf(DummyKotlinBlogItem)
}
(feedDataSource as FakeFeedDataSource).nextKotlinYouTubeFeedResponse = {
listOf(DummyKotlinYouTubeItem)
}
(feedDataSource as FakeFeedDataSource).nextTalkingKotlinFeedResponse = {
listOf(DummyTalkingKotlinItem)
}
(feedDataSource as FakeFeedDataSource).nextKotlinWeeklyFeedResponse = {
listOf(DummyKotlinWeeklyItem)
}

val context = dgsQueryExecutor.executeAndGetDocumentContext(syncFeedsMutation)

assert(context.read<Boolean>("data.syncFeeds") == true)
}

@Test
fun `syncFeeds mutation returns error response when failed to load data from any feed sources`() {
(feedDataSource as FakeFeedDataSource).nextKotlinBlogFeedResponse = {
throw GraphqlErrorException.newErrorException().build()
}
(feedDataSource as FakeFeedDataSource).nextKotlinYouTubeFeedResponse = {
listOf(DummyKotlinYouTubeItem)
}
(feedDataSource as FakeFeedDataSource).nextTalkingKotlinFeedResponse = {
listOf(DummyTalkingKotlinItem)
}
(feedDataSource as FakeFeedDataSource).nextKotlinWeeklyFeedResponse = {
listOf(DummyKotlinWeeklyItem)
}

val result = dgsQueryExecutor.execute(syncFeedsMutation)

assert(result.errors[0].extensions["errorType"] == "INTERNAL")
}

private fun String.toInstant(): Instant = Instant.parse(this)
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,34 @@ class DataLoaderTest {
assert(redisMockEngine.responseHistory.last().statusCode == HttpStatusCode.OK)
}

@Test
fun `skips both local and remote caches when loading with sotOnly = true`() = runBlocking {
localCache.put("key", listOf(1, 2))

val redisMockEngine = MockEngine {
if (it.url.encodedPath.contains("get")) {
respond(content = "{ \"result\": \"[1, 2, 3]\" }")
} else {
respond(content = "{ \"result\": \"OK\" }")
}
}
val dataLoader = createDataLoader(
localCache = localCache,
remoteCacheExpiry = 1.hours,
redisMockEngine = redisMockEngine,
)

dataLoader.load("key", sotOnly = true) {
listOf(1, 2, 3, 4)
}

assert(localCache.getIfPresent("key") == listOf(1, 2, 3, 4))
assert(redisMockEngine.requestHistory.last().url.pathSegments.last() == "key")
assert(redisMockEngine.requestHistory.last().url.encodedQuery == "EX=${1.hours.inWholeSeconds}")
assert(redisMockEngine.requestHistory.last().body.toString() == "TextContent[application/json] \"[1,2,3,4]\"")
assert(redisMockEngine.responseHistory.last().statusCode == HttpStatusCode.OK)
}

private fun createDataLoader(
localCache: Cache<String, List<Int>>,
remoteCacheExpiry: Duration = 1.days,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ object FakeFeedDataSource : FeedDataSource {
listOf(DummyKotlinWeeklyItem)
}

override suspend fun loadKotlinBlogFeed(): List<KotlinBlogItem> {
override suspend fun loadKotlinBlogFeed(skipCache: Boolean): List<KotlinBlogItem> {
return nextKotlinBlogFeedResponse()
}

override suspend fun loadKotlinYouTubeFeed(): List<KotlinYouTubeItem> {
override suspend fun loadKotlinYouTubeFeed(skipCache: Boolean): List<KotlinYouTubeItem> {
return nextKotlinYouTubeFeedResponse()
}

override suspend fun loadTalkingKotlinFeed(): List<TalkingKotlinItem> {
override suspend fun loadTalkingKotlinFeed(skipCache: Boolean): List<TalkingKotlinItem> {
return nextTalkingKotlinFeedResponse()
}

override suspend fun loadKotlinWeeklyFeed(): List<KotlinWeeklyItem> {
override suspend fun loadKotlinWeeklyFeed(skipCache: Boolean): List<KotlinWeeklyItem> {
return nextKotlinWeeklyFeedResponse()
}
}
Expand Down

0 comments on commit 78662da

Please sign in to comment.