Skip to content

Commit ebe41e8

Browse files
committed
parallelize log search
* will consume all cores now and got faster in all my benchmarks * setting progress via asyncjob now makes sure to only set it if it has changed and return whether that is the case to simplify sending progress notifications only in case progress actually changed
1 parent 5be397b commit ebe41e8

File tree

6 files changed

+89
-37
lines changed

6 files changed

+89
-37
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
* fix commit log not updating after branch switch ([#1862](https://github.com/extrawurst/gitui/issues/1862))
1212
* fix stashlist not updating after pop/drop ([#1864](https://github.com/extrawurst/gitui/issues/1864))
1313

14+
### Changed
15+
* log search consumes all cores now and got even faster
16+
1417
## [0.24.1] - 2023-08-30
1518

1619
### Fixes

Cargo.lock

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
.PHONY: debug build-release release-linux-musl test clippy clippy-pedantic install install-debug
33

44
ARGS=-l
5+
# ARGS=-l -d ~/code/extern/kubernetes
56
# ARGS=-l -d ~/code/extern/linux
67
# ARGS=-l -d ~/code/git-bare-test.git -w ~/code/git-bare-test
78

asyncgit/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ log = "0.4"
2222
# git2 = { git="https://github.com/extrawurst/git2-rs.git", rev="fc13dcc", features = ["vendored-openssl"]}
2323
# pinning to vendored openssl, using the git2 feature this gets lost with new resolver
2424
openssl-sys = { version = '0.9', features = ["vendored"], optional = true }
25+
rayon = "1.7"
2526
rayon-core = "1.11"
2627
scopetime = { path = "../scopetime", version = "0.1" }
2728
serde = { version = "1.0", features = ["derive"] }

asyncgit/src/asyncjob/mod.rs

+15-6
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@ use crossbeam_channel::Sender;
77
use std::sync::{Arc, Mutex, RwLock};
88

99
/// Passed to `AsyncJob::run` allowing sending intermediate progress notifications
10-
pub struct RunParams<T: Copy + Send, P: Clone + Send + Sync> {
10+
pub struct RunParams<
11+
T: Copy + Send,
12+
P: Clone + Send + Sync + PartialEq,
13+
> {
1114
sender: Sender<T>,
1215
progress: Arc<RwLock<P>>,
1316
}
1417

15-
impl<T: Copy + Send, P: Clone + Send + Sync> RunParams<T, P> {
18+
impl<T: Copy + Send, P: Clone + Send + Sync + PartialEq>
19+
RunParams<T, P>
20+
{
1621
/// send an intermediate update notification.
1722
/// do not confuse this with the return value of `run`.
1823
/// `send` should only be used about progress notifications
@@ -24,9 +29,13 @@ impl<T: Copy + Send, P: Clone + Send + Sync> RunParams<T, P> {
2429
}
2530

2631
/// set the current progress
27-
pub fn set_progress(&self, p: P) -> Result<()> {
28-
*(self.progress.write()?) = p;
29-
Ok(())
32+
pub fn set_progress(&self, p: P) -> Result<bool> {
33+
Ok(if *self.progress.read()? == p {
34+
false
35+
} else {
36+
*(self.progress.write()?) = p;
37+
true
38+
})
3039
}
3140
}
3241

@@ -35,7 +44,7 @@ pub trait AsyncJob: Send + Sync + Clone {
3544
/// defines what notification type is used to communicate outside
3645
type Notification: Copy + Send;
3746
/// type of progress
38-
type Progress: Clone + Default + Send + Sync;
47+
type Progress: Clone + Default + Send + Sync + PartialEq;
3948

4049
/// can run a synchronous time intensive task.
4150
/// the returned notification is used to tell interested parties

asyncgit/src/filter_commits.rs

+58-31
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
use rayon::{
2+
prelude::ParallelIterator,
3+
slice::{ParallelSlice, ParallelSliceMut},
4+
};
5+
16
use crate::{
27
asyncjob::{AsyncJob, RunParams},
38
error::Result,
49
sync::{self, CommitId, RepoPath, SharedCommitFilterFn},
510
AsyncGitNotification, ProgressPercent,
611
};
712
use std::{
8-
sync::{Arc, Mutex},
13+
sync::{atomic::AtomicUsize, Arc, Mutex},
914
time::{Duration, Instant},
1015
};
1116

@@ -69,58 +74,80 @@ impl AsyncCommitFilterJob {
6974
commits: Vec<CommitId>,
7075
params: &RunParams<AsyncGitNotification, ProgressPercent>,
7176
) -> JobState {
72-
let response = sync::repo(repo_path)
73-
.map(|repo| self.filter_commits(&repo, commits, params))
74-
.map(|(start, result)| CommitFilterResult {
75-
result,
76-
duration: start.elapsed(),
77-
});
78-
79-
JobState::Response(response)
77+
let (start, result) =
78+
self.filter_commits(repo_path, commits, params);
79+
80+
//TODO: still need this to be a result?
81+
JobState::Response(Ok(CommitFilterResult {
82+
result,
83+
duration: start.elapsed(),
84+
}))
8085
}
8186

8287
fn filter_commits(
8388
&self,
84-
repo: &git2::Repository,
89+
repo_path: &RepoPath,
8590
commits: Vec<CommitId>,
8691
params: &RunParams<AsyncGitNotification, ProgressPercent>,
8792
) -> (Instant, Vec<CommitId>) {
8893
let total_amount = commits.len();
8994
let start = Instant::now();
9095

91-
let mut progress = ProgressPercent::new(0, total_amount);
92-
93-
let result = commits
96+
let idx = AtomicUsize::new(0);
97+
let mut result = commits
9498
.into_iter()
9599
.enumerate()
96-
.filter_map(|(idx, c)| {
97-
let new_progress =
98-
ProgressPercent::new(idx, total_amount);
99-
100-
if new_progress != progress {
101-
Self::update_progress(params, new_progress);
102-
progress = new_progress;
103-
}
104-
105-
(*self.filter)(repo, &c)
106-
.ok()
107-
.and_then(|res| res.then_some(c))
100+
.collect::<Vec<(usize, CommitId)>>()
101+
.par_chunks(1000)
102+
.filter_map(|c| {
103+
//TODO: error log repo open errors
104+
sync::repo(repo_path).ok().map(|repo| {
105+
c.iter()
106+
.filter_map(|(e, c)| {
107+
let idx = idx.fetch_add(
108+
1,
109+
std::sync::atomic::Ordering::Relaxed,
110+
);
111+
112+
Self::update_progress(
113+
params,
114+
ProgressPercent::new(
115+
idx,
116+
total_amount,
117+
),
118+
);
119+
120+
(*self.filter)(&repo, c).ok().and_then(
121+
|res| res.then_some((*e, *c)),
122+
)
123+
})
124+
.collect::<Vec<_>>()
125+
})
108126
})
127+
.flatten()
109128
.collect::<Vec<_>>();
110129

130+
result.par_sort_by(|a, b| a.0.cmp(&b.0));
131+
132+
let result = result.into_iter().map(|c| c.1).collect();
133+
111134
(start, result)
112135
}
113136

114137
fn update_progress(
115138
params: &RunParams<AsyncGitNotification, ProgressPercent>,
116139
new_progress: ProgressPercent,
117140
) {
118-
if let Err(e) = params.set_progress(new_progress) {
119-
log::error!("progress error: {e}");
120-
} else if let Err(e) =
121-
params.send(AsyncGitNotification::CommitFilter)
122-
{
123-
log::error!("send error: {e}");
141+
match params.set_progress(new_progress) {
142+
Err(e) => log::error!("progress error: {e}"),
143+
Ok(result) if result => {
144+
if let Err(e) =
145+
params.send(AsyncGitNotification::CommitFilter)
146+
{
147+
log::error!("send error: {e}");
148+
}
149+
}
150+
_ => (),
124151
}
125152
}
126153
}

0 commit comments

Comments
 (0)