1+ #![ cfg_attr( coverage, coverage( off) ) ]
2+
13use std:: future:: Future ;
2- use std:: sync:: LazyLock ;
4+ use std:: sync:: { Arc , LazyLock } ;
35use std:: time:: Duration ;
46
57use const_format:: formatcp;
68use log:: { error, info, warn} ;
9+ use rocket:: futures:: future:: join_all;
710use rocket:: futures:: stream:: FuturesUnordered ;
811use rocket:: futures:: StreamExt ;
912use rocket:: http:: ContentType ;
@@ -232,6 +235,29 @@ fn api_login_fails() {
232235 } ) ;
233236}
234237
238+ #[ test]
239+ fn api_login_and_logout ( ) {
240+ shared_rt_test ( async {
241+ let client = RocketClient :: untracked ( build_rocket ( program_config ( ) ) ) . await . unwrap ( ) ;
242+
243+ let resp = client
244+ . post ( "/api/login" )
245+ . header ( ContentType :: JSON )
246+ . body ( r#"{"username": "admin", "password": "admin"}"# )
247+ . dispatch ( )
248+ . await ;
249+ assert_eq ! ( resp. status( ) , Status :: Ok ) ;
250+ let session_id = resp. into_json :: < LoginResponse > ( ) . await . unwrap ( ) . session_id ;
251+
252+ let resp = client
253+ . post ( "/api/logout" )
254+ . header ( rocket:: http:: Header :: new ( "Authorization" , session_id) )
255+ . dispatch ( )
256+ . await ;
257+ assert_eq ! ( resp. status( ) , Status :: Ok ) ;
258+ } ) ;
259+ }
260+
235261// NOTE: This test works only if the user used here is not shared with other tests!
236262#[ test]
237263fn api_login_max_sessions_exceeds ( ) {
@@ -290,6 +316,19 @@ fn api_rpc_calls() {
290316 }
291317 }
292318
319+ // Wrong session token
320+ {
321+ let resp = client
322+ . post ( "/api/rpc" )
323+ . header ( rocket:: http:: Header :: new ( "Authorization" , "wrong_token" ) )
324+ . header ( ContentType :: JSON )
325+ . body ( r#"{"path": ".broker", "method": "anything"}"# )
326+ . dispatch ( )
327+ . await ;
328+ assert_eq ! ( resp. status( ) , Status :: Unauthorized ) ;
329+ }
330+
331+ // Wrong body format
293332 {
294333 let resp = client
295334 . post ( "/api/rpc" )
@@ -300,6 +339,8 @@ fn api_rpc_calls() {
300339 . await ;
301340 assert_eq ! ( resp. status( ) , Status :: UnprocessableEntity ) ;
302341 }
342+
343+ // Regular calls
303344 let rpc_call_dispatcher = RpcCallDispatcher { client, session_id } ;
304345 {
305346 let resp = rpc_call_dispatcher. call ( ".broker" , "ls" , None :: < & str > ) . await ;
@@ -324,7 +365,7 @@ fn api_rpc_calls() {
324365#[ test]
325366fn api_subscribe ( ) {
326367 shared_rt_test ( async {
327- let client = RocketClient :: untracked ( build_rocket ( program_config ( ) ) ) . await . unwrap ( ) ;
368+ let client = Arc :: new ( RocketClient :: untracked ( build_rocket ( program_config ( ) ) ) . await . unwrap ( ) ) ;
328369
329370 let resp = client
330371 . post ( "/api/login" )
@@ -334,37 +375,45 @@ fn api_subscribe() {
334375 . await ;
335376 let session_id = resp. into_json :: < LoginResponse > ( ) . await . unwrap ( ) . session_id ;
336377
337- let resp = client
338- . post ( "/api/subscribe" )
339- . header ( ContentType :: JSON )
340- . header ( rocket:: http:: Header :: new ( "Authorization" , session_id. clone ( ) ) )
341- . body ( r#"{"shv_ri": "test/device/value:*:*"}"# )
342- . dispatch ( )
343- . await ;
378+ let mut tasks = vec ! [ ] ;
379+ for task_id in 0 ..10 {
380+ let client = client. clone ( ) ;
381+ let session_id = session_id. clone ( ) ;
382+ let task = tokio:: spawn ( async move {
383+ let req = client
384+ . post ( "/api/subscribe" )
385+ . header ( ContentType :: JSON )
386+ . header ( rocket:: http:: Header :: new ( "Authorization" , session_id) )
387+ . body ( format ! ( r#"{{"shv_ri": "{}:*:*"}}"# ,
388+ if task_id < 5 { "test/device/value" } else { "test/*" }
389+ ) )
390+ . dispatch ( ) ;
391+ let resp = req. await ;
392+
393+ assert ! ( resp. content_type( ) . unwrap( ) . is_event_stream( ) ) ;
344394
345- assert ! ( resp. content_type( ) . unwrap( ) . is_event_stream( ) ) ;
346-
347- // let mut reader = tokio::io::BufReader::new(resp).lines();
348- // for i in 0..5 {
349- // warn!("receiving event {i}");
350- // let event = reader
351- // .next_line()
352- // .await
353- // .expect("Read line error")
354- // .expect("Unexpected end of stream");
355- // warn!("{event}");
356- // }
357-
358- let mut reader = sse_codec:: decode_stream ( tokio:: io:: BufReader :: new ( resp) . compat ( ) ) ;
359- for i in 0 ..10 {
360- info ! ( "receiving event {i}" ) ;
361- let event = reader
362- . next ( )
363- . await
364- . expect ( "Unexpected end of stream" )
365- . unwrap_or_else ( |e| panic ! ( "Unexpected error in event stream: {e}" ) ) ;
366- match event {
367- sse_codec:: Event :: Message { id, event, data, ..} => {
395+ // let mut reader = tokio::io::BufReader::new(resp).lines();
396+ // for i in 0..5 {
397+ // warn!("receiving event {i}");
398+ // let event = reader
399+ // .next_line()
400+ // .await
401+ // .expect("Read line error")
402+ // .expect("Unexpected end of stream");
403+ // warn!("{event}");
404+ // }
405+
406+ let mut reader = sse_codec:: decode_stream ( tokio:: io:: BufReader :: new ( resp) . compat ( ) ) ;
407+ for i in 0 ..10 {
408+ info ! ( "task {task_id}, receiving event {i}" ) ;
409+ let event = reader
410+ . next ( )
411+ . await
412+ . expect ( "Unexpected end of stream" )
413+ . unwrap_or_else ( |e| panic ! ( "Unexpected error in event stream: {e}" ) ) ;
414+ let sse_codec:: Event :: Message { id, event, data} = event else {
415+ panic ! ( "Unexpected event" ) ;
416+ } ;
368417 info ! ( "{data}" ) ;
369418 assert ! ( id. is_none( ) ) ;
370419 assert_eq ! ( event, "message" ) ;
@@ -373,8 +422,9 @@ fn api_subscribe() {
373422 assert_eq ! ( parsed_data. signal, Some ( "event" . into( ) ) ) ;
374423 assert_eq ! ( parsed_data. param, Some ( RpcValue :: from( 42 ) . to_cpon( ) ) ) ;
375424 }
376- _ => panic ! ( "Unexpected event" ) ,
377- }
425+ } ) ;
426+ tasks . push ( task ) ;
378427 }
428+ join_all ( tasks) . await ;
379429 } ) ;
380430}
0 commit comments