Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add SchedulerTask which will be notified once cancelled. #1593

Merged
merged 1 commit into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
task.isCancelled should ===(true)
}

"notify callback if cancel is performed before execution" taggedAs TimingTest in {
val latch = new CountDownLatch(1)
val task = system.scheduler.scheduleOnce(100 millis,
new SchedulerTask {
override def run(): Unit = ()
override def cancelled(): Unit = latch.countDown()
})
task.cancel()
latch.await(100, TimeUnit.MILLISECONDS) should ===(true)
}

"not be canceled if cancel is performed after execution" taggedAs TimingTest in {
val latch = TestLatch(1)
val task = collectCancellable(system.scheduler.scheduleOnce(10.millis)(latch.countDown()))
Expand Down Expand Up @@ -334,6 +345,23 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
ticks.get should ===(1)
}

"notify callback if cancel is performed after initial delay" taggedAs TimingTest in {
val latch = new CountDownLatch(1)
val initialDelay = 90.millis.dilated
val delay = 500.millis.dilated
val task = system.scheduler.scheduleWithFixedDelay(
initialDelay,
delay)(
new SchedulerTask {
override def run(): Unit = ()
override def cancelled(): Unit = latch.countDown()
})

Thread.sleep((initialDelay + 200.millis.dilated).toMillis)
task.cancel()
latch.await(100, TimeUnit.MILLISECONDS) should ===(true)
}

/**
* ticket #307
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
final override protected def scheduledFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
new AtomicLong(clock() + initialDelay.toNanos) with SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
Expand All @@ -150,6 +150,11 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
}

override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
},
roundUp(initialDelay))
}
Expand Down Expand Up @@ -390,7 +395,18 @@ object LightArrayRevolverScheduler {

override def cancel(): Boolean = extractTask(CancelledTask) match {
case ExecutedTask | CancelledTask => false
case _ => true
case task: SchedulerTask =>
notifyCancellation(task)
true
case _ => true
}

private def notifyCancellation(task: SchedulerTask): Unit = {
try {
executionContext.execute(() => task.cancelled())
} catch {
case NonFatal(e) => executionContext.reportFailure(e)
}
}

override def isCancelled: Boolean = task eq CancelledTask
Expand Down
21 changes: 20 additions & 1 deletion actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ trait Scheduler {
final override protected def scheduledFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
new SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
Expand All @@ -97,6 +97,11 @@ trait Scheduler {
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}

override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
})
}

Expand Down Expand Up @@ -498,6 +503,20 @@ trait Scheduler {
// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler

/**
* A Task that will be notified when it is cancelled.
*
* @since 1.2.0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the since annotation

*/
trait SchedulerTask extends Runnable {

/**
* Called for [[SchedulerTask]]s that are successfully canceled via [[Cancellable#cancel]].
* Overriding this method allows to for example run some cleanup.
*/
def cancelled(): Unit = ()
}

/**
* Signifies something that can be cancelled
* There is no strict guarantee that the implementation is thread-safe,
Expand Down
Loading