Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce codecs for instant micros and local timestamp millis #524

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions modules/core/src/main/scala/vulcan/Codec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@

package vulcan

import cats.{Invariant, Show, ~>}
import cats.data.{Chain, NonEmptyChain, NonEmptyList, NonEmptySet, NonEmptyVector}
import cats.data._
import cats.free.FreeApplicative
import cats.implicits._

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time.{Instant, LocalDate, LocalTime}
import java.util.concurrent.TimeUnit
import java.util.{Arrays, UUID}
import org.apache.avro.{Conversions, LogicalType, LogicalTypes, Schema, SchemaBuilder}
import cats.{Invariant, Show, ~>}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic._
import org.apache.avro.{Conversions, LogicalType, LogicalTypes, Schema, SchemaBuilder}
import vulcan.Avro.Bytes
import vulcan.internal.converters.collection._
import vulcan.internal.schema.adaptForSchema
import vulcan.internal.syntax._
import vulcan.internal.{Deserializer, Serializer}

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time._
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._
import java.util.{Arrays, UUID}
import scala.annotation.implicitNotFound
import scala.collection.immutable.SortedSet
import vulcan.internal.converters.collection._
import vulcan.internal.syntax._
import vulcan.internal.schema.adaptForSchema

import scala.util.Try

/**
Expand Down Expand Up @@ -732,6 +731,39 @@ object Codec extends CodecCompanionCompat {
.withLogicalType(LogicalTypes.timestampMillis)
.withTypeName("Instant")

case class InstantMicros(self: Instant)
object InstantMicros {
def ofEpochSecond(epochSecond: Long, nanoAdjustment: Long): InstantMicros =
new InstantMicros(Instant.ofEpochSecond(epochSecond, nanoAdjustment))

def now(): InstantMicros = new InstantMicros(Instant.now())
}

implicit lazy val instantMicros: Codec.Aux[Avro.Long, InstantMicros] =
LongCodec
.imap { microsSinceEpoch =>
InstantMicros.ofEpochSecond(
MICROSECONDS.toSeconds(microsSinceEpoch),
MICROSECONDS.toNanos(Math.floorMod(microsSinceEpoch, SECONDS.toMicros(1)))
)
} { instantMicros =>
NANOSECONDS.toMicros(
SECONDS.toNanos(instantMicros.self.getEpochSecond) + instantMicros.self.getNano
)
}
.withLogicalType(LogicalTypes.timestampMicros)
.withTypeName("InstantMicros")

implicit lazy val localTimestampMillis: Codec.Aux[Avro.Long, LocalDateTime] =
LongCodec
.imap { millisSinceEpoch =>
LocalDateTime.ofInstant(Instant.ofEpochMilli(millisSinceEpoch), ZoneId.of("UTC"))
} { localDateTime =>
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli
}
.withLogicalType(LogicalTypes.localTimestampMillis)
.withTypeName("LocalDateTime")

/**
* @group General
*/
Expand Down Expand Up @@ -831,7 +863,7 @@ object Codec extends CodecCompanionCompat {
final val localTimeMicros: Codec.Aux[Avro.Long, LocalTime] =
Codec.long
.imap { long =>
val nanos = TimeUnit.MICROSECONDS.toNanos(long)
val nanos = MICROSECONDS.toNanos(long)
LocalTime.ofNanoOfDay(nanos)
}(localTime => TimeUnit.NANOSECONDS.toMicros(localTime.toNanoOfDay))
.withLogicalType(LogicalTypes.timeMicros)
Expand Down
145 changes: 134 additions & 11 deletions modules/core/src/test/scala/vulcan/CodecSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ package vulcan

import cats.data._
import cats.implicits._

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time.{Instant, LocalDate, LocalTime}
import java.util.concurrent.TimeUnit
import java.time.temporal.ChronoUnit
import java.util.UUID
import org.apache.avro.{Conversions, LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.apache.avro.{Conversions, LogicalTypes, Schema, SchemaBuilder}
import org.scalacheck.Gen
import org.scalatest.Assertion
import vulcan.examples.{SecondInSealedTraitCaseClass, _}
import vulcan.Codec.InstantMicros
import vulcan.examples._
import vulcan.internal.converters.collection._

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time.temporal.ChronoUnit
import java.time._
import java.util.UUID
import java.util.concurrent.TimeUnit._
import scala.util.{Failure, Success, Try}

final class CodecSpec extends BaseSpec with CodecSpecHelpers {
Expand Down Expand Up @@ -827,6 +827,7 @@ final class CodecSpec extends BaseSpec with CodecSpecHelpers {
val value = {
val instant = Instant.now()
instant.minusNanos(instant.getNano().toLong)
instant
}

assertEncodeIs[Instant](
Expand Down Expand Up @@ -871,6 +872,128 @@ final class CodecSpec extends BaseSpec with CodecSpecHelpers {
}
}

describe("instant micros") {
describe("schema") {
it("should be encoded as long with logical type timestamp-micros") {
assertSchemaIs[InstantMicros] {
"""{"type":"long","logicalType":"timestamp-micros"}"""
}
}
}

describe("encode") {
it("should encode as long") {
val value: InstantMicros = InstantMicros.now()

assertEncodeIs[InstantMicros](
value,
Right(NANOSECONDS.toMicros(SECONDS.toNanos(value.self.getEpochSecond) + value.self.getNano))
)
}
}

describe("decode") {
it("should error if logical type is missing") {
val value = InstantMicros.now()

assertDecodeError[InstantMicros](
unsafeEncode(value),
unsafeSchema[Long],
"Error decoding InstantMicros: Got unexpected missing logical type"
)
}

it("should error if logical type is not timestamp-micros") {
val value = InstantMicros.now()

assertDecodeError[InstantMicros](
unsafeEncode(value), {
LogicalTypes.timestampMillis().addToSchema {
SchemaBuilder.builder().longType()
}
},
"Error decoding InstantMicros: Got unexpected logical type timestamp-millis"
)
}

it("should decode as Instant") {
val value = InstantMicros.now()

assertDecodeIs[InstantMicros](
unsafeEncode(value),
Right(value)
)
}
}
}

describe("local date time") {
describe("schema") {
it("should be encoded as long with logical type local-timestamp-millis") {
assertSchemaIs[LocalDateTime] {
"""{"type":"long","logicalType":"local-timestamp-millis"}"""
}
}
}

describe("encode") {
it("should encode as long") {
val value = {
val instant = Instant.now()
LocalDateTime.ofInstant(instant.minusNanos(instant.getNano.toLong), ZoneId.of("UTC"))
}

assertEncodeIs[LocalDateTime](
value,
Right(value.toInstant(ZoneOffset.UTC).toEpochMilli)
)
}
}

describe("decode") {
it("should error if logical type is missing") {
val value = {
val instant = Instant.now()
LocalDateTime.ofInstant(instant.minusNanos(instant.getNano.toLong), ZoneId.of("UTC"))
}

assertDecodeError[LocalDateTime](
unsafeEncode(value),
unsafeSchema[Long],
"Error decoding LocalDateTime: Got unexpected missing logical type"
)
}

it("should error if logical type is not local-timestamp-millis") {
val value = {
val instant = Instant.now()
LocalDateTime.ofInstant(instant.minusNanos(instant.getNano.toLong), ZoneId.of("UTC"))
}

assertDecodeError[LocalDateTime](
unsafeEncode(value), {
LogicalTypes.timestampMillis().addToSchema {
SchemaBuilder.builder().longType()
}
},
"Error decoding LocalDateTime: Got unexpected logical type timestamp-millis"
)
}

it("should decode as LocalDateTime") {
val value = {
val instant = Instant.now()
LocalDateTime.ofInstant(instant.minusNanos(instant.getNano().toLong), ZoneId.of("UTC"))
}

assertDecodeIs[LocalDateTime](
unsafeEncode(value),
Right(value)
)
}
}
}

describe("imapError") {
case class PosInt(value: Int)

Expand Down Expand Up @@ -1120,7 +1243,7 @@ final class CodecSpec extends BaseSpec with CodecSpecHelpers {
assertEncodeIs[LocalTime](
value,
Right(
java.lang.Integer.valueOf(TimeUnit.NANOSECONDS.toMillis(value.toNanoOfDay()).toInt)
java.lang.Integer.valueOf(NANOSECONDS.toMillis(value.toNanoOfDay()).toInt)
)
)
}
Expand Down Expand Up @@ -1161,7 +1284,7 @@ final class CodecSpec extends BaseSpec with CodecSpecHelpers {
val value = LocalTime.now()
assertEncodeIs[LocalTime](
value,
Right(java.lang.Long.valueOf(TimeUnit.NANOSECONDS.toMicros(value.toNanoOfDay())))
Right(java.lang.Long.valueOf(NANOSECONDS.toMicros(value.toNanoOfDay())))
)
}
}
Expand Down