Skip to content

Commit

Permalink
KTOR-8190 Fix OOM issue in ByteChannel.copyTo (#4678)
Browse files Browse the repository at this point in the history
* KTOR-8190 Fix OOM issue in ByteChannel.copyTo

* KTOR-8190 Update SourceByteReadChannel to use source's buffer as readBuffer
  • Loading branch information
bjhham authored Feb 19, 2025
1 parent e63cf88 commit 8b73519
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 47 deletions.
12 changes: 7 additions & 5 deletions ktor-io/common/src/io/ktor/utils/io/SourceByteReadChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

package io.ktor.utils.io

import io.ktor.utils.io.core.*
import kotlinx.io.*
import kotlin.concurrent.*
import kotlinx.io.IOException
import kotlinx.io.InternalIoApi
import kotlinx.io.Source
import kotlin.concurrent.Volatile

internal class SourceByteReadChannel(private val source: Source) : ByteReadChannel {
@Volatile
Expand All @@ -18,16 +19,17 @@ internal class SourceByteReadChannel(private val source: Source) : ByteReadChann
override val isClosedForRead: Boolean
get() = source.exhausted()

@OptIn(InternalIoApi::class)
@InternalAPI
override val readBuffer: Source
get() {
closedCause?.let { throw it }
return source
return source.buffer
}

override suspend fun awaitContent(min: Int): Boolean {
closedCause?.let { throw it }
return source.remaining >= min
return source.request(min.toLong())
}

override fun cancel(cause: Throwable?) {
Expand Down
42 changes: 0 additions & 42 deletions ktor-io/jvm/test/ByteWriteChannelOperationsTest.kt

This file was deleted.

76 changes: 76 additions & 0 deletions ktor-io/jvmAndPosix/test/ByteWriteLargeTransfersTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ReaderJob
import io.ktor.utils.io.copyAndClose
import io.ktor.utils.io.discard
import io.ktor.utils.io.reader
import io.ktor.utils.io.writeBuffer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.io.RawSource
import kotlinx.io.buffered
import kotlinx.io.files.Path
import kotlin.getValue
import kotlin.test.Test

private const val KB = 1024L
private const val GB = KB * KB * KB

class ByteWriteLargeTransfersTest {

// does not exist on windows systems
private val randomBytesFile by lazy {
try {
Path("/dev/random").takeIf { kotlinx.io.files.SystemFileSystem.exists(it) }
} catch (_: Throwable) {
null
}
}

private fun CoroutineScope.oneBillionBytes(): ReaderJob =
reader {
var count = 0L
while (!channel.isClosedForRead && count < GB) {
channel.discard(KB)
count += KB
}
}

private fun readFromRandomBytesFile(): RawSource =
kotlinx.io.files.SystemFileSystem.source(randomBytesFile!!)

@Test
fun writeBuffer() = runTest {
if (randomBytesFile == null) return@runTest

val reader = oneBillionBytes()
val writeJob = launch {
readFromRandomBytesFile().use { source ->
reader.channel.writeBuffer(source)
}
}

reader.job.join()
writeJob.cancel()
}

@Test
fun copyTo() = runTest {
if (randomBytesFile == null) return@runTest

val reader = oneBillionBytes()
val writeJob = launch {
readFromRandomBytesFile().use { source ->
ByteReadChannel(source.buffered())
.copyAndClose(reader.channel)
}
}

reader.job.join()
writeJob.cancel()
}
}

0 comments on commit 8b73519

Please sign in to comment.