1
- use rayon:: {
2
- prelude:: ParallelIterator ,
3
- slice:: { ParallelSlice , ParallelSliceMut } ,
4
- } ;
5
-
6
1
use crate :: {
7
2
asyncjob:: { AsyncJob , RunParams } ,
8
3
error:: Result ,
9
4
sync:: { self , CommitId , RepoPath , SharedCommitFilterFn } ,
10
5
AsyncGitNotification , ProgressPercent ,
11
6
} ;
12
7
use std:: {
13
- sync:: { atomic :: AtomicUsize , Arc , Mutex } ,
8
+ sync:: { Arc , Mutex } ,
14
9
time:: { Duration , Instant } ,
15
10
} ;
16
11
@@ -74,62 +69,44 @@ impl AsyncCommitFilterJob {
74
69
commits : Vec < CommitId > ,
75
70
params : & RunParams < AsyncGitNotification , ProgressPercent > ,
76
71
) -> JobState {
77
- let ( start , result ) =
78
- self . filter_commits ( repo_path , commits, params) ;
79
-
80
- //TODO: still need this to be a result?
81
- JobState :: Response ( Ok ( CommitFilterResult {
82
- result ,
83
- duration : start . elapsed ( ) ,
84
- } ) )
72
+ let response = sync :: repo ( repo_path )
73
+ . map ( |repo| self . filter_commits ( & repo , commits, params) )
74
+ . map ( | ( start , result ) | CommitFilterResult {
75
+ result,
76
+ duration : start . elapsed ( ) ,
77
+ } ) ;
78
+
79
+ JobState :: Response ( response )
85
80
}
86
81
87
82
fn filter_commits (
88
83
& self ,
89
- repo_path : & RepoPath ,
84
+ repo : & git2 :: Repository ,
90
85
commits : Vec < CommitId > ,
91
86
params : & RunParams < AsyncGitNotification , ProgressPercent > ,
92
87
) -> ( Instant , Vec < CommitId > ) {
93
88
let total_amount = commits. len ( ) ;
94
89
let start = Instant :: now ( ) ;
95
90
96
- let idx = AtomicUsize :: new ( 0 ) ;
97
- let mut result = commits
91
+ let mut progress = ProgressPercent :: new ( 0 , total_amount) ;
92
+
93
+ let result = commits
98
94
. into_iter ( )
99
95
. enumerate ( )
100
- . collect :: < Vec < ( usize , CommitId ) > > ( )
101
- . par_chunks ( 1000 )
102
- . filter_map ( |c| {
103
- //TODO: error log repo open errors
104
- sync:: repo ( repo_path) . ok ( ) . map ( |repo| {
105
- c. iter ( )
106
- . filter_map ( |( e, c) | {
107
- let idx = idx. fetch_add (
108
- 1 ,
109
- std:: sync:: atomic:: Ordering :: Relaxed ,
110
- ) ;
111
-
112
- Self :: update_progress (
113
- params,
114
- ProgressPercent :: new (
115
- idx,
116
- total_amount,
117
- ) ,
118
- ) ;
119
-
120
- ( * self . filter ) ( & repo, c) . ok ( ) . and_then (
121
- |res| res. then_some ( ( * e, * c) ) ,
122
- )
123
- } )
124
- . collect :: < Vec < _ > > ( )
125
- } )
126
- } )
127
- . flatten ( )
128
- . collect :: < Vec < _ > > ( ) ;
96
+ . filter_map ( |( idx, c) | {
97
+ let new_progress =
98
+ ProgressPercent :: new ( idx, total_amount) ;
129
99
130
- result. par_sort_by ( |a, b| a. 0 . cmp ( & b. 0 ) ) ;
100
+ if new_progress != progress {
101
+ Self :: update_progress ( params, new_progress) ;
102
+ progress = new_progress;
103
+ }
131
104
132
- let result = result. into_iter ( ) . map ( |c| c. 1 ) . collect ( ) ;
105
+ ( * self . filter ) ( repo, & c)
106
+ . ok ( )
107
+ . and_then ( |res| res. then_some ( c) )
108
+ } )
109
+ . collect :: < Vec < _ > > ( ) ;
133
110
134
111
( start, result)
135
112
}
@@ -138,16 +115,12 @@ impl AsyncCommitFilterJob {
138
115
params : & RunParams < AsyncGitNotification , ProgressPercent > ,
139
116
new_progress : ProgressPercent ,
140
117
) {
141
- match params. set_progress ( new_progress) {
142
- Err ( e) => log:: error!( "progress error: {e}" ) ,
143
- Ok ( result) if result => {
144
- if let Err ( e) =
145
- params. send ( AsyncGitNotification :: CommitFilter )
146
- {
147
- log:: error!( "send error: {e}" ) ;
148
- }
149
- }
150
- _ => ( ) ,
118
+ if let Err ( e) = params. set_progress ( new_progress) {
119
+ log:: error!( "progress error: {e}" ) ;
120
+ } else if let Err ( e) =
121
+ params. send ( AsyncGitNotification :: CommitFilter )
122
+ {
123
+ log:: error!( "send error: {e}" ) ;
151
124
}
152
125
}
153
126
}
0 commit comments