fs2-cron is a microlibrary that provides FS2 streams based on Cron4s cron expressions or Calev calendar events.
It is provided for Scala 2.12, 2.13 and fs2-cron-calev
also for
Scala 3.
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import java.time.LocalTime
val printTime = Stream.eval(IO(println(LocalTime.now)))
Requires the fs2-cron-cron4s
module:
import cron4s.Cron
import eu.timepit.fs2cron.cron4s.Cron4sScheduler
val cronScheduler = Cron4sScheduler.systemDefault[IO]
// cronScheduler: eu.timepit.fs2cron.Scheduler[IO, cron4s.expr.CronExpr] = eu.timepit.fs2cron.cron4s.Cron4sScheduler$$anon$1@1d3fe8e4
val evenSeconds = Cron.unsafeParse("*/2 * * ? * *")
// evenSeconds: cron4s.package.CronExpr = CronExpr(
// seconds = */2,
// minutes = *,
// hours = *,
// daysOfMonth = ?,
// months = *,
// daysOfWeek = *
// )
val scheduled = cronScheduler.awakeEvery(evenSeconds) >> printTime
// scheduled: Stream[[x]IO[x], Unit] = Stream(..)
scheduled.take(3).compile.drain.unsafeRunSync()
// 20:47:04.213114
// 20:47:06.002066
// 20:47:08.001635
val everyFiveSeconds = Cron.unsafeParse("*/5 * * ? * *")
// everyFiveSeconds: cron4s.package.CronExpr = CronExpr(
// seconds = */5,
// minutes = *,
// hours = *,
// daysOfMonth = ?,
// months = *,
// daysOfWeek = *
// )
val scheduledTasks = cronScheduler.schedule(List(
evenSeconds -> Stream.eval(IO(println(LocalTime.now.toString + " task 1"))),
everyFiveSeconds -> Stream.eval(IO(println(LocalTime.now.toString + " task 2")))
))
// scheduledTasks: Stream[IO, Unit] = Stream(..)
scheduledTasks.take(9).compile.drain.unsafeRunSync()
// 20:47:10.002946 task 2
// 20:47:10.002965 task 1
// 20:47:12.001202 task 1
// 20:47:14.002239 task 1
// 20:47:15.001423 task 2
// 20:47:16.001854 task 1
// 20:47:18.002236 task 1
// 20:47:20.001258 task 2
// 20:47:20.001890 task 1
Using Stream#interruptWhen(haltWhenTrue)
import cats.effect._
import cron4s.Cron
import eu.timepit.fs2cron.cron4s.Cron4sScheduler
import fs2.Stream
import fs2.concurrent.SignallingRef
import java.time.LocalTime
import scala.concurrent.duration._
object TestApp extends IOApp.Simple {
val printTime = Stream.eval(IO(println(LocalTime.now)))
override def run: IO[Unit] = {
val cronScheduler = Cron4sScheduler.systemDefault[IO]
val evenSeconds = Cron.unsafeParse("*/2 * * ? * *")
val scheduled = cronScheduler.awakeEvery(evenSeconds) >> printTime
val cancel = SignallingRef[IO, Boolean](false)
for {
c <- cancel
s <- scheduled.interruptWhen(c).repeat.compile.drain.start
//prints about 5 times before stop
_ <- Temporal[IO].sleep(10.seconds) >> c.set(true)
} yield s
}
}
Requires the fs2-cron-calev
module:
import com.github.eikek.calev.CalEvent
import eu.timepit.fs2cron.calev.CalevScheduler
val calevScheduler = CalevScheduler.systemDefault[IO]
// calevScheduler: eu.timepit.fs2cron.Scheduler[IO, CalEvent] = eu.timepit.fs2cron.calev.CalevScheduler$$anon$1@75005f8c
val oddSeconds = CalEvent.unsafe("*-*-* *:*:1/2")
// oddSeconds: CalEvent = CalEvent(
// weekday = All,
// date = DateEvent(year = All, month = All, day = All),
// time = TimeEvent(
// hour = All,
// minute = All,
// seconds = List(values = Vector(Single(value = 1, rep = Some(value = 2))))
// ),
// zone = None
// )
val calevScheduled = calevScheduler.awakeEvery(oddSeconds) >> printTime
// calevScheduled: Stream[[x]IO[x], Unit] = Stream(..)
calevScheduled.take(3).compile.drain.unsafeRunSync()
// 20:47:21.009230
// 20:47:23.000383
// 20:47:25.000540
val everyFourSeconds = CalEvent.unsafe("*-*-* *:*:0/4")
// everyFourSeconds: CalEvent = CalEvent(
// weekday = All,
// date = DateEvent(year = All, month = All, day = All),
// time = TimeEvent(
// hour = All,
// minute = All,
// seconds = List(values = Vector(Single(value = 0, rep = Some(value = 4))))
// ),
// zone = None
// )
val calevScheduledTasks = calevScheduler.schedule(List(
oddSeconds -> Stream.eval(IO(println(LocalTime.now.toString + " task 1"))),
everyFourSeconds -> Stream.eval(IO(println(LocalTime.now.toString + " task 2")))
))
// calevScheduledTasks: Stream[IO, Unit] = Stream(..)
calevScheduledTasks.take(9).compile.drain.unsafeRunSync()
// 20:47:27.001415 task 1
// 20:47:28.000200 task 2
// 20:47:29.000647 task 1
// 20:47:31.001694 task 1
// 20:47:32.001308 task 2
// 20:47:33.000669 task 1
// 20:47:35.001209 task 1
// 20:47:36.001160 task 2
// 20:47:37.000646 task 1
The latest version of the library is available for Scala 2.12 and 2.13.
If you're using sbt, add the following to your build:
libraryDependencies ++= Seq(
"eu.timepit" %% "fs2-cron-cron4s" % "0.8.3" //and/or
"eu.timepit" %% "fs2-cron-calev" % "0.8.3"
)
fs2-cron is licensed under the Apache License, Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 and also in the LICENSE file.