From ac946d84efa2424214f72d90112fea088ffc3e36 Mon Sep 17 00:00:00 2001 From: SeongChan Lee Date: Thu, 20 Aug 2020 00:58:16 +0900 Subject: [PATCH] Rewrite classifier engine --- Cargo.lock | 11 ++ Cargo.toml | 1 + src/core.rs | 449 +++++++++++++++++++++++++++++++++------------- src/lib.rs | 172 +++++++----------- src/subprocess.rs | 1 + 5 files changed, 405 insertions(+), 229 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c64bac9..79f918d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,6 +133,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.7.3" @@ -234,6 +244,7 @@ version = "0.4.0-alpha.1" dependencies = [ "anyhow", "clap-v3", + "crossbeam-channel", "dialoguer", "env_logger", "git2", diff --git a/Cargo.toml b/Cargo.toml index 099a2a2..af333a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ anyhow = "1.0.26" glob = "0.3.0" rayon = "1.3.0" thiserror = "1.0" +crossbeam-channel = "0.4.3" man = { version = "0.3.0", optional = true } rson_rs = { version = "0.2.1", optional = true } diff --git a/src/core.rs b/src/core.rs index 50de9ba..d9dcd00 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,14 +1,16 @@ use std::collections::HashSet; use std::convert::TryFrom; +use std::fmt::Debug; use anyhow::{Context, Result}; +use crossbeam_channel::unbounded; use git2::{BranchType, Repository}; use log::*; use rayon::prelude::*; use crate::args::DeleteFilter; use crate::branch::{LocalBranch, RemoteBranch, RemoteTrackingBranch, RemoteTrackingBranchStatus}; -use crate::merge_tracker::{MergeState, MergeTracker}; +use crate::merge_tracker::MergeTracker; use crate::subprocess::{self, get_worktrees, RemoteHead}; use crate::util::ForceSendSync; use crate::{config, Git}; @@ -240,13 +242,6 @@ impl TrimPlan { } } -pub struct Classification { - pub local: MergeState, - pub fetch: Option>, - pub messages: Vec<&'static str>, - pub result: HashSet, -} - #[derive(Hash, Eq, PartialEq, Debug, Clone)] pub enum ClassifiedBranch { MergedLocal(LocalBranch), @@ -341,134 +336,345 @@ impl ClassifiedBranch { } } -/// Make sure repo and config are semantically Send + Sync. -pub fn classify( - git: ForceSendSync<&Git>, - merge_tracker: &MergeTracker, - remote_heads: &[RemoteHead], - base: &RemoteTrackingBranch, - branch: &LocalBranch, -) -> Result { - let local = merge_tracker.check_and_track(&git.repo, &base.refname, branch)?; - let fetch = if let RemoteTrackingBranchStatus::Exists(fetch) = - branch.fetch_upstream(&git.repo, &git.config)? - { - Some(merge_tracker.check_and_track(&git.repo, &base.refname, &fetch)?) - } else { - None - }; - - let mut c = Classification { - local: local.clone(), - fetch: fetch.clone(), - messages: vec![], - result: HashSet::default(), - }; - - match fetch { - Some(upstream) => { - if local.merged { - if upstream.merged { - c.messages.push("local & fetch upstream are merged"); - c.result - .insert(ClassifiedBranch::MergedLocal(branch.clone())); - c.result - .insert(ClassifiedBranch::MergedRemoteTracking(upstream.branch)); - } else { - c.messages.push("local & fetch upstream are diverged"); - c.result.insert(ClassifiedBranch::DivergedRemoteTracking { - local: branch.clone(), - upstream: upstream.branch, - }); - } - } else if upstream.merged { - c.messages.push("upstream is merged, but the local strays"); - c.result.insert(ClassifiedBranch::Stray(branch.clone())); - c.result - .insert(ClassifiedBranch::MergedRemoteTracking(upstream.branch)); +pub struct Classifier<'a> { + git: &'a Git, + merge_tracker: &'a MergeTracker, + tasks: Vec Result + Send + Sync + 'a>>, +} + +impl<'a> Classifier<'a> { + pub fn new(git: &'a Git, merge_tracker: &'a MergeTracker) -> Self { + Self { + git, + merge_tracker, + tasks: Vec::new(), + } + } + + pub fn queue_request(&mut self, req: R) { + let id = self.tasks.len(); + trace!("Enqueue #{}: {:#?}", id, req); + let git = ForceSendSync::new(self.git); + let merge_tracker = self.merge_tracker; + self.tasks.push(Box::new(move || { + req.classify(git, merge_tracker) + .with_context(|| format!("Failed to classify #{}: {:#?}", id, req)) + .map(|response| ClassificationResponseWithId { id, response }) + })); + } + + pub fn queue_request_with_context< + R: ClassificationRequestWithContext + Send + Sync + Debug + 'a, + C: Send + Sync + 'a, + >( + &mut self, + req: R, + context: C, + ) { + let id = self.tasks.len(); + trace!("Enqueue #{}: {:#?}", id, req); + let git = ForceSendSync::new(self.git); + let merge_tracker = self.merge_tracker; + self.tasks.push(Box::new(move || { + req.classify_with_context(git, merge_tracker, context) + .with_context(|| format!("Failed to classify #{}: {:#?}", id, req)) + .map(|response| ClassificationResponseWithId { id, response }) + })); + } + + pub fn classify(self) -> Result> { + info!("Classify {} requests", self.tasks.len()); + let tasks = self.tasks; + let receiver = rayon::scope(move |scope| { + let (sender, receiver) = unbounded(); + for tasks in tasks { + let sender = sender.clone(); + scope.spawn(move |_| { + let result = tasks(); + sender.send(result).unwrap(); + }) } + receiver + }); + + let mut results = Vec::new(); + for result in receiver { + let ClassificationResponseWithId { id, response } = result?; + debug!("Result #{}: {:#?}", id, response); + + results.push(response); } - // `hub-cli` sets config `branch.{branch_name}.remote` as URL without `remote.{remote}` entry. - // `fetch_upstream` returns None. - // However we can try manual classification without `remote.{remote}` entry. - None => { - let remote = config::get_remote_name(&git.config, branch)? - .expect("should have it if it has an upstream"); - let merge = config::get_merge(&git.config, branch)? - .expect("should have it if it has an upstream"); - let remote_head = remote_heads - .iter() - .find(|h| h.remote == remote && h.refname == merge) - .map(|h| &h.commit); - - match (local.merged, remote_head) { - (true, Some(head)) if head == &local.commit => { - c.messages.push( - "merged local, merged remote: the branch is merged, but forgot to delete", - ); - c.result.insert(ClassifiedBranch::MergedDirectFetch { - local: branch.clone(), - remote: RemoteBranch { - remote, - refname: merge, - }, - }); - } - (true, Some(_)) => { - c.messages.push( - "merged local, diverged upstream: the branch is merged, but upstream is diverged", - ); - c.result.insert(ClassifiedBranch::DivergedDirectFetch { - local: branch.clone(), - remote: RemoteBranch { - remote, - refname: merge, - }, - }); - } - (true, None) => { - c.messages - .push("merged local: the branch is merged, and deleted"); - c.result - .insert(ClassifiedBranch::MergedLocal(branch.clone())); - } - (false, None) => { - c.messages - .push("the branch is not merged but the remote is gone somehow"); - c.result.insert(ClassifiedBranch::Stray(branch.clone())); + Ok(results) + } +} + +struct ClassificationResponseWithId { + id: usize, + response: ClassificationResponse, +} + +#[derive(Debug)] +pub struct ClassificationResponse { + message: &'static str, + pub result: Vec, +} + +pub trait ClassificationRequest { + fn classify( + &self, + git: ForceSendSync<&Git>, + merge_tracker: &MergeTracker, + ) -> Result; +} + +pub trait ClassificationRequestWithContext { + fn classify_with_context( + &self, + git: ForceSendSync<&Git>, + merge_tracker: &MergeTracker, + context: C, + ) -> Result; +} + +#[derive(Debug)] +pub struct TrackingBranchClassificationRequest<'a> { + pub base: &'a RemoteTrackingBranch, + pub local: &'a LocalBranch, + pub upstream: Option<&'a RemoteTrackingBranch>, +} + +impl<'a> ClassificationRequest for TrackingBranchClassificationRequest<'a> { + fn classify( + &self, + git: ForceSendSync<&Git>, + merge_tracker: &MergeTracker, + ) -> Result { + let local = merge_tracker.check_and_track(&git.repo, &self.base.refname, self.local)?; + let upstream = if let Some(upstream) = self.upstream { + merge_tracker.check_and_track(&git.repo, &self.base.refname, upstream)? + } else { + let result = if local.merged { + ClassificationResponse { + message: "local is merged but remote is gone", + result: vec![ClassifiedBranch::MergedLocal(local.branch)], } - (false, _) => { - c.messages.push("skip: the branch is alive"); + } else { + ClassificationResponse { + message: "local is stray but remote is gone", + result: vec![ClassifiedBranch::Stray(local.branch)], } + }; + return Ok(result); + }; + + let result = match (local.merged, upstream.merged) { + (true, true) => ClassificationResponse { + message: "local & upstream are merged", + result: vec![ + ClassifiedBranch::MergedLocal(local.branch), + ClassifiedBranch::MergedRemoteTracking(upstream.branch), + ], + }, + (true, false) => ClassificationResponse { + message: "local is merged but diverged with upstream", + result: vec![ClassifiedBranch::DivergedRemoteTracking { + local: local.branch, + upstream: upstream.branch, + }], + }, + (false, true) => ClassificationResponse { + message: "upstream is merged, but the local strays", + result: vec![ + ClassifiedBranch::Stray(local.branch), + ClassifiedBranch::MergedRemoteTracking(upstream.branch), + ], + }, + (false, false) => ClassificationResponse { + message: "local & upstream are not merged yet", + result: vec![], + }, + }; + + Ok(result) + } +} + +/// `hub-cli` style branch classification request. +/// `hub-cli` sets config `branch.{branch_name}.remote` as URL without `remote.{remote}` entry. +/// However we can try manual classification without `remote.{remote}` entry. +#[derive(Debug)] +pub struct DirectFetchClassificationRequest<'a> { + pub base: &'a RemoteTrackingBranch, + pub local: &'a LocalBranch, + pub remote: &'a RemoteBranch, +} + +impl<'a> ClassificationRequestWithContext<&'a [RemoteHead]> + for DirectFetchClassificationRequest<'a> +{ + fn classify_with_context( + &self, + git: ForceSendSync<&Git>, + merge_tracker: &MergeTracker, + remote_heads: &[RemoteHead], + ) -> Result { + let local = merge_tracker.check_and_track(&git.repo, &self.base.refname, self.local)?; + let remote_head = remote_heads + .iter() + .find(|h| h.remote == self.remote.remote && h.refname == self.remote.refname) + .map(|h| &h.commit); + + let result = match (local.merged, remote_head) { + (true, Some(head)) if head == &local.commit => ClassificationResponse { + message: "local & remote are merged", + result: vec![ClassifiedBranch::MergedDirectFetch { + local: local.branch, + remote: self.remote.clone(), + }], + }, + (true, Some(_)) => ClassificationResponse { + message: "local is merged, but diverged with upstream", + result: vec![ClassifiedBranch::DivergedDirectFetch { + local: local.branch, + remote: self.remote.clone(), + }], + }, + (true, None) => ClassificationResponse { + message: "local is merged and its upstream is gone", + result: vec![ClassifiedBranch::MergedLocal(local.branch)], + }, + (false, None) => ClassificationResponse { + message: "local is not merged but the remote is gone somehow", + result: vec![ClassifiedBranch::Stray(local.branch)], + }, + (false, _) => ClassificationResponse { + message: "local is not merged yet", + result: vec![], + }, + }; + + Ok(result) + } +} + +#[derive(Debug)] +pub struct NonTrackingBranchClassificationRequest<'a> { + pub base: &'a RemoteTrackingBranch, + pub local: &'a LocalBranch, +} + +impl<'a> ClassificationRequest for NonTrackingBranchClassificationRequest<'a> { + fn classify( + &self, + git: ForceSendSync<&Git>, + merge_tracker: &MergeTracker, + ) -> Result { + let local = merge_tracker.check_and_track(&git.repo, &self.base.refname, self.local)?; + let result = if local.merged { + ClassificationResponse { + message: "non-tracking local is merged", + result: vec![ClassifiedBranch::MergedNonTrackingLocal(local.branch)], } - } + } else { + ClassificationResponse { + message: "non-tracking local is not merged", + result: vec![], + } + }; + Ok(result) } +} - Ok(c) +#[derive(Debug)] +pub struct NonUpstreamBranchClassificationRequest<'a> { + pub base: &'a RemoteTrackingBranch, + pub remote: &'a RemoteTrackingBranch, +} + +impl<'a> ClassificationRequest for NonUpstreamBranchClassificationRequest<'a> { + fn classify( + &self, + git: ForceSendSync<&Git>, + merge_tracker: &MergeTracker, + ) -> Result { + let remote = merge_tracker.check_and_track(&git.repo, &self.base.refname, self.remote)?; + let result = if remote.merged { + ClassificationResponse { + message: "non-upstream local is merged", + result: vec![ClassifiedBranch::MergedNonUpstreamRemoteTracking( + remote.branch, + )], + } + } else { + ClassificationResponse { + message: "non-upstream local is not merged", + result: vec![], + } + }; + Ok(result) + } } pub fn get_tracking_branches( git: &Git, base_upstreams: &[RemoteTrackingBranch], -) -> Result> { +) -> Result)>> { let mut result = Vec::new(); for branch in git.repo.branches(Some(BranchType::Local))? { - let branch = LocalBranch::try_from(&branch?.0)?; + let local = LocalBranch::try_from(&branch?.0)?; + + match local.fetch_upstream(&git.repo, &git.config)? { + RemoteTrackingBranchStatus::Exists(upstream) => { + if base_upstreams.contains(&upstream) { + continue; + } + result.push((local, Some(upstream))); + } + RemoteTrackingBranchStatus::Gone(_) => result.push((local, None)), + _ => { + continue; + } + }; + } + + Ok(result) +} + +/// Get `hub-cli` style direct fetched branches +pub fn get_direct_fetch_branches( + git: &Git, + base_refs: &[String], +) -> Result> { + let mut result = Vec::new(); + for branch in git.repo.branches(Some(BranchType::Local))? { + let local = LocalBranch::try_from(&branch?.0)?; - if config::get_remote_name(&git.config, &branch)?.is_none() { + if base_refs.contains(&local.refname) { continue; } - let fetch_upstream = branch.fetch_upstream(&git.repo, &git.config)?; - if let RemoteTrackingBranchStatus::Exists(upstream) = &fetch_upstream { - if base_upstreams.contains(&upstream) { - debug!("Skip: the branch tracks the base: {:?}", branch); - continue; - } + let remote = if let Some(remote) = config::get_remote_name(&git.config, &local)? { + remote + } else { + continue; + }; + + if config::get_remote(&git.repo, &remote)?.is_some() { + continue; } - result.push(branch); + let merge = config::get_merge(&git.config, &local)?.context(format!( + "Should have `branch.{}.merge` entry on git config", + local.short_name() + ))?; + + let remote = RemoteBranch { + remote, + refname: merge, + }; + + result.push((local, remote)); } Ok(result) @@ -509,9 +715,8 @@ pub fn get_non_upstream_remote_tracking_branches( } let tracking_branches = get_tracking_branches(git, base_upstreams)?; - for tracking_branch in tracking_branches { - let upstream = tracking_branch.fetch_upstream(&git.repo, &git.config)?; - if let RemoteTrackingBranchStatus::Exists(upstream) = upstream { + for (_local, upstream) in tracking_branches { + if let Some(upstream) = upstream { upstreams.insert(upstream); } } @@ -535,15 +740,11 @@ pub fn get_non_upstream_remote_tracking_branches( Ok(result) } -pub fn get_remote_heads(git: &Git, branches: &[LocalBranch]) -> Result> { +pub fn get_remote_heads(git: &Git, branches: &[RemoteBranch]) -> Result> { let mut remote_urls = Vec::new(); for branch in branches { - if let Some(remote) = config::get_remote_name(&git.config, &branch)? { - if config::get_remote(&git.repo, &remote)?.is_none() { - remote_urls.push(remote); - } - } + remote_urls.push(&branch.remote); } Ok(remote_urls diff --git a/src/lib.rs b/src/lib.rs index 95b4413..645d448 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,6 @@ mod util; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; -use std::sync::mpsc::channel; use anyhow::{Context, Result}; use git2::{BranchType, Config as GitConfig, Error as GitError, ErrorCode, Repository}; @@ -20,8 +19,10 @@ use crate::args::{DeleteFilter, ScanFilter}; use crate::branch::RemoteTrackingBranchStatus; pub use crate::branch::{LocalBranch, RemoteBranch, RemoteBranchError, RemoteTrackingBranch}; use crate::core::{ - get_non_tracking_local_branches, get_non_upstream_remote_tracking_branches, get_remote_heads, - get_tracking_branches, + get_direct_fetch_branches, get_non_tracking_local_branches, + get_non_upstream_remote_tracking_branches, get_remote_heads, get_tracking_branches, Classifier, + DirectFetchClassificationRequest, NonTrackingBranchClassificationRequest, + NonUpstreamBranchClassificationRequest, TrackingBranchClassificationRequest, }; pub use crate::core::{ClassifiedBranch, TrimPlan}; use crate::merge_tracker::MergeTracker; @@ -57,127 +58,88 @@ pub fn get_trim_plan(git: &Git, param: &PlanParam) -> Result { trace!("base_upstreams: {:#?}", base_upstreams); trace!("protected_refs: {:#?}", protected_refs); - let merge_tracker = MergeTracker::with_base_upstreams(&git.repo, &git.config, &base_upstreams)?; let tracking_branches = get_tracking_branches(git, &base_upstreams)?; + debug!("tracking_branches: {:#?}", tracking_branches); + + let direct_fetch_branches = get_direct_fetch_branches(git, &base_refs)?; + debug!("direct_fetch_branches: {:#?}", direct_fetch_branches); + let non_tracking_branches = get_non_tracking_local_branches(git, &base_refs)?; + debug!("non_tracking_branches: {:#?}", non_tracking_branches); + let non_upstream_branches = get_non_upstream_remote_tracking_branches(git, &base_upstreams)?; + debug!("non_upstream_branches: {:#?}", non_upstream_branches); + let remote_heads = if param.scan.scan_tracking() { - get_remote_heads(git, &tracking_branches)? + let remotes: Vec<_> = direct_fetch_branches + .iter() + .map(|(_, r)| r.clone()) + .collect(); + get_remote_heads(git, &remotes)? } else { Vec::new() }; + debug!("remote_heads: {:#?}", remote_heads); - info!("Start classify:"); - let classifications; - let non_trackings; - let non_upstreams; - { - // git's fields are semantically Send + Sync in the `classify`. - // They are read only in `classify` function. - // It is denoted that it is safe in that case - // https://github.com/libgit2/libgit2/blob/master/docs/threading.md#sharing-objects - let git = ForceSendSync::new(git); - let repo = ForceSendSync::new(&git.repo); - let scan_filter = ¶m.scan; - let merge_tracker = &merge_tracker; - - let base_upstreams = &base_upstreams; - let tracking_branches = &tracking_branches; - let non_tracking_branches = &non_tracking_branches; - let non_upstream_branches = &non_upstream_branches; - let remote_heads = &remote_heads; - - let (classification_tx, classification_rx) = channel(); - let (non_tracking_tx, non_tracking_rx) = channel(); - let (non_upstream_tx, non_upstream_rx) = channel(); - - rayon::scope(move |s| { - for base in base_upstreams { - if scan_filter.scan_tracking() { - for branch in tracking_branches { - let tx = classification_tx.clone(); - s.spawn(move |_| { - let c = core::classify(git, merge_tracker, remote_heads, base, branch) - .with_context(|| { - format!("tracking, base={:?}, branch={:?}", base, branch) - }); - tx.send(c).expect("in scope"); - }); - } - } - - if scan_filter.scan_non_tracking_local() { - for branch in non_tracking_branches { - let tx = non_tracking_tx.clone(); - s.spawn(move |_| { - let result = merge_tracker - .check_and_track(&repo, &base.refname, branch) - .with_context(|| { - format!("non-tracking, base={:?}, branch={:?}", base, branch) - }); - tx.send(result).expect("in scope"); - }) - } - } - - for branch in non_upstream_branches { - match branch.to_remote_branch(&repo) { - Ok(remote_branch) - if !scan_filter.scan_non_upstream_remote(&remote_branch.remote) => - { - continue; - } - _ => {} - } - - let tx = non_upstream_tx.clone(); - s.spawn(move |_| { - let result = merge_tracker - .check_and_track(&repo, &base.refname, branch) - .with_context(|| { - format!("non-upstream, base={:?}, branch={:?}", base, branch) - }); - tx.send(result).expect("in scope"); - }) - } + let merge_tracker = MergeTracker::with_base_upstreams(&git.repo, &git.config, &base_upstreams)?; + let mut classifier = Classifier::new(git, &merge_tracker); + + info!("Enqueue classification requests"); + if param.scan.scan_tracking() { + for (local, upstream) in &tracking_branches { + for base in &base_upstreams { + classifier.queue_request(TrackingBranchClassificationRequest { + base, + local, + upstream: upstream.as_ref(), + }); } - }); - - classifications = classification_rx - .into_iter() - .collect::, _>>()?; - non_trackings = non_tracking_rx.into_iter().collect::, _>>()?; - non_upstreams = non_upstream_rx.into_iter().collect::, _>>()?; - }; + } - let mut delete = HashSet::new(); - for classification in classifications.into_iter() { - debug!("branch: {:?}", classification.local); - trace!("fetch: {:?}", classification.fetch); - debug!("message: {:?}", classification.messages); - delete.extend(classification.result.into_iter()); + for (local, remote) in &direct_fetch_branches { + for base in &base_upstreams { + classifier.queue_request_with_context( + DirectFetchClassificationRequest { + base, + local, + remote, + }, + &remote_heads, + ); + } + } } - for non_tracking in non_trackings.into_iter() { - debug!("non-tracking: {:?}", non_tracking); - if non_tracking.merged { - delete.insert(ClassifiedBranch::MergedNonTrackingLocal( - non_tracking.branch, - )); + + if param.scan.scan_non_tracking_local() { + for base in &base_upstreams { + for local in &non_tracking_branches { + classifier.queue_request(NonTrackingBranchClassificationRequest { base, local }); + } } } - for non_upstream in non_upstreams.into_iter() { - debug!("non-upstream: {:?}", non_upstream); - if non_upstream.merged { - delete.insert(ClassifiedBranch::MergedNonUpstreamRemoteTracking( - non_upstream.branch, - )); + + for base in &base_upstreams { + for remote_tracking in &non_upstream_branches { + let remote = remote_tracking.to_remote_branch(&git.repo)?; + if param.scan.scan_non_upstream_remote(&remote.remote) { + classifier.queue_request(NonUpstreamBranchClassificationRequest { + base, + remote: remote_tracking, + }); + } } } + let classifications = classifier.classify()?; + let mut result = TrimPlan { - to_delete: delete, + to_delete: HashSet::new(), preserved: Vec::new(), }; + for classification in classifications { + result.to_delete.extend(classification.result); + } + let base_and_upstream_refs = resolve_base_and_upstream_refs(&git.repo, &git.config, &base_refs)?; result.preserve(&base_and_upstream_refs, "base")?; diff --git a/src/subprocess.rs b/src/subprocess.rs index 2b0ed83..1320c47 100644 --- a/src/subprocess.rs +++ b/src/subprocess.rs @@ -153,6 +153,7 @@ pub fn get_noff_merged_remotes( Ok(result) } +#[derive(Debug)] pub struct RemoteHead { pub remote: String, pub refname: String,