Hello,
I’m trying to implement a server-sent events endpoint in my application using GenEvent.stream. I have something very simple that half works. It looks like this:
get "/events/" do
  conn = put_resp_header(conn, "content-type", "text/event-stream")
  conn = send_chunked(conn, 200)
  stream = GenEvent.stream(MyGenEvent)

  for e <- stream do
    # single \n between the event and data lines keeps them in one SSE event
    chunk(conn, "event: message\ndata: #{e}\n\n")
  end

  conn
end
So when I point my browser at that URL it works: I can see all the events coming in.
Now the issue is when the browser stops polling. If I look at the GenEvent process in the Observer, I can see that the stream is still registered in the handlers.
I understand that the process consuming the stream has to be shut down so that the handler is removed from GenEvent. The problem is that I can’t see how to do this in the Plug context, since HTTP is stateless.
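A quick iex sketch illustrates that constraint (assuming only a named manager called MyGenEvent, as in the snippet above): the stream’s handler is removed as soon as the process enumerating it exits, which is exactly what never happens while the Plug process keeps running.

```elixir
# iex sketch: a GenEvent.stream handler lives and dies with the
# process that enumerates the stream.
{:ok, _manager} = GenEvent.start_link(name: MyGenEvent)

# A consumer that blocks on the stream; starting the enumeration
# registers a GenEvent.Stream handler on the manager.
consumer =
  spawn(fn ->
    GenEvent.stream(MyGenEvent) |> Enum.each(fn _ -> :ok end)
  end)

Process.sleep(100)
IO.inspect(GenEvent.which_handlers(MyGenEvent))
# one {GenEvent.Stream, ref} entry while the consumer is alive

# Kill the consumer: the monitored handler is removed automatically.
Process.exit(consumer, :kill)
Process.sleep(100)
IO.inspect(GenEvent.which_handlers(MyGenEvent))
# back to an empty handler list
```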
I would look into Phoenix Channels if you want a long-running connection to push data out to clients. There are a lot of complexities here, and the Phoenix team has done incredible work with Channels.
Finally got my POC working!
defp send_event(conn, e) do
  # single \n between the event and data lines keeps them in one SSE event
  chunk(conn, "event: message\ndata: #{e}\n\n")
end

defp consume_stream(conn) do
  GenEvent.stream(MyGenEvent)
  # start from the real conn, and thread the latest conn (acc) through
  |> Enum.reduce_while(conn, fn e, acc ->
    case send_event(acc, e) do
      {:ok, conn} -> {:cont, conn}
      {:error, :closed} -> {:halt, acc}
    end
  end)
end
defp start_stream(conn) do
  Task.async(fn -> consume_stream(conn) end)
end
defp wait_stream(t) do
  case Task.yield(t, 1000) do
    nil -> wait_stream(t)
    {:ok, conn} -> conn
  end
end
get "/graph/events/" do
  conn
  |> put_resp_header("content-type", "text/event-stream")
  |> send_chunked(200)
  |> start_stream()
  |> wait_stream()
end
I start the stream in a Task and then check it every second with Task.yield to see whether it has returned.
The task reduces the stream, keeping the last conn as its accumulator. When the connection is closed, the task returns, and so does wait_stream, with the correct conn.
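A side note on the design: since nothing else happens between the polls, the Task.yield loop could (as an untested alternative) collapse into a single Task.await with an infinite timeout, which blocks until the task completes.

```elixir
# In the router module, wait_stream/1 could shrink to:
#
#     defp wait_stream(t), do: Task.await(t, :infinity)
#
# Stand-alone demonstration with a stand-in task; in the endpoint,
# `t` would be the task returned by start_stream/1.
t = Task.async(fn -> :fake_conn end)
IO.inspect(Task.await(t, :infinity))
# => :fake_conn
```

Task.await links the caller to the task the same way Task.yield does, so a crash in the stream consumer still takes the request process down with it.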