Server events with Plug + GenEvent.stream

Hello,

I’m trying to implement a server events endpoint in my application using GenEvent.stream. I have something very simple that half works. It look like this:

  get "/events/" do
    conn = put_resp_header(conn, "content-type", "text/event-stream")                                                        
    conn = send_chunked(conn, 200)
    stream = GenEvent.stream(MyGenEvent)
    for e <- stream do
      chunk(conn, "event: \"message\"\n\ndata: \"#{e}\"\n\n")
    end
    conn
  end

So when I point my browser to that url it works, I can see all events comming up.

Now the issue is when the browser stop the polling. If I look at the GenEvent process in the observer I can see that the stream is still registered in the handlers.

I understand that the process consuming the stream has to be shutdown so that the handler is removed from GenEvent. The problem is that I can’t see how to do this in the Plug context since HTTP is stateless.

I would look into phoenix channels if you want a long running connection to push data out to clients. There are a lot of complexities here, and the phoenix team has done incredible work with channels.

Finally got my POC working !

  defp send_event(conn, e) do                                                                               
    chunk(conn, "event: \"message\"\n\ndata: \"#{e}\"\n\n") 
  end                                                                                                                  

  defp consume_stream(conn) do                                                                                         
    GenEvent.stream(MyGenEvent)                                                                                                 
    |> Enum.reduce_while(%Plug.Conn{}, fn e, acc ->                                                                    
      case send_event(conn, e) do                                                                                      
        {:ok, conn} ->                                                                                                 
          {:cont, conn}                                                                                                
        {:error, :closed} ->                                                                                           
          {:halt, acc}                                                                                                 
      end                                                                                                              
    end)                                                                                                               
  end                                                                                                                  
                                                                                                                       
  defp start_stream(conn) do                                                                                           
    Task.async(fn -> consume_stream(conn) end)                                                                         
  end                                                                                                                  
                                                                                                                       
  defp wait_stream(t) do                                                                                               
    case Task.yield(t, 1000) do                                                                                        
      nil ->                                                                                                           
        wait_stream(t)                                                                                                 
      {:ok, conn} ->                                                                                                   
        conn                                                                                                           
    end                                                                                                                
  end                                                                                                                  
                                                                                                                       
  get "/graph/events/" do                                                                                              
    conn                                                                                                               
    |> put_resp_header("content-type", "text/event-stream")                                                            
    |> send_chunked(200)                                                                                               
    |> start_stream                                                                                                    
    |> wait_stream                                                                                                     
  end                                                                                                                  

I start the stream in a task and then check it every second with Task.yield to see if it has returned.
The task is reducing the stream and keeping the last conn in its state. When the connection is closed the task returns and so is wait_stream with the correct conn.