Skip to content

Commit

Permalink
[IOSP-164] Multiple Merge Queues (one per target branch) (#39)
Browse files Browse the repository at this point in the history
* Introduce DispatchService, clean MergeService

* Move PullRequest.isLabelled extension

* Temp disable HealthCheck Tests

* Remove .starting state, pass initial PRs on init

* Restore starting state, make tests pass

* Restore a .starting state with empty queue for consistency

* Replace MonoQueueDispatchServiceMock with DispatchService directly

* onNewMergeService callback + propagate scheduler

* Fix dispatch of StatusEvents

* Fix test logic

Was dispatching the GH event while the newly-spawn MergeService was still .starting. Passing the PR directly on starting instead to fix it.

* Assert on MergeService creation & branch dispatch

* Move Reducers right after Feedbacks for readability

* Restore HealthCheck Services

* Remove a MergeService once it's idle

* Moving declaration of State before its extension

Because apparently on Linux declaration order matters while on macOS the compiler is OK with arbitrary order…

* Fix Linux Build

* [Test] Make state reactive to avoid race condition

* [Test] Switching from collect to append

* Revert "[Test] Switching from collect to append"

This reverts commit b8d5d84.

* Add missing observe(on:)

* Remove useless @escaping

* Refactor lifecycle signal to emit MS states too

* Cleanup

* Adding DispatchServiceTests

And extracting common helpers from MergeServiceTests

* More reliable Test for parallel events

* [Test] use sendPullRequestEvent for readability

And // with sendStatusEvent

* Making the queueStates JSON reply parsable easily

to match @ilyapuchka's suggestion for the ClientApp API

* Add queue name in the PR comment when PR queued

* Apply suggestions from code review

Co-Authored-By: Ilya Puchka <[email protected]>

* Comment about skipRepeats() usage on tuple

* Fix queueStates JSON output + Test

* [Test] use sendPullRequestEvent for readability

Missed some in previous commit

* Stop injecting the Observer into DispatchService

Instead expose the signal as a public prop

* adjust parsing to expect array of queues instead of single queue

* do not ignore client app project

* Make mergeServices dict Atomic

* Make DispatchService.healthcheck on-demand

Simplifies from Reactive code to on-demand property because that's the only use case we have rn anyway.
Allows us to avoid the odd trick of disposing and recreating a SignalProducer or having to handle thread-safety of the mergeServices dict

* Avoid side-effects in Atomic.modify

* Reorder declarations to fit our code style
  • Loading branch information
Olivier Halligon authored Dec 3, 2019
1 parent 04625a6 commit 8057711
Show file tree
Hide file tree
Showing 19 changed files with 1,559 additions and 553 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ Packages
.swiftpm
xcuserdata
*.xcodeproj
!WallEView.xcodeproj
Config/secrets
.DS_Store
1 change: 0 additions & 1 deletion Sources/App/Extensions/EnvironmentProperties.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ extension Environment {
return PullRequest.Label(name: try Environment.get("MERGE_LABEL"))
}

// TODO: OHA: Add this value to env vars in host
static func topPriorityLabels() throws -> [PullRequest.Label] {
let labelsList: String = try Environment.get("TOP_PRIORITY_LABELS")
return labelsList.split(separator: ",").map { name in
Expand Down
12 changes: 6 additions & 6 deletions Sources/App/configure.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ public func configure(_ config: inout Config, _ env: inout Environment, _ servic

logger.log("👟 Starting up...")

let mergeService = try makeMergeService(with: logger, gitHubEventsService)
let dispatchService = try makeDispatchService(with: logger, gitHubEventsService)

services.register(mergeService)
services.register(dispatchService)
services.register(logger, as: PrintLogger.self)
services.register(RequestLoggerMiddleware.self)

// Register routes to the router
let router = EngineRouter.default()
try routes(router, logger: logger, mergeService: mergeService, gitHubEventsService: gitHubEventsService)
try routes(router, logger: logger, dispatchService: dispatchService, gitHubEventsService: gitHubEventsService)
services.register(router, as: Router.self)

// Register middleware
Expand All @@ -34,12 +34,12 @@ public func configure(_ config: inout Config, _ env: inout Environment, _ servic
logger.log("🏁 Ready")
}

private func makeMergeService(with logger: LoggerProtocol, _ gitHubEventsService: GitHubEventsService) throws -> MergeService {
private func makeDispatchService(with logger: LoggerProtocol, _ gitHubEventsService: GitHubEventsService) throws -> DispatchService {

let gitHubAPI = GitHubClient(session: URLSession(configuration: .default), token: try Environment.gitHubToken())
.api(for: Repository(owner: try Environment.gitHubOrganization(), name: try Environment.gitHubRepository()))

return MergeService(
return DispatchService(
integrationLabel: try Environment.mergeLabel(),
topPriorityLabels: try Environment.topPriorityLabels(),
requiresAllStatusChecks: try Environment.requiresAllGitHubStatusChecks(),
Expand All @@ -50,4 +50,4 @@ private func makeMergeService(with logger: LoggerProtocol, _ gitHubEventsService
)
}

extension MergeService: Service {}
extension DispatchService: Service {}
8 changes: 4 additions & 4 deletions Sources/App/routes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ import Vapor
public func routes(
_ router: Router,
logger: LoggerProtocol,
mergeService: MergeService,
dispatchService: DispatchService,
gitHubEventsService: GitHubEventsService
) throws {

router.get("/") { request -> Response in
let response = Response(using: request)
if request.header(named: HTTPHeaderName.accept.description) == "application/json" {
try response.content.encode(mergeService.state.value, as: .json)
try response.content.encode(dispatchService.queueStates, as: .json)
} else {
try response.content.encode(String(describing: mergeService.state.value), as: .plainText)
try response.content.encode(dispatchService.queuesDescription, as: .plainText)
}
return response
}

router.get("health") { request -> HTTPResponse in
switch mergeService.healthcheck.status.value {
switch dispatchService.healthcheckStatus {
case .ok: return HTTPResponse(status: .ok)
default: return HTTPResponse(status: .serviceUnavailable)
}
Expand Down
9 changes: 9 additions & 0 deletions Sources/Bot/Models/PullRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,12 @@ extension PullRequest.Branch: CustomDebugStringConvertible {
return "Branch(\(ref), \(sha))"
}
}

extension PullRequest {
func isLabelled(with label: PullRequest.Label) -> Bool {
return labels.contains(label)
}
func isLabelled(withOneOf possibleLabels: [PullRequest.Label]) -> Bool {
return labels.contains(where: possibleLabels.contains)
}
}
181 changes: 181 additions & 0 deletions Sources/Bot/Services/DispatchService.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import Foundation
import Result
import ReactiveSwift
import ReactiveFeedback

/// Orchestrates multiple merge services, one per each target branch of PRs enqueued for integration
public final class DispatchService {
private let integrationLabel: PullRequest.Label
private let topPriorityLabels: [PullRequest.Label]
private let requiresAllStatusChecks: Bool
private let statusChecksTimeout: TimeInterval

private let logger: LoggerProtocol
private let gitHubAPI: GitHubAPIProtocol
private let scheduler: DateScheduler

/// Merge services per target branch
private var mergeServices: Atomic<[String: MergeService]>
public let mergeServiceLifecycle: Signal<DispatchService.MergeServiceLifecycleEvent, NoError>
private let mergeServiceLifecycleObserver: Signal<DispatchService.MergeServiceLifecycleEvent, NoError>.Observer

public init(
integrationLabel: PullRequest.Label,
topPriorityLabels: [PullRequest.Label],
requiresAllStatusChecks: Bool,
statusChecksTimeout: TimeInterval,
logger: LoggerProtocol,
gitHubAPI: GitHubAPIProtocol,
gitHubEvents: GitHubEventsServiceProtocol,
scheduler: DateScheduler = QueueScheduler()
) {
self.integrationLabel = integrationLabel
self.topPriorityLabels = topPriorityLabels
self.requiresAllStatusChecks = requiresAllStatusChecks
self.statusChecksTimeout = statusChecksTimeout

self.logger = logger
self.gitHubAPI = gitHubAPI
self.scheduler = scheduler

self.mergeServices = Atomic([:])
(mergeServiceLifecycle, mergeServiceLifecycleObserver) = Signal<DispatchService.MergeServiceLifecycleEvent, NoError>.pipe()

gitHubAPI.fetchPullRequests()
.flatMapError { _ in .value([]) }
.map { pullRequests in
pullRequests.filter { $0.isLabelled(with: self.integrationLabel) }
}
.observe(on: scheduler)
.startWithValues { pullRequests in
self.dispatchInitial(pullRequests: pullRequests)
}

gitHubEvents.events
.observe(on: scheduler)
.observeValues { [weak self] gitHubEvent in
switch gitHubEvent {
case let .pullRequest(event):
self?.pullRequestDidChange(event: event)
case let .status(event):
self?.statusChecksDidChange(event: event)
case .ping:
break
}
}
}

private func dispatchInitial(pullRequests: [PullRequest]) {
let dispatchTable = Dictionary(grouping: pullRequests) { $0.target.ref }
mergeServices.modify { dict in
for (branch, pullRequestsForBranch) in dispatchTable {
dict[branch] = makeMergeService(
targetBranch: branch,
scheduler: self.scheduler,
initialPullRequests: pullRequestsForBranch
)
}
}
}

private func pullRequestDidChange(event: PullRequestEvent) {
logger.log("📣 Pull Request did change \(event.pullRequestMetadata) with action `\(event.action)`")
let targetBranch = event.pullRequestMetadata.reference.target.ref
let existingService = mergeServices.modify { (dict: inout [String: MergeService]) -> MergeService? in
if let service = dict[targetBranch] {
// If service was already existing, return it so we'll send the pullRequestChangesObserver event outside this `modify` below
return service
} else {
dict[targetBranch] = makeMergeService(
targetBranch: targetBranch,
scheduler: self.scheduler,
initialPullRequests: [event.pullRequestMetadata.reference]
)
// If MergeService didn't exist yet and we just created it, return nil so that we DON'T send the event on pullRequestChangesObserver
// outside this `modify` below (because we already passed the PR to initialPullRequests parameters when creating the service – and
// the service would still be `.starting` and it would not be ready to receive those events anyway)
return nil
}
}
existingService?.pullRequestChangesObserver.send(value: (event.pullRequestMetadata, event.action))
}

private func statusChecksDidChange(event: StatusEvent) {
// No way to know which MergeService this event is supposed to be for – isRelative(toBranch:) only checks for head branch not target so not useful here
// So we're sending it to all MergeServices, and they'll filter them themselves based on their own queues
mergeServices.withValue { currentMergeServices in
for mergeServiceForBranch in currentMergeServices.values {
mergeServiceForBranch.statusChecksCompletionObserver.send(value: event)
}
}
}

private func makeMergeService(targetBranch: String, scheduler: DateScheduler, initialPullRequests: [PullRequest] = []) -> MergeService {
let mergeService = MergeService(
targetBranch: targetBranch,
integrationLabel: integrationLabel,
topPriorityLabels: topPriorityLabels,
requiresAllStatusChecks: requiresAllStatusChecks,
statusChecksTimeout: statusChecksTimeout,
initialPullRequests: initialPullRequests,
logger: logger,
gitHubAPI: gitHubAPI,
scheduler: scheduler
)
mergeServiceLifecycleObserver.send(value: .created(mergeService))
mergeService.state.producer
.observe(on: scheduler)
.startWithValues { [weak self, service = mergeService] state in
self?.mergeServiceLifecycleObserver.send(value: .stateChanged(service))
if state.status == .idle {
self?.mergeServices.modify { dict in
dict[targetBranch] = nil
}
self?.mergeServiceLifecycleObserver.send(value: .destroyed(service))
}
}

return mergeService
}
}

extension DispatchService {
public enum MergeServiceLifecycleEvent {
case created(MergeService)
case destroyed(MergeService)
case stateChanged(MergeService)
}
}

extension DispatchService {
public var queuesDescription: String {
let currentMergeServices = mergeServices.value
guard !currentMergeServices.isEmpty else {
return "No PR pending, all queues empty."
}
return currentMergeServices.map { (entry: (key: String, value: MergeService)) -> String in
"""
## Merge Queue for target branch: \(entry.key) ##
\(entry.value.state.value)
"""
}.joined(separator: "\n\n")
}

public var queueStates: [MergeService.State] {
return self.mergeServices.value.values
.map { $0.state.value }
.sorted { (lhs, rhs) in
lhs.targetBranch < rhs.targetBranch
}
}
}

// MARK: - Healthcheck

extension DispatchService {
public var healthcheckStatus: MergeService.Healthcheck.Status {
let currentStatuses = self.mergeServices.value.values.map { $0.healthcheck.status.value }
return currentStatuses.first(where: { $0 != .ok }) ?? .ok
}
}
Loading

0 comments on commit 8057711

Please sign in to comment.