Skip to content

Commit

Permalink
Fix metronome not reviving when starting new job (#343)
Browse files Browse the repository at this point in the history
Offer matcher reconciler was not run with .start() so the source was emitting no value and combineLatest needs at least one value to publish ANY result.

We don't need OfferMatcherReconciler at all since we don't use PVs.

JIRA issues: DCOS_OSS-5166
  • Loading branch information
alenkacz authored and kensipe committed Jul 1, 2019
1 parent f526afb commit 58d6dd7
Showing 1 changed file with 3 additions and 24 deletions.
27 changes: 3 additions & 24 deletions jobs/src/main/scala/dcos/metronome/scheduler/SchedulerModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.actor.{ ActorRefFactory, ActorSystem, Cancellable }
import akka.event.EventStream
import akka.stream.scaladsl.Source
import dcos.metronome.repository.SchedulerRepositoriesModule
import dcos.metronome.scheduler.impl.{ NotifyOfTaskStateOperationStep, PeriodicOperationsImpl, ReconciliationActor, SchedulerServiceImpl }
import dcos.metronome.scheduler.impl.{ NotifyOfTaskStateOperationStep, PeriodicOperationsImpl, ReconciliationActor }
import mesosphere.marathon._
import mesosphere.marathon.core.base.{ ActorsModule, CrashStrategy, LifecycleState }
import mesosphere.marathon.core.election.{ ElectionModule, ElectionService }
Expand All @@ -18,9 +18,7 @@ import mesosphere.marathon.core.launcher.{ LauncherModule, OfferProcessor }
import mesosphere.marathon.core.launchqueue.LaunchQueueModule
import mesosphere.marathon.core.leadership.LeadershipModule
import mesosphere.marathon.core.matcher.base.OfferMatcher
import mesosphere.marathon.core.matcher.base.util.StopOnFirstMatchingOfferMatcher
import mesosphere.marathon.core.matcher.manager.OfferMatcherManagerModule
import mesosphere.marathon.core.matcher.reconcile.OfferMatcherReconciliationModule
import mesosphere.marathon.core.plugin.PluginModule
import mesosphere.marathon.core.task.jobs.TaskJobsModule
import mesosphere.marathon.core.task.termination.{ KillService, TaskTerminationModule }
Expand All @@ -30,7 +28,6 @@ import mesosphere.marathon.core.task.update.impl.TaskStatusUpdateProcessorImpl
import mesosphere.marathon.core.task.update.impl.steps.ContinueOnErrorStep
import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.storage.repository.InstanceRepository
import mesosphere.marathon.stream.EnrichedFlow
import mesosphere.util.state._

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -88,23 +85,9 @@ class SchedulerModule(
leadershipModule,
() => scheduler.getLocalRegion)(actorsModule.materializer)

private[this] lazy val offerMatcherReconcilerModule =
new OfferMatcherReconciliationModule(
scallopConf,
clock,
actorSystem.eventStream,
instanceTrackerModule.instanceTracker,
persistenceModule.groupRepository,
leadershipModule)(actorsModule.materializer)

lazy val taskStatusProcessor: TaskStatusUpdateProcessor = new TaskStatusUpdateProcessorImpl(
metrics, clock, instanceTrackerModule.instanceTracker, schedulerDriverHolder, killService, eventBus)

private[this] lazy val launcherModule: LauncherModule = {
val instanceTracker: InstanceTracker = instanceTrackerModule.instanceTracker
val offerMatcher: OfferMatcher = StopOnFirstMatchingOfferMatcher(
offerMatcherReconcilerModule.offerMatcherReconciler,
offerMatcherManagerModule.globalOfferMatcher)
val offerMatcher: OfferMatcher = offerMatcherManagerModule.globalOfferMatcher

new LauncherModule(metrics, scallopConf, instanceTracker, schedulerDriverHolder, offerMatcher, pluginModule.pluginManager)(clock)
}
Expand Down Expand Up @@ -170,11 +153,7 @@ class SchedulerModule(
lazy val electionService: ElectionService = electionModule.service

/** Combine offersWanted state from multiple sources. */
private[this] lazy val offersWanted: Source[Boolean, Cancellable] = {
offerMatcherManagerModule.globalOfferMatcherWantsOffers
.via(EnrichedFlow.combineLatest(offerMatcherReconcilerModule.offersWantedObservable, eagerComplete = true))
.map { case (managerWantsOffers, reconciliationWantsOffers) => managerWantsOffers || reconciliationWantsOffers }
}
private[this] lazy val offersWanted: Source[Boolean, Cancellable] = offerMatcherManagerModule.globalOfferMatcherWantsOffers

val launchQueueModule = new LaunchQueueModule(
scallopConf,
Expand Down

0 comments on commit 58d6dd7

Please sign in to comment.