@@ -17,7 +17,7 @@ use std::time::Instant;
17
17
18
18
use hyper:: StatusCode ;
19
19
use quickwit_common:: rate_limited_error;
20
- use quickwit_config:: INGEST_V2_SOURCE_ID ;
20
+ use quickwit_config:: { validate_identifier , INGEST_V2_SOURCE_ID } ;
21
21
use quickwit_ingest:: IngestRequestV2Builder ;
22
22
use quickwit_proto:: ingest:: router:: {
23
23
IngestFailureReason , IngestResponseV2 , IngestRouterService , IngestRouterServiceClient ,
@@ -91,6 +91,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
91
91
let mut lines = lines ( & body. content ) . enumerate ( ) ;
92
92
let mut per_subrequest_doc_handles: HashMap < u32 , Vec < DocHandle > > = HashMap :: new ( ) ;
93
93
let mut action_count = 0 ;
94
+ let mut invalid_index_id_items = Vec :: new ( ) ;
94
95
while let Some ( ( line_no, line) ) = lines. next ( ) {
95
96
let action = serde_json:: from_slice :: < BulkAction > ( line) . map_err ( |error| {
96
97
ElasticsearchError :: new (
@@ -121,6 +122,16 @@ pub(crate) async fn elastic_bulk_ingest_v2(
121
122
Some ( ElasticException :: ActionRequestValidation ) ,
122
123
)
123
124
} ) ?;
125
+
126
+ // Validate index id early because propagating back the right error (400)
127
+ // from deeper ingest layers is harder
128
+ if validate_identifier ( "" , & index_id) . is_err ( ) {
129
+ let invalid_item = make_invalid_index_id_item ( index_id. clone ( ) , meta. es_doc_id ) ;
130
+ invalid_index_id_items. push ( ( action_count, invalid_item) ) ;
131
+ action_count += 1 ;
132
+ continue ;
133
+ }
134
+
124
135
let ( subrequest_id, doc_uid) = ingest_request_builder. add_doc ( index_id, doc) ;
125
136
126
137
let doc_handle = DocHandle {
@@ -151,6 +162,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
151
162
per_subrequest_doc_handles,
152
163
now,
153
164
action_count,
165
+ invalid_index_id_items,
154
166
)
155
167
}
156
168
@@ -159,6 +171,7 @@ fn make_elastic_bulk_response_v2(
159
171
mut per_subrequest_doc_handles : HashMap < u32 , Vec < DocHandle > > ,
160
172
now : Instant ,
161
173
action_count : usize ,
174
+ invalid_index_id_items : Vec < ( usize , ElasticBulkItem ) > ,
162
175
) -> Result < ElasticBulkResponse , ElasticsearchError > {
163
176
let mut positioned_actions: Vec < ( usize , ElasticBulkAction ) > = Vec :: with_capacity ( action_count) ;
164
177
let mut errors = false ;
@@ -308,6 +321,12 @@ fn make_elastic_bulk_response_v2(
308
321
"doc handles should be empty"
309
322
) ;
310
323
324
+ for ( position, item) in invalid_index_id_items {
325
+ errors = true ;
326
+ let action = ElasticBulkAction :: Index ( item) ;
327
+ positioned_actions. push ( ( position, action) ) ;
328
+ }
329
+
311
330
assert_eq ! (
312
331
positioned_actions. len( ) ,
313
332
action_count,
@@ -344,6 +363,20 @@ fn remove_doc_handles(
344
363
} )
345
364
}
346
365
366
+ fn make_invalid_index_id_item ( index_id : String , es_doc_id : Option < String > ) -> ElasticBulkItem {
367
+ let error = ElasticBulkError {
368
+ index_id : Some ( index_id. clone ( ) ) ,
369
+ exception : ElasticException :: IllegalArgument ,
370
+ reason : format ! ( "invalid index id [{}]" , index_id) ,
371
+ } ;
372
+ ElasticBulkItem {
373
+ index_id,
374
+ es_doc_id,
375
+ status : StatusCode :: BAD_REQUEST ,
376
+ error : Some ( error) ,
377
+ }
378
+ }
379
+
347
380
#[ cfg( test) ]
348
381
mod tests {
349
382
use bytesize:: ByteSize ;
@@ -707,6 +740,7 @@ mod tests {
707
740
HashMap :: new ( ) ,
708
741
Instant :: now ( ) ,
709
742
0 ,
743
+ Vec :: new ( ) ,
710
744
)
711
745
. unwrap ( ) ;
712
746
@@ -767,6 +801,7 @@ mod tests {
767
801
per_request_doc_handles,
768
802
Instant :: now ( ) ,
769
803
3 ,
804
+ Vec :: new ( ) ,
770
805
)
771
806
. unwrap ( ) ;
772
807
@@ -832,4 +867,82 @@ mod tests {
832
867
. reply ( & handler)
833
868
. await ;
834
869
}
870
+
871
+ #[ tokio:: test]
872
+ async fn test_bulk_api_invalid_index_id ( ) {
873
+ let mut mock_ingest_router = MockIngestRouterService :: new ( ) ;
874
+ mock_ingest_router
875
+ . expect_ingest ( )
876
+ . once ( )
877
+ . returning ( |ingest_request| {
878
+ assert_eq ! ( ingest_request. subrequests. len( ) , 2 ) ;
879
+ Ok ( IngestResponseV2 {
880
+ successes : vec ! [
881
+ IngestSuccess {
882
+ subrequest_id: 0 ,
883
+ index_uid: Some ( IndexUid :: for_test( "my-index-1" , 0 ) ) ,
884
+ source_id: INGEST_V2_SOURCE_ID . to_string( ) ,
885
+ shard_id: Some ( ShardId :: from( 1 ) ) ,
886
+ replication_position_inclusive: Some ( Position :: offset( 1u64 ) ) ,
887
+ num_ingested_docs: 2 ,
888
+ parse_failures: Vec :: new( ) ,
889
+ } ,
890
+ IngestSuccess {
891
+ subrequest_id: 1 ,
892
+ index_uid: Some ( IndexUid :: for_test( "my-index-2" , 0 ) ) ,
893
+ source_id: INGEST_V2_SOURCE_ID . to_string( ) ,
894
+ shard_id: Some ( ShardId :: from( 1 ) ) ,
895
+ replication_position_inclusive: Some ( Position :: offset( 0u64 ) ) ,
896
+ num_ingested_docs: 1 ,
897
+ parse_failures: Vec :: new( ) ,
898
+ } ,
899
+ ] ,
900
+ failures : Vec :: new ( ) ,
901
+ } )
902
+ } ) ;
903
+ let ingest_router = IngestRouterServiceClient :: from_mock ( mock_ingest_router) ;
904
+ let handler = es_compat_bulk_handler_v2 ( ingest_router, ByteSize :: mb ( 10 ) ) ;
905
+
906
+ let payload = r#"
907
+ {"create": {"_index": "my-index-1"}}
908
+ {"ts": 1, "message": "my-message-1"}
909
+ {"create": {"_index": "bad!"}}
910
+ {"ts": 1, "message": "my-message-2"}
911
+ {"create": {"_index": "my-index-2", "_id" : "1"}}
912
+ {"ts": 1, "message": "my-message-3"}
913
+
914
+ "# ;
915
+ let response = warp:: test:: request ( )
916
+ . path ( "/_elastic/_bulk" )
917
+ . method ( "POST" )
918
+ . body ( payload)
919
+ . reply ( & handler)
920
+ . await ;
921
+ assert_eq ! ( response. status( ) , 200 ) ;
922
+
923
+ let bulk_response: ElasticBulkResponse = serde_json:: from_slice ( response. body ( ) ) . unwrap ( ) ;
924
+ assert ! ( bulk_response. errors) ;
925
+
926
+ let items = bulk_response
927
+ . actions
928
+ . into_iter ( )
929
+ . map ( |action| match action {
930
+ ElasticBulkAction :: Create ( item) => item,
931
+ ElasticBulkAction :: Index ( item) => item,
932
+ } )
933
+ . collect :: < Vec < _ > > ( ) ;
934
+ assert_eq ! ( items. len( ) , 3 ) ;
935
+
936
+ assert_eq ! ( items[ 0 ] . index_id, "my-index-1" ) ;
937
+ assert ! ( items[ 0 ] . es_doc_id. is_none( ) ) ;
938
+ assert_eq ! ( items[ 0 ] . status, StatusCode :: CREATED ) ;
939
+
940
+ assert_eq ! ( items[ 1 ] . index_id, "bad!" ) ;
941
+ assert ! ( items[ 1 ] . es_doc_id. is_none( ) ) ;
942
+ assert_eq ! ( items[ 1 ] . status, StatusCode :: BAD_REQUEST ) ;
943
+
944
+ assert_eq ! ( items[ 2 ] . index_id, "my-index-2" ) ;
945
+ assert_eq ! ( items[ 2 ] . es_doc_id. as_ref( ) . unwrap( ) , "1" ) ;
946
+ assert_eq ! ( items[ 2 ] . status, StatusCode :: CREATED ) ;
947
+ }
835
948
}
0 commit comments