Skip to content

Commit

Permalink
Merge pull request #680 from splendo/develop
Browse files Browse the repository at this point in the history
Kaluga 1.1
  • Loading branch information
Daeda88 authored Mar 24, 2023
2 parents 2a764d5 + a5d37d0 commit 6842f1e
Show file tree
Hide file tree
Showing 78 changed files with 3,342 additions and 1,256 deletions.
5 changes: 5 additions & 0 deletions base/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ The initializer will be called if the number of flows observing changes from 0 t
The deinitializer will be called when the number of flows observing changes drops to 0.
Use `ColdStateRepo` for this behaviour.

### BufferedAsListChannel
Consuming a flow may often take longer than producing it. Kotlin Flows can handle this using `buffer` but this progressively increase the time between data being produced and data being consumed.
Kaluga offers a `BufferedAsListChannel` to buffer all data produced between consumption into a list. This allows the consumer to deal with groups of data and ideally prevent increasing delays.
The `BufferedAsListChannel` is a `Channel` that always buffers an unlimited amount of data points.

## Date
Kaluga includes a `Date` class to manage and compare time.
Dates can be created using either `Date.now()` or `Date.epoch()`.
Expand Down
16 changes: 16 additions & 0 deletions base/api/androidLib/base.api
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,22 @@ public abstract class com/splendo/kaluga/base/utils/BaseTimeZone {
public static synthetic fun usesDaylightSavingsTime$default (Lcom/splendo/kaluga/base/utils/BaseTimeZone;Lcom/splendo/kaluga/base/utils/KalugaDate;ILjava/lang/Object;)Z
}

public abstract interface class com/splendo/kaluga/base/utils/BufferedAsListChannel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
}

public final class com/splendo/kaluga/base/utils/BufferedAsListChannel$DefaultImpls {
public static synthetic fun cancel (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;)V
public static fun getOnReceiveOrNull (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;)Lkotlinx/coroutines/selects/SelectClause1;
public static fun offer (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;Ljava/lang/Object;)Z
public static fun poll (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;)Ljava/util/List;
public static fun receiveOrNull (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/splendo/kaluga/base/utils/BufferedAsListChannelKt {
public static final fun BufferedAsListChannel (Lkotlin/coroutines/CoroutineContext;)Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;
public static final fun BufferedAsListChannel (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/ExecutorCoroutineDispatcher;Z)Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;
}

public final class com/splendo/kaluga/base/utils/ByteUtils {
public static final field BYTE_STRING_LENGTH I
public static final field HEX_RADIX I
Expand Down
16 changes: 16 additions & 0 deletions base/api/jvm/base.api
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,22 @@ public abstract class com/splendo/kaluga/base/utils/BaseTimeZone {
public static synthetic fun usesDaylightSavingsTime$default (Lcom/splendo/kaluga/base/utils/BaseTimeZone;Lcom/splendo/kaluga/base/utils/KalugaDate;ILjava/lang/Object;)Z
}

public abstract interface class com/splendo/kaluga/base/utils/BufferedAsListChannel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
}

public final class com/splendo/kaluga/base/utils/BufferedAsListChannel$DefaultImpls {
public static synthetic fun cancel (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;)V
public static fun getOnReceiveOrNull (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;)Lkotlinx/coroutines/selects/SelectClause1;
public static fun offer (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;Ljava/lang/Object;)Z
public static fun poll (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;)Ljava/util/List;
public static fun receiveOrNull (Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/splendo/kaluga/base/utils/BufferedAsListChannelKt {
public static final fun BufferedAsListChannel (Lkotlin/coroutines/CoroutineContext;)Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;
public static final fun BufferedAsListChannel (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/ExecutorCoroutineDispatcher;Z)Lcom/splendo/kaluga/base/utils/BufferedAsListChannel;
}

public final class com/splendo/kaluga/base/utils/ByteUtils {
public static final field BYTE_STRING_LENGTH I
public static final field HEX_RADIX I
Expand Down
35 changes: 17 additions & 18 deletions base/src/commonMain/kotlin/text/FormatSpecifier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
sb.append(getZero(locale))
}
if (flags.contains(Flag.ZERO_PAD)) {
trailingZeros(sb, width - length)
trailingZeros(sb, width - length, locale)
}
sb.append(valueString)
}
Expand All @@ -235,7 +235,7 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
sb.append(if (uppercase) prefix.upperCased(locale) else prefix)
}
if (flags.contains(Flag.ZERO_PAD)) {
trailingZeros(sb, width - length)
trailingZeros(sb, width - length, locale)
}
sb.append(if (uppercase) hexValue.upperCased(locale) else hexValue)
}
Expand Down Expand Up @@ -302,7 +302,7 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
else -> scientific[1]
}
number.append(mantissa)
addZeros(number, prec)
addZeros(number, prec, locale)

if (flags.contains(Flag.ALTERNATE) && prec == 0)
number.append(formatter.decimalSeparator)
Expand All @@ -326,7 +326,7 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
number.append(formatter.zeroSymbol)
}
number.append(formatter.format(value))
addZeros(number, prec)
addZeros(number, prec, locale)

if (flags.contains(Flag.ALTERNATE) && prec == 0)
number.append(formatter.decimalSeparator)
Expand Down Expand Up @@ -435,14 +435,14 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
DateTime.NAME_OF_DAY_ABBREV, DateTime.NAME_OF_DAY -> {
// 'A'
val i: Int = time.weekDay - 1
val dateFormat = KalugaDateFormatter.patternFormat("EEEE")
val dateFormat = KalugaDateFormatter.patternFormat("EEEE", locale = locale)
val weekdays = if (currentChar.dateTime == DateTime.NAME_OF_DAY) dateFormat.weekdays else dateFormat.shortWeekdays
sb.append(weekdays[i])
}
DateTime.NAME_OF_MONTH_ABBREV, DateTime.NAME_OF_MONTH_ABBREV_X, DateTime.NAME_OF_MONTH -> {
// 'B'
val i: Int = time.month - 1
val dateFormat = KalugaDateFormatter.patternFormat("MMMM")
val dateFormat = KalugaDateFormatter.patternFormat("MMMM", locale = locale)
val months = if (currentChar.dateTime == DateTime.NAME_OF_MONTH) dateFormat.months else dateFormat.shortMonths
sb.append(months[i])
}
Expand Down Expand Up @@ -527,15 +527,16 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
return sb
}

private fun addZeros(sb: StringBuilder, prec: Int) {
private fun addZeros(sb: StringBuilder, prec: Int, locale: KalugaLocale) {

// Look for the dot. If we don't find one, the we'll need to add

// it before we add the zeros.
val decimalSeparator = NumberFormatter(locale).decimalSeparator
val len: Int = sb.length
var i = 0
while (i < len) {
if (sb[i] == '.') {
if (sb[i] == decimalSeparator) {
break
}
i++
Expand All @@ -553,11 +554,11 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc

// Add dot if previously determined to be necessary.
if (needDot) {
sb.append('.')
sb.append(decimalSeparator)
}

// Add zeros.
trailingZeros(sb, prec - outPrec)
trailingZeros(sb, prec - outPrec, locale)
}

private fun appendJustified(out: StringBuilder, cs: CharSequence): StringBuilder {
Expand Down Expand Up @@ -620,21 +621,18 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
val zero: Char = getZero(locale)

// determine localized grouping separator and size
val numberFormatter = NumberFormatter(locale)
var grpSep = '\u0000'
var grpSize = -1
var decSep = '\u0000'
val decSep = numberFormatter.decimalSeparator
val len = value.length
var dot = len
for (j in offset until len) {
if (value[j] == '.') {
if (value[j] == decSep) {
dot = j
break
}
}
val numberFormatter = NumberFormatter(locale)
if (dot < len) {
decSep = numberFormatter.decimalSeparator
}
if (flags.contains(Flag.GROUP)) {
grpSep = numberFormatter.groupingSeparator
grpSize = numberFormatter.groupingSize
Expand Down Expand Up @@ -683,9 +681,10 @@ internal class FormatSpecifier(private val out: StringBuilder, matchResult: Matc
// Add trailing zeros

// Add trailing zeros
private fun trailingZeros(sb: StringBuilder, nzeros: Int) {
private fun trailingZeros(sb: StringBuilder, nzeros: Int, locale: KalugaLocale) {
val zeroSymbol = getZero(locale)
for (i in 0 until nzeros) {
sb.append('0')
sb.append(zeroSymbol)
}
}

Expand Down
149 changes: 149 additions & 0 deletions base/src/commonMain/kotlin/utils/BufferedAsListChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2023 Splendo Consulting B.V. The Netherlands
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splendo.kaluga.base.utils

import com.splendo.kaluga.base.singleThreadDispatcher
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CloseableCoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlin.coroutines.CoroutineContext

/**
* A Rendezvous Channel that buffers all elements sent to it in a list until the next receive
* @param T the type of element to batch. Must be non-nullable
*/
interface BufferedAsListChannel<T : Any> : SendChannel<T>, ReceiveChannel<List<T>>

/**
* Creates a [BufferedAsListChannel] that batches its elements with a given [CoroutineContext]
* @param T the type of element to batch. Must be non-nullable
* @param coroutineContext the [CoroutineContext] to use for batching
* @return the [BufferedAsListChannel] created
*/
fun <T : Any> BufferedAsListChannel(
coroutineContext: CoroutineContext
): BufferedAsListChannel<T> = BufferedAsListChannelInt(coroutineContext)

/**
* Creates a [BufferedAsListChannel] that batches its elements with a given [CoroutineContext]
* @param T the type of element to batch. Must be non-nullable
* @param coroutineContext the [CoroutineContext] to use for batching
* @param dispatcher the [CloseableCoroutineDispatcher] to which grouping will be dispatched
* @param closeDispatcherOnCompletion if `true` the [dispatcher] will be closed once the channel is closed
* @return the [BufferedAsListChannel] created
*/
fun <T : Any> BufferedAsListChannel(
coroutineContext: CoroutineContext,
dispatcher: CloseableCoroutineDispatcher,
closeDispatcherOnCompletion: Boolean
): BufferedAsListChannel<T> = BufferedAsListChannelInt(coroutineContext, dispatcher, closeDispatcherOnCompletion)

internal class BufferedAsListChannelInt<T : Any> private constructor(
private val sendChannel: Channel<T>,
private val receiveChannel: Channel<List<T>>,
coroutineContext: CoroutineContext,
dispatcher: CloseableCoroutineDispatcher,
closeDispatcherOnCompletion: Boolean
) : BufferedAsListChannel<T>, SendChannel<T> by sendChannel, ReceiveChannel<List<T>> by receiveChannel {

constructor(
coroutineContext: CoroutineContext,
) : this(coroutineContext, singleThreadDispatcher("GroupingChannel"), true)

constructor(
coroutineContext: CoroutineContext,
dispatcher: CloseableCoroutineDispatcher,
closeDispatcherOnCompletion: Boolean
) : this(
Channel<T>(Channel.UNLIMITED),
Channel<List<T>>(),
coroutineContext,
dispatcher,
closeDispatcherOnCompletion
)

init {
// Dispatch grouping to a separate thread so that the produce/consumption is not blocked by grouping
CoroutineScope(coroutineContext + dispatcher).launch {
do {
val didSendBuffer = bufferUntilNextSend()
} while (didSendBuffer)

// When done buffering (i.e. the send channel is closed and all values have been send to receive channel) we should close the receive channel
receiveChannel.close()
}.invokeOnCompletion {
// Close the dispatcher to prevent thread leaks
if (closeDispatcherOnCompletion) {
dispatcher.close()
}
}
}

private suspend fun bufferUntilNextSend(): Boolean = try {
// Wait until at least one element is send to the send channel
val buffer = mutableListOf(sendChannel.receive())
do {
// Use the select method to either:
// - send the current buffer to the receiveChannel or
// - to add it the next item in sendChannel to the buffer
// Sending will take priority
select {
receiveChannel.onSend(buffer.toList()) {
buffer.clear()
}
// OnReceiveCatching will complete before onSend if the channel is closed
// To account for this, we check whether its closed first
if (!sendChannel.isClosedForReceive) {
sendChannel.onReceiveCatching { result ->
result.getOrNull()?.let {
buffer.add(it)
}
}
}
}
} while (buffer.isNotEmpty())
true
} catch (_: ClosedReceiveChannelException) {
false
}

@Deprecated(
"Since 1.2.0, binary compatibility with versions <= 1.1.x",
level = DeprecationLevel.HIDDEN
)
override fun cancel(cause: Throwable?): Boolean {
sendChannel.cancel(cause as? CancellationException)
receiveChannel.cancel(cause as? CancellationException)
return true
}

override fun cancel(cause: CancellationException?) {
sendChannel.cancel(cause)
receiveChannel.cancel(cause)
}

override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
// Receive channel is the last to close, so we should invoke the method there instead of on sendChannel
receiveChannel.invokeOnClose(handler)
}
}
Loading

0 comments on commit 6842f1e

Please sign in to comment.