Terminating server process sending chunked response in Phoenix

I have the following Phoenix controller.

defmodule MyApp.UserController do
  use MyApp.Web, :controller

  def updates(conn, %{"id" => id}) do
    %PublicApi.User{id: ^id} = _user ->
      conn = put_resp_header(conn, "content-type", "text/event-stream")
      conn = send_chunked(conn, 200)
      send_since(conn, id, 0)
    end
  end

  defp send_since(conn, user_id, highwater_mark) do
    query = EventStore.for_user(user_id, highwater_mark)
    latest = Repo.all(query)
    %{id: highwater_mark} = List.last(latest) || %{id: highwater_mark}
    conn = Enum.reduce(latest, conn, fn(event, conn) ->
      chunk(conn, "data: #{event.message |> Poison.encode!}\n\n")
    end)
    :timer.sleep(1_000)
    send_since(conn, user_id, highwater_mark)
  end
end

If the client disconnects then this process is not terminated. How do I ensure that the server process terminates when the client disconnects?

I have tried linking the controller process to the socket in the conn object (see below), but have not been able to get it to work.

%{adapter: {Plug.Adapters.Cowboy.Conn, payload = cowboy_req}} = conn
socket = (cowboy_req |> elem(1))
some_pid = (cowboy_req |> elem(4))
Process.link(socket)
Process.link(some_pid)

chunk(conn, data) returns {:ok, conn} on success; otherwise it returns {:error, reason}. So a simple:

{:ok, conn} = chunk(conn, "data: ...\n\n")

check should suffice. When chunk returns {:error, reason} (where "reason" would be :closed in your case), the match fails, which crashes that process and stops the transfer. So easy!

On that note, the Enum.reduce call in send_since is probably not working as you expect. chunk returns an {:ok, conn} tuple, so the accumulator is going to end up being {:ok, conn} rather than just conn, as it appears you are expecting. Putting the success check there fixes that as well.
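
Here is a rough sketch of how that check could look inside the reduce. ChunkSketch, send_events and the chunk_fun argument are made-up names for illustration only; chunk_fun stands in for Plug.Conn.chunk/2 so the snippet runs without Plug, and Enum.reduce_while is just one way to stop at the first failed write instead of crashing.

```elixir
defmodule ChunkSketch do
  # Hypothetical helper, not part of Phoenix or Plug.
  # `chunk_fun` is a stand-in for Plug.Conn.chunk/2, which returns
  # {:ok, conn} on success and {:error, reason} on failure.
  def send_events(conn, events, chunk_fun) do
    Enum.reduce_while(events, {:ok, conn}, fn event, {:ok, conn} ->
      case chunk_fun.(conn, "data: #{event}\n\n") do
        # The write succeeded: carry the updated conn to the next event.
        {:ok, conn} -> {:cont, {:ok, conn}}
        # The write failed (e.g. :closed): stop and report the reason.
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end
end
```

In send_since you could then either match on {:ok, conn} and let the process crash when the client is gone, or handle {:error, :closed} explicitly and simply not recurse.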


Thanks, that fixed it.

Is there any way to check whether a connection is alive without writing to it?


Hmm. Not that I know of, as the socket itself is buried deeper in the framework. It would perhaps be easier to do this with WebSockets and Phoenix Channels, tbh …
