diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index f7c7eb9407..2a1b70b9b6 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -1,11 +1,14 @@ package org.jetbrains.kotlinx.dataframe.io +import kotlinx.datetime.Instant import kotlinx.datetime.LocalDate import kotlinx.datetime.LocalDateTime import kotlinx.datetime.LocalTime +import kotlinx.datetime.TimeZone import kotlinx.datetime.toKotlinLocalDate import kotlinx.datetime.toKotlinLocalDateTime import kotlinx.datetime.toKotlinLocalTime +import kotlinx.datetime.toLocalDateTime import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.BigIntVector import org.apache.arrow.vector.BitVector @@ -25,9 +28,13 @@ import org.apache.arrow.vector.TimeMicroVector import org.apache.arrow.vector.TimeMilliVector import org.apache.arrow.vector.TimeNanoVector import org.apache.arrow.vector.TimeSecVector +import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeStampMicroVector +import org.apache.arrow.vector.TimeStampMilliTZVector import org.apache.arrow.vector.TimeStampMilliVector +import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector +import org.apache.arrow.vector.TimeStampSecTZVector import org.apache.arrow.vector.TimeStampSecVector import org.apache.arrow.vector.TinyIntVector import org.apache.arrow.vector.UInt1Vector @@ -63,6 +70,7 @@ import java.math.BigDecimal import java.math.BigInteger import java.nio.channels.ReadableByteChannel import java.nio.channels.SeekableByteChannel +import java.util.concurrent.TimeUnit import kotlin.reflect.KType import kotlin.reflect.full.withNullability import kotlin.reflect.typeOf @@ -170,6 +178,16 @@ private fun TimeStampNanoVector.values(range: IntRange): List<LocalDateTime?> = } } +private fun TimeStampNanoTZVector.values(range: IntRange): List<LocalDateTime?> = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + DateUtility.getLocalDateTimeFromEpochNano(getObject(it), this.timeZone) + .toKotlinLocalDateTime() + } + } + private fun TimeStampMicroVector.values(range: IntRange): List<LocalDateTime?> = range.mapIndexed { i, it -> if (isNull(i)) { @@ -179,6 +197,16 @@ private fun TimeStampMicroVector.values(range: IntRange): List<LocalDateTime?> = } } +private fun TimeStampMicroTZVector.values(range: IntRange): List<LocalDateTime?> = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + DateUtility.getLocalDateTimeFromEpochMicro(getObject(it), this.timeZone) + .toKotlinLocalDateTime() + } + } + private fun TimeStampMilliVector.values(range: IntRange): List<LocalDateTime?> = range.mapIndexed { i, it -> if (isNull(i)) { @@ -188,6 +216,16 @@ private fun TimeStampMilliVector.values(range: IntRange): List<LocalDateTime?> = } } +private fun TimeStampMilliTZVector.values(range: IntRange): List<LocalDateTime?> = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + Instant.fromEpochMilliseconds(getObject(it)) + .toLocalDateTime(TimeZone.of(this.timeZone)) + } + } + private fun TimeStampSecVector.values(range: IntRange): List<LocalDateTime?> = range.mapIndexed { i, it -> if (isNull(i)) { @@ -197,6 +235,16 @@ private fun TimeStampSecVector.values(range: IntRange): List<LocalDateTime?> = } } +private fun TimeStampSecTZVector.values(range: IntRange): List<LocalDateTime?> = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + DateUtility.getLocalDateTimeFromEpochMilli(TimeUnit.SECONDS.toMillis(getObject(it)), this.timeZone) + .toKotlinLocalDateTime() + } + } + private fun StructVector.values(range: IntRange): List<Map<String, Any?>?> = range.map { getObject(it) @@ -343,12 +391,20 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi is TimeStampNanoVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampNanoTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMicroVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMicroTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMilliVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMilliTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampSecTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) is NullVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 78356f4fed..95108db052 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -9,9 +9,13 @@ import kotlinx.datetime.UtcOffset import kotlinx.datetime.toInstant import kotlinx.datetime.toJavaInstant import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeStampMicroVector +import org.apache.arrow.vector.TimeStampMilliTZVector import org.apache.arrow.vector.TimeStampMilliVector +import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector +import org.apache.arrow.vector.TimeStampSecTZVector import org.apache.arrow.vector.TimeStampSecVector import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.ArrowFileReader @@ -532,9 +536,13 @@ internal class ArrowKtTest { val dataFrame = dataFrameOf( "ts_nano" to dates, + "ts_nano_tz" to dates, "ts_micro" to dates, + "ts_micro_tz" to dates, "ts_milli" to dates, + "ts_milli_tz" to dates, "ts_sec" to dates, + "ts_sec_tz" to dates, ) DataFrame.readArrowFeather(writeArrowTimestamp(dates)) shouldBe dataFrame @@ -549,42 +557,79 @@ internal class ArrowKtTest { null, ) + val timeStampMilliTZ = Field( + "ts_milli_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null, + ) + val timeStampMicro = Field( "ts_micro", FieldType.nullable(ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), null, ) + val timeStampMicroTZ = Field( + "ts_micro_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), + null, + ) + val timeStampNano = Field( "ts_nano", FieldType.nullable(ArrowType.Timestamp(TimeUnit.NANOSECOND, null)), null, ) + val timeStampNanoTZ = Field( + "ts_nano_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")), + null, + ) + val timeStampSec = Field( "ts_sec", FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, null)), null, ) + + val timeStampSecTZ = Field( + "ts_sec_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, "UTC")), + null, + ) + val schemaTimeStamp = Schema( - listOf(timeStampNano, timeStampMicro, timeStampMilli, timeStampSec), + listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampMilliTZ, timeStampSec, timeStampSecTZ), ) VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot -> val timeStampMilliVector = vectorSchemaRoot.getVector("ts_milli") as TimeStampMilliVector + val timeStampMilliTZVector = vectorSchemaRoot.getVector("ts_milli_tz") as TimeStampMilliTZVector val timeStampNanoVector = vectorSchemaRoot.getVector("ts_nano") as TimeStampNanoVector + val timeStampNanoTZVector = vectorSchemaRoot.getVector("ts_nano_tz") as TimeStampNanoTZVector val timeStampMicroVector = vectorSchemaRoot.getVector("ts_micro") as TimeStampMicroVector + val timeStampMicroTZVector = vectorSchemaRoot.getVector("ts_micro_tz") as TimeStampMicroTZVector val timeStampSecVector = vectorSchemaRoot.getVector("ts_sec") as TimeStampSecVector + val timeStampSecTZVector = vectorSchemaRoot.getVector("ts_sec_tz") as TimeStampSecTZVector timeStampMilliVector.allocateNew(dates.size) + timeStampMilliTZVector.allocateNew(dates.size) timeStampNanoVector.allocateNew(dates.size) + timeStampNanoTZVector.allocateNew(dates.size) timeStampMicroVector.allocateNew(dates.size) + timeStampMicroTZVector.allocateNew(dates.size) timeStampSecVector.allocateNew(dates.size) + timeStampSecTZVector.allocateNew(dates.size) dates.forEachIndexed { index, localDateTime -> val instant = localDateTime.toInstant(UtcOffset.ZERO).toJavaInstant() timeStampNanoVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano + timeStampNanoTZVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano timeStampMicroVector[index] = instant.toEpochMilli() * 1_000L + timeStampMicroTZVector[index] = instant.toEpochMilli() * 1_000L timeStampMilliVector[index] = instant.toEpochMilli() + timeStampMilliTZVector[index] = instant.toEpochMilli() timeStampSecVector[index] = instant.toEpochMilli() / 1_000L + timeStampSecTZVector[index] = instant.toEpochMilli() / 1_000L } vectorSchemaRoot.setRowCount(dates.size) val bos = ByteArrayOutputStream()