1
1
mod engines;
2
2
3
3
use std:: collections:: { BTreeMap , BTreeSet , HashSet } ;
4
+ use std:: hash:: { DefaultHasher , Hash , Hasher } ;
4
5
use std:: io:: { stdout, Read , Seek , SeekFrom , Write } ;
5
6
use std:: path:: { Path , PathBuf } ;
6
7
use std:: time:: { Duration , Instant } ;
@@ -19,7 +20,7 @@ use rand::seq::SliceRandom;
19
20
use sqllogictest:: substitution:: well_known;
20
21
use sqllogictest:: {
21
22
default_column_validator, default_normalizer, default_validator, update_record_with_output,
22
- AsyncDB , Injected , MakeConnection , Record , Runner ,
23
+ AsyncDB , Injected , MakeConnection , Partitioner , Record , Runner ,
23
24
} ;
24
25
use tokio_util:: task:: AbortOnDropHandle ;
25
26
@@ -32,6 +33,10 @@ pub enum Color {
32
33
Never ,
33
34
}
34
35
36
// Environment-variable keys used to configure test-file partitioning (sharding).
// They back the `--partition-id` / `--partition-count` CLI flags via clap's `env`
// attribute, and are populated from CI-provider variables (e.g. Buildkite) by
// `import_partition_config_from_ci` when not already set.
const PARTITION_ID_ENV_KEY: &str = "SLT_PARTITION_ID";
const PARTITION_COUNT_ENV_KEY: &str = "SLT_PARTITION_COUNT";
35
40
#[ derive( Parser , Debug , Clone ) ]
36
41
#[ clap( about, version, author) ]
37
42
struct Opt {
@@ -112,6 +117,18 @@ struct Opt {
112
117
/// The engine name is a label by default.
113
118
#[ clap( long = "label" ) ]
114
119
labels : Vec < String > ,
120
+
121
+ /// Partition ID for sharding the test files. When used with `partition_count`,
122
+ /// divides the test files into shards based on the hash of the file path.
123
+ ///
124
+ /// Useful for running tests in parallel across multiple CI jobs. Currently
125
+ /// automatically configured in Buildkite.
126
+ #[ clap( long, env = PARTITION_ID_ENV_KEY ) ]
127
+ partition_id : Option < u64 > ,
128
+
129
+ /// Total number of partitions for test sharding. More details in `partition_id`.
130
+ #[ clap( long, env = PARTITION_COUNT_ENV_KEY ) ]
131
+ partition_count : Option < u64 > ,
115
132
}
116
133
117
134
/// Connection configuration.
@@ -138,10 +155,62 @@ impl DBConfig {
138
155
}
139
156
}
140
157
158
/// Deterministically assigns test files to shards by hashing their paths:
/// a file belongs to this partition iff `hash(path) % count == id`.
struct HashPartitioner {
    /// Zero-based index of the shard handled by this process (always `< count`).
    id: u64,
    /// Total number of shards the test files are divided into (non-zero).
    count: u64,
}
162
+
163
+ impl HashPartitioner {
164
+ fn new ( count : u64 , id : u64 ) -> Result < Self > {
165
+ if count == 0 {
166
+ bail ! ( "partition count must be greater than zero" ) ;
167
+ }
168
+ if id >= count {
169
+ bail ! ( "partition id (zero-based) must be less than count" ) ;
170
+ }
171
+ Ok ( Self { count, id } )
172
+ }
173
+ }
174
+
175
+ impl Partitioner for HashPartitioner {
176
+ fn matches ( & self , file_name : & str ) -> bool {
177
+ let mut hasher = DefaultHasher :: new ( ) ;
178
+ file_name. hash ( & mut hasher) ;
179
+ hasher. finish ( ) % self . count == self . id
180
+ }
181
+ }
182
+
183
+ #[ allow( clippy:: needless_return) ]
184
+ fn import_partition_config_from_ci ( ) {
185
+ if std:: env:: var_os ( PARTITION_ID_ENV_KEY ) . is_some ( )
186
+ || std:: env:: var_os ( PARTITION_COUNT_ENV_KEY ) . is_some ( )
187
+ {
188
+ // Ignore if already set.
189
+ return ;
190
+ }
191
+
192
+ // Buildkite
193
+ {
194
+ const ID : & str = "BUILDKITE_PARALLEL_JOB" ;
195
+ const COUNT : & str = "BUILDKITE_PARALLEL_JOB_COUNT" ;
196
+
197
+ if let ( Some ( id) , Some ( count) ) = ( std:: env:: var_os ( ID ) , std:: env:: var_os ( COUNT ) ) {
198
+ std:: env:: set_var ( PARTITION_ID_ENV_KEY , id) ;
199
+ std:: env:: set_var ( PARTITION_COUNT_ENV_KEY , count) ;
200
+ eprintln ! ( "Imported partition config from Buildkite." ) ;
201
+ return ;
202
+ }
203
+ }
204
+
205
+ // TODO: more CI providers
206
+ }
207
+
141
208
#[ tokio:: main]
142
209
pub async fn main ( ) -> Result < ( ) > {
143
210
tracing_subscriber:: fmt:: init ( ) ;
144
211
212
+ import_partition_config_from_ci ( ) ;
213
+
145
214
let cli = Opt :: command ( ) . disable_help_flag ( true ) . arg (
146
215
Arg :: new ( "help" )
147
216
. long ( "help" )
@@ -167,6 +236,8 @@ pub async fn main() -> Result<()> {
167
236
r#override,
168
237
format,
169
238
labels,
239
+ partition_count,
240
+ partition_id,
170
241
} = Opt :: from_arg_matches ( & matches)
171
242
. map_err ( |err| err. exit ( ) )
172
243
. unwrap ( ) ;
@@ -205,17 +276,34 @@ pub async fn main() -> Result<()> {
205
276
Color :: Auto => { }
206
277
}
207
278
279
+ let partitioner = if let Some ( count) = partition_count {
280
+ let id = partition_id. context ( "parallel job count is specified but job id is not" ) ?;
281
+ Some ( HashPartitioner :: new ( count, id) ?)
282
+ } else {
283
+ None
284
+ } ;
285
+
208
286
let glob_patterns = files;
209
- let mut files: Vec < PathBuf > = Vec :: new ( ) ;
210
- for glob_pattern in glob_patterns. into_iter ( ) {
211
- let pathbufs = glob:: glob ( & glob_pattern) . context ( "failed to read glob pattern" ) ?;
212
- for pathbuf in pathbufs. into_iter ( ) . try_collect :: < _ , Vec < _ > , _ > ( ) ? {
213
- files. push ( pathbuf)
287
+ let mut all_files = Vec :: new ( ) ;
288
+
289
+ for glob_pattern in glob_patterns {
290
+ let mut files: Vec < PathBuf > = glob:: glob ( & glob_pattern)
291
+ . context ( "failed to read glob pattern" ) ?
292
+ . try_collect ( ) ?;
293
+
294
+ // Test against partitioner only if there are multiple files matched, e.g., expanded from an `*`.
295
+ if files. len ( ) > 1 {
296
+ if let Some ( partitioner) = & partitioner {
297
+ let len = files. len ( ) ;
298
+ files. retain ( |path| partitioner. matches ( path. to_str ( ) . unwrap ( ) ) ) ;
299
+ let len_after = files. len ( ) ;
300
+ eprintln ! (
301
+ "Running {len_after} out of {len} test cases for glob pattern \" {glob_pattern}\" based on partitioning." ,
302
+ ) ;
303
+ }
214
304
}
215
- }
216
305
217
- if files. is_empty ( ) {
218
- bail ! ( "no test case found" ) ;
306
+ all_files. extend ( files) ;
219
307
}
220
308
221
309
let config = DBConfig {
@@ -227,7 +315,7 @@ pub async fn main() -> Result<()> {
227
315
} ;
228
316
229
317
if r#override || format {
230
- return update_test_files ( files , & engine, config, format) . await ;
318
+ return update_test_files ( all_files , & engine, config, format) . await ;
231
319
}
232
320
233
321
let mut report = Report :: new ( junit. clone ( ) . unwrap_or_else ( || "sqllogictest" . to_string ( ) ) ) ;
@@ -241,7 +329,7 @@ pub async fn main() -> Result<()> {
241
329
jobs,
242
330
keep_db_on_failure,
243
331
& mut test_suite,
244
- files ,
332
+ all_files ,
245
333
& engine,
246
334
config,
247
335
& labels,
@@ -252,7 +340,7 @@ pub async fn main() -> Result<()> {
252
340
} else {
253
341
run_serial (
254
342
& mut test_suite,
255
- files ,
343
+ all_files ,
256
344
& engine,
257
345
config,
258
346
& labels,
0 commit comments