diff --git a/jobs/src/main/scala/dcos/metronome/scheduler/SchedulerModule.scala b/jobs/src/main/scala/dcos/metronome/scheduler/SchedulerModule.scala index 0620456f..e71c13fc 100644 --- a/jobs/src/main/scala/dcos/metronome/scheduler/SchedulerModule.scala +++ b/jobs/src/main/scala/dcos/metronome/scheduler/SchedulerModule.scala @@ -8,7 +8,7 @@ import akka.actor.{ ActorRefFactory, ActorSystem, Cancellable } import akka.event.EventStream import akka.stream.scaladsl.Source import dcos.metronome.repository.SchedulerRepositoriesModule -import dcos.metronome.scheduler.impl.{ NotifyOfTaskStateOperationStep, PeriodicOperationsImpl, ReconciliationActor, SchedulerServiceImpl } +import dcos.metronome.scheduler.impl.{ NotifyOfTaskStateOperationStep, PeriodicOperationsImpl, ReconciliationActor } import mesosphere.marathon._ import mesosphere.marathon.core.base.{ ActorsModule, CrashStrategy, LifecycleState } import mesosphere.marathon.core.election.{ ElectionModule, ElectionService } @@ -18,9 +18,7 @@ import mesosphere.marathon.core.launcher.{ LauncherModule, OfferProcessor } import mesosphere.marathon.core.launchqueue.LaunchQueueModule import mesosphere.marathon.core.leadership.LeadershipModule import mesosphere.marathon.core.matcher.base.OfferMatcher -import mesosphere.marathon.core.matcher.base.util.StopOnFirstMatchingOfferMatcher import mesosphere.marathon.core.matcher.manager.OfferMatcherManagerModule -import mesosphere.marathon.core.matcher.reconcile.OfferMatcherReconciliationModule import mesosphere.marathon.core.plugin.PluginModule import mesosphere.marathon.core.task.jobs.TaskJobsModule import mesosphere.marathon.core.task.termination.{ KillService, TaskTerminationModule } @@ -30,7 +28,6 @@ import mesosphere.marathon.core.task.update.impl.TaskStatusUpdateProcessorImpl import mesosphere.marathon.core.task.update.impl.steps.ContinueOnErrorStep import mesosphere.marathon.metrics.Metrics import mesosphere.marathon.storage.repository.InstanceRepository -import mesosphere.marathon.stream.EnrichedFlow import mesosphere.util.state._ import scala.concurrent.ExecutionContext @@ -88,23 +85,9 @@ class SchedulerModule( leadershipModule, () => scheduler.getLocalRegion)(actorsModule.materializer) - private[this] lazy val offerMatcherReconcilerModule = - new OfferMatcherReconciliationModule( - scallopConf, - clock, - actorSystem.eventStream, - instanceTrackerModule.instanceTracker, - persistenceModule.groupRepository, - leadershipModule)(actorsModule.materializer) - - lazy val taskStatusProcessor: TaskStatusUpdateProcessor = new TaskStatusUpdateProcessorImpl( - metrics, clock, instanceTrackerModule.instanceTracker, schedulerDriverHolder, killService, eventBus) - private[this] lazy val launcherModule: LauncherModule = { val instanceTracker: InstanceTracker = instanceTrackerModule.instanceTracker - val offerMatcher: OfferMatcher = StopOnFirstMatchingOfferMatcher( - offerMatcherReconcilerModule.offerMatcherReconciler, - offerMatcherManagerModule.globalOfferMatcher) + val offerMatcher: OfferMatcher = offerMatcherManagerModule.globalOfferMatcher new LauncherModule(metrics, scallopConf, instanceTracker, schedulerDriverHolder, offerMatcher, pluginModule.pluginManager)(clock) } @@ -170,11 +153,7 @@ class SchedulerModule( lazy val electionService: ElectionService = electionModule.service /** Combine offersWanted state from multiple sources. */ - private[this] lazy val offersWanted: Source[Boolean, Cancellable] = { - offerMatcherManagerModule.globalOfferMatcherWantsOffers - .via(EnrichedFlow.combineLatest(offerMatcherReconcilerModule.offersWantedObservable, eagerComplete = true)) - .map { case (managerWantsOffers, reconciliationWantsOffers) => managerWantsOffers || reconciliationWantsOffers } - } + private[this] lazy val offersWanted: Source[Boolean, Cancellable] = offerMatcherManagerModule.globalOfferMatcherWantsOffers val launchQueueModule = new LaunchQueueModule( scallopConf,