1
1
//! In-memory implementation of TaskStorage trait. The storage allows tasks to be
2
2
//! pushed to and popped from a queue, and also allows tasks to be set and
3
3
//! retrieved by their UUID.
4
- use crate :: queue:: { TaskQueue , TaskQueueError } ;
5
4
use crate :: task:: { Task , TaskId } ;
5
+ use crate :: { TaskQueue , TaskQueueError , TaskSerializer } ;
6
6
use async_trait:: async_trait;
7
7
use serde:: de:: DeserializeOwned ;
8
8
use serde:: Serialize ;
9
9
use std:: collections:: { HashMap , VecDeque } ;
10
10
use std:: marker:: PhantomData ;
11
11
use std:: sync:: Mutex ;
12
12
13
- pub struct InMemoryTaskQueue < D > {
14
- pub hashmap : Mutex < HashMap < TaskId , String > > ,
13
+ pub struct InMemoryTaskQueue < D , S >
14
+ where
15
+ S : TaskSerializer ,
16
+ {
17
+ pub hashmap : Mutex < HashMap < TaskId , Vec < u8 > > > ,
15
18
pub list : Mutex < VecDeque < TaskId > > ,
16
- pub dlq : Mutex < HashMap < TaskId , String > > ,
17
- _marker : PhantomData < D > ,
19
+ pub dlq : Mutex < HashMap < TaskId , Vec < u8 > > > ,
20
+ _marker : PhantomData < ( D , S ) > ,
18
21
}
19
22
20
- impl < D > InMemoryTaskQueue < D > {
23
+ impl < D , S > InMemoryTaskQueue < D , S >
24
+ where
25
+ S : TaskSerializer ,
26
+ {
21
27
pub fn new ( ) -> Self {
22
28
Self {
23
29
hashmap : Mutex :: new ( HashMap :: new ( ) ) ,
@@ -28,14 +34,17 @@ impl<D> InMemoryTaskQueue<D> {
28
34
}
29
35
}
30
36
31
- impl < D > Default for InMemoryTaskQueue < D > {
37
+ impl < D , S > Default for InMemoryTaskQueue < D , S >
38
+ where
39
+ S : TaskSerializer ,
40
+ {
32
41
fn default ( ) -> Self {
33
42
Self :: new ( )
34
43
}
35
44
}
36
45
37
46
#[ async_trait]
38
- impl < D > TaskQueue < D > for InMemoryTaskQueue < D >
47
+ impl < D , S > TaskQueue < D > for InMemoryTaskQueue < D , S >
39
48
where
40
49
D : std:: fmt:: Debug
41
50
+ Clone
44
53
+ Send
45
54
+ Sync
46
55
+ ' static ,
56
+ S : TaskSerializer + Send + Sync ,
47
57
{
48
58
async fn push ( & self , task : & Task < D > ) -> Result < ( ) , TaskQueueError > {
49
59
let mut list = self
55
65
. lock ( )
56
66
. map_err ( |e| TaskQueueError :: QueueError ( e. to_string ( ) ) ) ?;
57
67
58
- let task_value = serde_json:: to_string ( task)
59
- . map_err ( |e| TaskQueueError :: SerdeError ( e. to_string ( ) ) ) ?;
60
- hashmap. insert ( task. task_id , task_value) ;
68
+ let task_bytes = S :: serialize_task ( task) ?;
69
+ hashmap. insert ( task. task_id , task_bytes) ;
61
70
list. push_back ( task. task_id ) ;
62
71
Ok ( ( ) )
63
72
}
@@ -73,12 +82,10 @@ where
73
82
. map_err ( |e| TaskQueueError :: QueueError ( e. to_string ( ) ) ) ?;
74
83
75
84
if let Some ( task_id) = list. pop_front ( ) {
76
- let task_value = hashmap
85
+ let task_bytes = hashmap
77
86
. get ( & task_id)
78
87
. ok_or ( TaskQueueError :: TaskNotFound ( task_id) ) ?;
79
- let task: Task < D > = serde_json:: from_str ( task_value)
80
- . map_err ( |e| TaskQueueError :: SerdeError ( e. to_string ( ) ) ) ?;
81
- Ok ( task)
88
+ S :: deserialize_task ( task_bytes)
82
89
} else {
83
90
Err ( TaskQueueError :: QueueEmpty )
84
91
}
@@ -100,9 +107,8 @@ where
100
107
. dlq
101
108
. lock ( )
102
109
. map_err ( |e| TaskQueueError :: QueueError ( e. to_string ( ) ) ) ?;
103
- let task_value = serde_json:: to_string ( task)
104
- . map_err ( |e| TaskQueueError :: SerdeError ( e. to_string ( ) ) ) ?;
105
- dlq. insert ( task. task_id , task_value) ;
110
+ let task_bytes = S :: serialize_task ( task) ?;
111
+ dlq. insert ( task. task_id , task_bytes) ;
106
112
107
113
let mut hashmap = self
108
114
. hashmap
@@ -111,7 +117,6 @@ where
111
117
hashmap
112
118
. remove ( & task. task_id )
113
119
. ok_or ( TaskQueueError :: TaskNotFound ( task. task_id ) ) ?;
114
-
115
120
Ok ( ( ) )
116
121
}
117
122
@@ -120,29 +125,33 @@ where
120
125
. hashmap
121
126
. lock ( )
122
127
. map_err ( |e| TaskQueueError :: QueueError ( e. to_string ( ) ) ) ?;
123
- let task_value = serde_json:: to_string ( task)
124
- . map_err ( |e| TaskQueueError :: SerdeError ( e. to_string ( ) ) ) ?;
125
- hashmap. insert ( task. task_id , task_value) ;
128
+ let task_bytes = S :: serialize_task ( task) ?;
129
+ hashmap. insert ( task. task_id , task_bytes) ;
126
130
Ok ( ( ) )
127
131
}
128
132
}
129
133
130
- impl < D > std:: fmt:: Debug for InMemoryTaskQueue < D > {
134
+ impl < D , S > std:: fmt:: Debug for InMemoryTaskQueue < D , S >
135
+ where
136
+ S : TaskSerializer ,
137
+ {
131
138
fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
132
139
let hashmap = self . hashmap . lock ( ) . unwrap ( ) ;
133
140
let list = self . list . lock ( ) . unwrap ( ) ;
134
141
let dlq = self . dlq . lock ( ) . unwrap ( ) ;
135
142
136
143
f. debug_struct ( "InMemoryTaskQueue" )
137
- . field ( "hashmap " , & * hashmap)
144
+ . field ( "hashmap_size " , & hashmap. len ( ) )
138
145
. field ( "list" , & * list)
139
- . field ( "dlq " , & * dlq)
146
+ . field ( "dlq_size " , & dlq. len ( ) )
140
147
. finish ( )
141
148
}
142
149
}
150
+
143
151
#[ cfg( test) ]
144
152
mod tests {
145
153
use super :: * ;
154
+ use crate :: JsonSerializer ;
146
155
use serde:: { Deserialize , Serialize } ;
147
156
148
157
#[ derive( Debug , Clone , Serialize , Deserialize , PartialEq ) ]
@@ -152,7 +161,7 @@ mod tests {
152
161
153
162
#[ tokio:: test]
154
163
async fn test_push_and_pop ( ) {
155
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
164
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
156
165
let task = Task :: new ( TestData { value : 42 } ) ;
157
166
158
167
queue. push ( & task) . await . unwrap ( ) ;
@@ -163,7 +172,7 @@ mod tests {
163
172
164
173
#[ tokio:: test]
165
174
async fn test_queue_empty ( ) {
166
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
175
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
167
176
168
177
match queue. pop ( ) . await {
169
178
Err ( TaskQueueError :: QueueEmpty ) => ( ) ,
@@ -173,7 +182,7 @@ mod tests {
173
182
174
183
#[ tokio:: test]
175
184
async fn test_ack ( ) {
176
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
185
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
177
186
let task = Task :: new ( TestData { value : 42 } ) ;
178
187
179
188
queue. push ( & task) . await . unwrap ( ) ;
@@ -186,7 +195,7 @@ mod tests {
186
195
187
196
#[ tokio:: test]
188
197
async fn test_nack ( ) {
189
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
198
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
190
199
let task = Task :: new ( TestData { value : 42 } ) ;
191
200
192
201
queue. push ( & task) . await . unwrap ( ) ;
@@ -203,7 +212,7 @@ mod tests {
203
212
204
213
#[ tokio:: test]
205
214
async fn test_set ( ) {
206
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
215
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
207
216
let mut task = Task :: new ( TestData { value : 42 } ) ;
208
217
209
218
queue. push ( & task) . await . unwrap ( ) ;
@@ -218,7 +227,7 @@ mod tests {
218
227
219
228
#[ tokio:: test]
220
229
async fn test_multiple_tasks ( ) {
221
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
230
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
222
231
let tasks = vec ! [
223
232
Task :: new( TestData { value: 1 } ) ,
224
233
Task :: new( TestData { value: 2 } ) ,
@@ -240,7 +249,7 @@ mod tests {
240
249
241
250
#[ tokio:: test]
242
251
async fn test_task_not_found ( ) {
243
- let queue = InMemoryTaskQueue :: < TestData > :: new ( ) ;
252
+ let queue = InMemoryTaskQueue :: < TestData , JsonSerializer > :: new ( ) ;
244
253
let non_existent_task_id = TaskId :: new ( ) ;
245
254
246
255
match queue. ack ( & non_existent_task_id) . await {
0 commit comments