The client does not report SSE stream being closed by server #13

Open
sukhmel opened this issue Apr 7, 2023 · 1 comment


sukhmel commented Apr 7, 2023

If the server closes the connection, EventSource::state still reports Open, and the client does not receive any error event.

I have tried the Chrome developer console with the same server, and it reports an error and reopens the connection, which seems to be standard behaviour.
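For reference, the behaviour observed in Chrome can be modelled as a small state machine: when the server ends the stream, the client goes back to a connecting state and fires an error event before retrying. The sketch below is only an illustration of that expectation. The names `Model`, `Transport`, and `errors_fired` are hypothetical and are not part of the sse-client API.

```rust
/// Connection states, mirroring the CONNECTING / OPEN / CLOSED ready states
/// of a browser EventSource.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadyState {
    Connecting,
    Open,
    Closed,
}

/// Things that can happen to the underlying connection (hypothetical names).
#[derive(Debug, Clone, Copy)]
enum Transport {
    Connected,
    ServerClosed,
    ClientClosed,
}

/// Hypothetical model of a client; not the sse-client implementation.
struct Model {
    state: ReadyState,
    errors_fired: u32,
}

impl Model {
    fn new() -> Self {
        Model { state: ReadyState::Connecting, errors_fired: 0 }
    }

    fn handle(&mut self, event: Transport) {
        match (self.state, event) {
            // Once the user has closed the source, nothing reopens it.
            (ReadyState::Closed, _) => {}
            (_, Transport::ClientClosed) => self.state = ReadyState::Closed,
            (ReadyState::Connecting, Transport::Connected) => self.state = ReadyState::Open,
            // The case from this issue: the server ends the stream, so the
            // client leaves Open, reports an error, and will try to reconnect.
            (ReadyState::Open, Transport::ServerClosed) => {
                self.state = ReadyState::Connecting;
                self.errors_fired += 1;
            }
            // Other combinations are ignored in this sketch.
            _ => {}
        }
    }
}

fn main() {
    let mut model = Model::new();
    for event in [Transport::Connected, Transport::ServerClosed] {
        model.handle(event);
        println!("{:?} -> state {:?}, errors fired: {}", event, model.state, model.errors_fired);
    }
}
```

With this model, a server-side close never leaves the state at Open, which is the difference from what sse-client currently reports.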

Here's the minimal example I used to test sse-client. That the connection is closed once the stream finishes is implied by axum's internals and was confirmed by the maintainer here:

use std::{env, sync::{Arc, atomic::AtomicI32}, time::Duration, convert::Infallible};

use axum::{Router, routing::get, response::sse::{Event, KeepAlive, Sse}};
use tracing::{info, warn};
use futures_util::stream::Stream;
use sse_client::EventSource;
use tower_http::cors::CorsLayer;

/// If a number is given as a parameter to the program, creates a client and expects this amount of messages to be received
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let args: Vec<String> = env::args().collect();
    if args.len() > 1 {
        let amount = args[1].parse().unwrap();
        listen_sse(amount).await;
    } else {
        // create a server for serving SSE connections
        let app = Router::new().route("/sse", get(sse_handler)).layer(CorsLayer::permissive());
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    }
}

/// Only send 2 events for each connection, then close the connection                                           
async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {                                                                                                                                                                                     
    // A `Stream` that yields two events, one per interval tick, and then ends
    let stream = async_stream::stream! {                                                                                                                                                              
        let mut interval = tokio::time::interval(Duration::from_secs(1));                                                                                                                             
        for counter in 1..3 {                                                                                                                                                                        
            interval.tick().await;                                                                                                                                                                    
            yield Ok(Event::default().data(format!("count = {counter}")));                                                                                                                            
        }                                                                                                                                                                                                                                                                                                                                                
    };                                                                                                                                                                                                
                                                                                                                                                                                                      
    Sse::new(stream).keep_alive(KeepAlive::default())                                                                                                                                                 
}

/// Create a connection to SSE server, listen for a specified amount of events then close
async fn listen_sse(amount: i32) {                                                                                                                                                                  
    let event_source = EventSource::new("http://localhost:3000/sse").unwrap();                                                                                                                      
                                                             
    // This listener never gets executed                                                                                                                                       
    event_source.add_event_listener("error", |error| warn!("Error {:?}", error));                             
                                    
    let counter = Arc::new(AtomicI32::new(1));
    let counter_clone = counter.clone();
                                    
    event_source.on_message(move |message| {
        info!("New message {:?}", message);
        counter.fetch_add(1, std::sync::atomic::Ordering::Release);
    });

    loop {
        tokio::time::sleep(Duration::from_millis(500)).await;
        // This keeps printing `stream state is Open` after the server has closed the connection
        info!("stream state is {:?}", event_source.state());
        if counter_clone.load(std::sync::atomic::Ordering::Acquire) > amount {
            break;
        }
    }

    event_source.close();
}

Here's the Chrome console counterpart for the client side. I had to disable CSP to run this code, and for that I used this plugin:

const evtSource = new EventSource("http://localhost:3000/sse", { withCredentials: false });
evtSource.onmessage = (event) => { console.log(event); }
evtSource.onerror = (err) => { console.error("EventSource failed:", err); };

Here's example output from a version with more debug logs, using sse-client:

2023-04-07T11:42:03.572910Z  INFO SERVER: axum_sse_playground: stream started
2023-04-07T11:42:03.574078Z  INFO SERVER: axum_sse_playground: sending "count = 1"
2023-04-07T11:42:03.574240Z  INFO client: axum_sse_playground: New message Event { id: "", type_: "message", data: "count = 1" }
2023-04-07T11:42:04.072047Z  INFO client: axum_sse_playground: stream state is Open
2023-04-07T11:42:04.574543Z  INFO client: axum_sse_playground: stream state is Open
2023-04-07T11:42:04.574581Z  INFO SERVER: axum_sse_playground: sending "count = 2"
2023-04-07T11:42:04.574865Z  INFO SERVER: axum_sse_playground: stream closed
2023-04-07T11:42:04.575359Z  INFO client: axum_sse_playground: New message Event { id: "", type_: "message", data: "count = 2" }
2023-04-07T11:42:05.076061Z  INFO client: axum_sse_playground: stream state is Open
2023-04-07T11:42:05.577693Z  INFO client: axum_sse_playground: stream state is Open
2023-04-07T11:42:06.079126Z  INFO client: axum_sse_playground: stream state is Open
2023-04-07T11:42:06.580785Z  INFO client: axum_sse_playground: stream state is Open
2023-04-07T11:42:07.081448Z  INFO client: axum_sse_playground: stream state is Open
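In the log above the client keeps reporting Open after the server logs "stream closed". As a rough illustration of how a reader loop can notice that point, the sketch below reads SSE lines from any `BufRead` and returns as soon as the stream ends. It uses only the standard library; the function name `read_until_closed` and the in-memory input are assumptions made for the example, and this is not how sse-client is implemented.

```rust
use std::io::BufRead;

/// Reads `data:` events from `reader` until the stream ends and returns the
/// events dispatched so far. Returning at all means the peer closed the stream,
/// which is the moment a client could leave the Open state and report an error.
fn read_until_closed<R: BufRead>(mut reader: R) -> std::io::Result<Vec<String>> {
    let mut events = Vec::new();
    let mut data: Vec<String> = Vec::new();
    let mut line = String::new();
    loop {
        line.clear();
        // `read_line` returns Ok(0) only at end of stream.
        if reader.read_line(&mut line)? == 0 {
            // Any event not yet terminated by a blank line is dropped.
            return Ok(events);
        }
        let trimmed = line.trim_end_matches(|c: char| c == '\n' || c == '\r');
        if trimmed.is_empty() {
            // A blank line dispatches the pending event, if there is one.
            if !data.is_empty() {
                events.push(data.join("\n"));
                data.clear();
            }
        } else if let Some(value) = trimmed.strip_prefix("data:") {
            // A single leading space after the colon is not part of the value.
            data.push(value.strip_prefix(' ').unwrap_or(value).to_string());
        }
        // Other fields (event, id, retry) and comment lines are ignored here.
    }
}

fn main() -> std::io::Result<()> {
    // Stand-in for the response body of the two-event server in this issue.
    let body = "data: count = 1\n\ndata: count = 2\n\n";
    let events = read_until_closed(body.as_bytes())?;
    println!("received {:?}, then the stream ended", events);
    Ok(())
}
```

A real client would read from the HTTP response body instead of a byte slice, but the end-of-stream check stays the same.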

Using the Chrome client:

2023-04-07T11:42:33.721796Z  INFO SERVER: axum_sse_playground: stream started
2023-04-07T11:42:33.723057Z  INFO SERVER: axum_sse_playground: sending "count = 1"
2023-04-07T11:42:34.723521Z  INFO SERVER: axum_sse_playground: sending "count = 2"
2023-04-07T11:42:34.723711Z  INFO SERVER: axum_sse_playground: stream closed
2023-04-07T11:42:37.736963Z  INFO SERVER: axum_sse_playground: stream started
2023-04-07T11:42:37.738235Z  INFO SERVER: axum_sse_playground: sending "count = 1"
2023-04-07T11:42:38.739625Z  INFO SERVER: axum_sse_playground: sending "count = 2"
2023-04-07T11:42:38.739805Z  INFO SERVER: axum_sse_playground: stream closed
2023-04-07T11:42:41.751556Z  INFO SERVER: axum_sse_playground: stream started
2023-04-07T11:42:41.752770Z  INFO SERVER: axum_sse_playground: sending "count = 1"
2023-04-07T11:42:42.752871Z  INFO SERVER: axum_sse_playground: sending "count = 2"
2023-04-07T11:42:42.752942Z  INFO SERVER: axum_sse_playground: stream closed
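The server log above shows a pause between each "stream closed" and the next "stream started". The short sketch below computes that pause from the timestamps in the log. The helper `micros_since_midnight` is a hypothetical name, and the parsing assumes the exact timestamp layout shown here with six fractional digits.

```rust
/// Parses the time-of-day part of a timestamp such as
/// `2023-04-07T11:42:34.723711Z` into microseconds since midnight.
/// Assumes exactly six fractional digits, as in the logs above.
fn micros_since_midnight(ts: &str) -> Option<u64> {
    let time = ts.split('T').nth(1)?.trim_end_matches('Z');
    let (hms, frac) = time.split_once('.')?;
    if frac.len() != 6 {
        return None;
    }
    let mut parts = hms.split(':');
    let hours: u64 = parts.next()?.parse().ok()?;
    let minutes: u64 = parts.next()?.parse().ok()?;
    let seconds: u64 = parts.next()?.parse().ok()?;
    let micros: u64 = frac.parse().ok()?;
    Some(((hours * 60 + minutes) * 60 + seconds) * 1_000_000 + micros)
}

/// Microseconds between a "stream closed" line and the following "stream started" line.
fn reconnect_gap_micros(closed: &str, started: &str) -> Option<u64> {
    micros_since_midnight(started)?.checked_sub(micros_since_midnight(closed)?)
}

fn main() {
    // (stream closed, next stream started) pairs copied from the server log.
    let pairs = [
        ("2023-04-07T11:42:34.723711Z", "2023-04-07T11:42:37.736963Z"),
        ("2023-04-07T11:42:38.739805Z", "2023-04-07T11:42:41.751556Z"),
    ];
    for (closed, started) in pairs {
        match reconnect_gap_micros(closed, started) {
            Some(gap) => println!("reconnected after {} us", gap),
            None => println!("could not parse timestamps"),
        }
    }
}
```

Both gaps come out at a little over 3 seconds, which suggests the browser waits for a fixed delay before reopening the connection. The logs themselves do not state what that delay is configured to.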

In the browser:

MessageEvent {isTrusted: true, data: 'count = 1', origin: 'http://localhost:3000', lastEventId: '', source: null, …}
MessageEvent {isTrusted: true, data: 'count = 2', origin: 'http://localhost:3000', lastEventId: '', source: null, …}
EventSource failed: Event {isTrusted: true, type: 'error', target: EventSource, currentTarget: EventSource, eventPhase: 2, …}
MessageEvent {isTrusted: true, data: 'count = 1', origin: 'http://localhost:3000', lastEventId: '', source: null, …}
MessageEvent {isTrusted: true, data: 'count = 2', origin: 'http://localhost:3000', lastEventId: '', source: null, …}
EventSource failed: Event {isTrusted: true, type: 'error', target: EventSource, currentTarget: EventSource, eventPhase: 2, …}
MessageEvent {isTrusted: true, data: 'count = 1', origin: 'http://localhost:3000', lastEventId: '', source: null, …}
MessageEvent {isTrusted: true, data: 'count = 2', origin: 'http://localhost:3000', lastEventId: '', source: null, …}
EventSource failed: Event {isTrusted: true, type: 'error', target: EventSource, currentTarget: EventSource, eventPhase: 2, …}
@viniciusgerevini
Owner

Hello @sukhmel. Thanks for reporting this and for the very detailed example. I'll look into this later this week.
