@@ -8,202 +8,202 @@ use std::sync::{Arc, Mutex};
8
8
9
9
/// trait that defines an async task we can run on a threadpool
10
10
pub trait AsyncJob : Send + Sync + Clone {
11
- /// can run a synchronous time intensive task
12
- fn run ( & mut self ) ;
11
+ /// can run a synchronous time intensive task
12
+ fn run ( & mut self ) ;
13
13
}
14
14
15
15
/// Abstraction for a FIFO task queue that will only queue up **one** `next` job.
16
16
/// It keeps overwriting the next job until it is actually taken to be processed
17
17
#[ derive( Debug , Clone ) ]
18
18
pub struct AsyncSingleJob < J : AsyncJob , T : Copy + Send + ' static > {
19
- next : Arc < Mutex < Option < J > > > ,
20
- last : Arc < Mutex < Option < J > > > ,
21
- sender : Sender < T > ,
22
- pending : Arc < Mutex < ( ) > > ,
23
- notification : T ,
19
+ next : Arc < Mutex < Option < J > > > ,
20
+ last : Arc < Mutex < Option < J > > > ,
21
+ sender : Sender < T > ,
22
+ pending : Arc < Mutex < ( ) > > ,
23
+ notification : T ,
24
24
}
25
25
26
26
impl < J : ' static + AsyncJob , T : Copy + Send + ' static >
27
- AsyncSingleJob < J , T >
27
+ AsyncSingleJob < J , T >
28
28
{
29
- ///
30
- pub fn new ( sender : Sender < T > , value : T ) -> Self {
31
- Self {
32
- next : Arc :: new ( Mutex :: new ( None ) ) ,
33
- last : Arc :: new ( Mutex :: new ( None ) ) ,
34
- pending : Arc :: new ( Mutex :: new ( ( ) ) ) ,
35
- notification : value,
36
- sender,
37
- }
38
- }
39
-
40
- ///
41
- pub fn is_pending ( & self ) -> bool {
42
- self . pending . try_lock ( ) . is_err ( )
43
- }
44
-
45
- /// makes sure `next` is cleared and returns `true` if it actually canceled something
46
- pub fn cancel ( & mut self ) -> bool {
47
- if let Ok ( mut next) = self . next . lock ( ) {
48
- if next. is_some ( ) {
49
- * next = None ;
50
- return true ;
51
- }
52
- }
53
-
54
- false
55
- }
56
-
57
- /// take out last finished job
58
- pub fn take_last ( & self ) -> Option < J > {
59
- if let Ok ( mut last) = self . last . lock ( ) {
60
- last. take ( )
61
- } else {
62
- None
63
- }
64
- }
65
-
66
- /// spawns `task` if nothing is running currently, otherwise schedules as `next` overwriting if `next` was set before
67
- pub fn spawn ( & mut self , task : J ) -> bool {
68
- self . schedule_next ( task) ;
69
- self . check_for_job ( )
70
- }
71
-
72
- fn check_for_job ( & self ) -> bool {
73
- if self . is_pending ( ) {
74
- return false ;
75
- }
76
-
77
- if let Some ( task) = self . take_next ( ) {
78
- let self_arc = self . clone ( ) ;
79
-
80
- rayon_core:: spawn ( move || {
81
- if let Err ( e) = self_arc. run_job ( task) {
82
- log:: error!( "async job error: {}" , e) ;
83
- }
84
- } ) ;
85
-
86
- return true ;
87
- }
88
-
89
- false
90
- }
91
-
92
- fn run_job ( & self , mut task : J ) -> Result < ( ) > {
93
- //limit the pending scope
94
- {
95
- let _pending = self . pending . lock ( ) ?;
96
-
97
- task. run ( ) ;
98
-
99
- if let Ok ( mut last) = self . last . lock ( ) {
100
- * last = Some ( task) ;
101
- }
102
-
103
- self . sender . send ( self . notification ) ?;
104
- }
105
-
106
- self . check_for_job ( ) ;
107
-
108
- Ok ( ( ) )
109
- }
110
-
111
- fn schedule_next ( & mut self , task : J ) {
112
- if let Ok ( mut next) = self . next . lock ( ) {
113
- * next = Some ( task) ;
114
- }
115
- }
116
-
117
- fn take_next ( & self ) -> Option < J > {
118
- if let Ok ( mut next) = self . next . lock ( ) {
119
- next. take ( )
120
- } else {
121
- None
122
- }
123
- }
29
+ ///
30
+ pub fn new ( sender : Sender < T > , value : T ) -> Self {
31
+ Self {
32
+ next : Arc :: new ( Mutex :: new ( None ) ) ,
33
+ last : Arc :: new ( Mutex :: new ( None ) ) ,
34
+ pending : Arc :: new ( Mutex :: new ( ( ) ) ) ,
35
+ notification : value,
36
+ sender,
37
+ }
38
+ }
39
+
40
+ ///
41
+ pub fn is_pending ( & self ) -> bool {
42
+ self . pending . try_lock ( ) . is_err ( )
43
+ }
44
+
45
+ /// makes sure `next` is cleared and returns `true` if it actually canceled something
46
+ pub fn cancel ( & mut self ) -> bool {
47
+ if let Ok ( mut next) = self . next . lock ( ) {
48
+ if next. is_some ( ) {
49
+ * next = None ;
50
+ return true ;
51
+ }
52
+ }
53
+
54
+ false
55
+ }
56
+
57
+ /// take out last finished job
58
+ pub fn take_last ( & self ) -> Option < J > {
59
+ if let Ok ( mut last) = self . last . lock ( ) {
60
+ last. take ( )
61
+ } else {
62
+ None
63
+ }
64
+ }
65
+
66
+ /// spawns `task` if nothing is running currently, otherwise schedules as `next` overwriting if `next` was set before
67
+ pub fn spawn ( & mut self , task : J ) -> bool {
68
+ self . schedule_next ( task) ;
69
+ self . check_for_job ( )
70
+ }
71
+
72
+ fn check_for_job ( & self ) -> bool {
73
+ if self . is_pending ( ) {
74
+ return false ;
75
+ }
76
+
77
+ if let Some ( task) = self . take_next ( ) {
78
+ let self_arc = self . clone ( ) ;
79
+
80
+ rayon_core:: spawn ( move || {
81
+ if let Err ( e) = self_arc. run_job ( task) {
82
+ log:: error!( "async job error: {}" , e) ;
83
+ }
84
+ } ) ;
85
+
86
+ return true ;
87
+ }
88
+
89
+ false
90
+ }
91
+
92
+ fn run_job ( & self , mut task : J ) -> Result < ( ) > {
93
+ //limit the pending scope
94
+ {
95
+ let _pending = self . pending . lock ( ) ?;
96
+
97
+ task. run ( ) ;
98
+
99
+ if let Ok ( mut last) = self . last . lock ( ) {
100
+ * last = Some ( task) ;
101
+ }
102
+
103
+ self . sender . send ( self . notification ) ?;
104
+ }
105
+
106
+ self . check_for_job ( ) ;
107
+
108
+ Ok ( ( ) )
109
+ }
110
+
111
+ fn schedule_next ( & mut self , task : J ) {
112
+ if let Ok ( mut next) = self . next . lock ( ) {
113
+ * next = Some ( task) ;
114
+ }
115
+ }
116
+
117
+ fn take_next ( & self ) -> Option < J > {
118
+ if let Ok ( mut next) = self . next . lock ( ) {
119
+ next. take ( )
120
+ } else {
121
+ None
122
+ }
123
+ }
124
124
}
125
125
126
126
#[ cfg( test) ]
127
127
mod test {
128
- use super :: * ;
129
- use crossbeam_channel:: unbounded;
130
- use pretty_assertions:: assert_eq;
131
- use std:: {
132
- sync:: atomic:: AtomicU32 , thread:: sleep, time:: Duration ,
133
- } ;
134
-
135
- #[ derive( Clone ) ]
136
- struct TestJob {
137
- v : Arc < AtomicU32 > ,
138
- value_to_add : u32 ,
139
- }
140
-
141
- impl AsyncJob for TestJob {
142
- fn run ( & mut self ) {
143
- sleep ( Duration :: from_millis ( 100 ) ) ;
144
-
145
- self . v . fetch_add (
146
- self . value_to_add ,
147
- std:: sync:: atomic:: Ordering :: Relaxed ,
148
- ) ;
149
- }
150
- }
151
-
152
- type Notificaton = ( ) ;
153
-
154
- #[ test]
155
- fn test_overwrite ( ) {
156
- let ( sender, receiver) = unbounded ( ) ;
157
-
158
- let mut job: AsyncSingleJob < TestJob , Notificaton > =
159
- AsyncSingleJob :: new ( sender, ( ) ) ;
160
-
161
- let task = TestJob {
162
- v : Arc :: new ( AtomicU32 :: new ( 1 ) ) ,
163
- value_to_add : 1 ,
164
- } ;
165
-
166
- assert ! ( job. spawn( task. clone( ) ) ) ;
167
- sleep ( Duration :: from_millis ( 1 ) ) ;
168
- for _ in 0 ..5 {
169
- assert ! ( !job. spawn( task. clone( ) ) ) ;
170
- }
171
-
172
- let _foo = receiver. recv ( ) . unwrap ( ) ;
173
- let _foo = receiver. recv ( ) . unwrap ( ) ;
174
- assert ! ( receiver. is_empty( ) ) ;
175
-
176
- assert_eq ! (
177
- task. v. load( std:: sync:: atomic:: Ordering :: Relaxed ) ,
178
- 3
179
- ) ;
180
- }
181
-
182
- #[ test]
183
- fn test_cancel ( ) {
184
- let ( sender, receiver) = unbounded ( ) ;
185
-
186
- let mut job: AsyncSingleJob < TestJob , Notificaton > =
187
- AsyncSingleJob :: new ( sender, ( ) ) ;
188
-
189
- let task = TestJob {
190
- v : Arc :: new ( AtomicU32 :: new ( 1 ) ) ,
191
- value_to_add : 1 ,
192
- } ;
193
-
194
- assert ! ( job. spawn( task. clone( ) ) ) ;
195
- sleep ( Duration :: from_millis ( 1 ) ) ;
196
-
197
- for _ in 0 ..5 {
198
- assert ! ( !job. spawn( task. clone( ) ) ) ;
199
- }
200
- assert ! ( job. cancel( ) ) ;
201
-
202
- let _foo = receiver. recv ( ) . unwrap ( ) ;
203
-
204
- assert_eq ! (
205
- task. v. load( std:: sync:: atomic:: Ordering :: Relaxed ) ,
206
- 2
207
- ) ;
208
- }
128
+ use super :: * ;
129
+ use crossbeam_channel:: unbounded;
130
+ use pretty_assertions:: assert_eq;
131
+ use std:: {
132
+ sync:: atomic:: AtomicU32 , thread:: sleep, time:: Duration ,
133
+ } ;
134
+
135
+ #[ derive( Clone ) ]
136
+ struct TestJob {
137
+ v : Arc < AtomicU32 > ,
138
+ value_to_add : u32 ,
139
+ }
140
+
141
+ impl AsyncJob for TestJob {
142
+ fn run ( & mut self ) {
143
+ sleep ( Duration :: from_millis ( 100 ) ) ;
144
+
145
+ self . v . fetch_add (
146
+ self . value_to_add ,
147
+ std:: sync:: atomic:: Ordering :: Relaxed ,
148
+ ) ;
149
+ }
150
+ }
151
+
152
+ type Notificaton = ( ) ;
153
+
154
+ #[ test]
155
+ fn test_overwrite ( ) {
156
+ let ( sender, receiver) = unbounded ( ) ;
157
+
158
+ let mut job: AsyncSingleJob < TestJob , Notificaton > =
159
+ AsyncSingleJob :: new ( sender, ( ) ) ;
160
+
161
+ let task = TestJob {
162
+ v : Arc :: new ( AtomicU32 :: new ( 1 ) ) ,
163
+ value_to_add : 1 ,
164
+ } ;
165
+
166
+ assert ! ( job. spawn( task. clone( ) ) ) ;
167
+ sleep ( Duration :: from_millis ( 1 ) ) ;
168
+ for _ in 0 ..5 {
169
+ assert ! ( !job. spawn( task. clone( ) ) ) ;
170
+ }
171
+
172
+ let _foo = receiver. recv ( ) . unwrap ( ) ;
173
+ let _foo = receiver. recv ( ) . unwrap ( ) ;
174
+ assert ! ( receiver. is_empty( ) ) ;
175
+
176
+ assert_eq ! (
177
+ task. v. load( std:: sync:: atomic:: Ordering :: Relaxed ) ,
178
+ 3
179
+ ) ;
180
+ }
181
+
182
+ #[ test]
183
+ fn test_cancel ( ) {
184
+ let ( sender, receiver) = unbounded ( ) ;
185
+
186
+ let mut job: AsyncSingleJob < TestJob , Notificaton > =
187
+ AsyncSingleJob :: new ( sender, ( ) ) ;
188
+
189
+ let task = TestJob {
190
+ v : Arc :: new ( AtomicU32 :: new ( 1 ) ) ,
191
+ value_to_add : 1 ,
192
+ } ;
193
+
194
+ assert ! ( job. spawn( task. clone( ) ) ) ;
195
+ sleep ( Duration :: from_millis ( 1 ) ) ;
196
+
197
+ for _ in 0 ..5 {
198
+ assert ! ( !job. spawn( task. clone( ) ) ) ;
199
+ }
200
+ assert ! ( job. cancel( ) ) ;
201
+
202
+ let _foo = receiver. recv ( ) . unwrap ( ) ;
203
+
204
+ assert_eq ! (
205
+ task. v. load( std:: sync:: atomic:: Ordering :: Relaxed ) ,
206
+ 2
207
+ ) ;
208
+ }
209
209
}
0 commit comments