Pubsub stream to cowboy connection not working

I’m trying to implement a radio server in Elixir

One process is always working and reading a file (mp3) and publish to topic “:radio”, currently for test purpose when it finishes it starts over

Each connection subscribes to topic “:radio”

I don’t understand how to send the chunks to all subscribed connections, the connection close after 2 or 3 chunks

defmodule Plugtest do
  import Plug.Conn

  def init(opts), do: opts

  def start() do
    Plug.Adapters.Cowboy.http(Plugtest, [])
    {:ok, _pid} = PubSub.start_link()
    spawn(fn -> stream_from_file("./song.mp3", 128) end)
  end

  def call(conn, _opts) do
    conn = conn
    |> send_chunked(200)
    |> put_resp_content_type("audio/mpeg")

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio)
#    File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only
  end

  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, data} ->
        IO.inspect "* #{inspect self()} * [ #{inspect conn.owner} ] [ #{inspect data} ]"
#        Enum.into(data, conn) # not working TODO send chunk to connection
        {:ok, conn} = chunk(conn, data)
        send_chunk_to_connection(conn)
    end
  end

  defp stream_from_file(fpath, bytes) do
    File.stream!(fpath, [], bytes)
    |> Enum.each(fn chunk ->
      PubSub.publish(:radio, {:radio_data, chunk})
    end)
    stream_from_file(fpath, bytes)
  end

end

stacktrace :

[error] Process #PID<0.274.0> raised an exception
        ** (MatchError) no match of right hand side value: {:error, :closed}    
            (plugtest) lib/plugtest.ex:26: Plugtest.send_chunk_to_connection/1 

dependencies [{:plug, “~> 1.0”}, {:cowboy, “~> 1.0”}, {:pubsub, “~> 0.0.2”}]

1 Like

Your call/1 function needs to return conn, not :ok first of all, that might be why it is closing your stream early (because it will do it ‘soon’ after you return, thus when you try to send to it, then it will fail).

1 Like

Also, I do not think the headers are set up properly and the browser may close it after 5-30 seconds as well, may want to double-check those.

1 Like

about the headers
if you uncomment the line
File.stream!("./song.mp3", [], 128) |> Enum.into(conn)
you can see the browser (chrome) will play the mp3

1 Like

From streaming audio years ago I remember needing to do something unique to the headers to get chunked encoding working for audio, it was not the same as just sending the file… Sorry that I cannot remember what, was a long time ago…

2 Likes

How come
Enum.into(File.stream!("./song.mp3", [], 128), conn)
sends the file in chunks and the browser decode it ? can I find out the headers it sends ?

The state of the connection is always :chunked and keep-alive enabled

It was not a problem with the files that I had, but rather something that the later chunked responses in the headers that I needed to add, if that helps any? :slight_smile:

1 Like

Every little piece is helpful mate !
so your analogy is like you need to specify how many bytes your packet contains in the header so the client will know not to close

Now that you mention that I think it was something to do with data size, I had to ‘fake it to the browser’ about something to make it stay active.

EDIT: Honestly nowadays I’d probably just send it over websocket, I am done with the weird browser stuff and websockets let me control it however I want. ^.^

1 Like

haha
Well, I’ll search for a websocket solution in Elixir & update
if you know about any helpful resource please let me know
btw, this project will be open source

To go more broad, for your radio service do you want to play the radio stream via a webpage, or do you want to be able to open it in something like vlc?

1 Like

I want it to be open like exactly like vlc
but I would like to start with webpage because its easier

So you are building something like a miracast server then?

I was not familiar with miracast, I want to build a radio server with playlist in Elixir

Er, no clue why I said miracast, the heck was that old mp3/ogg streaming server… But one of those yeah. Could be cool.

i’ll try to describe my next steps:

In the server side I need to use cowboy’s websocket api
@behaviour :cowboy_websocket_handler
in the init i’ll start the PubSub and spawn a process that will stream_from_file

def init({tcp, http}, _req, _opts) do
    {:ok, _pid} = PubSub.start_link()
    spawn fn ->
        stream_from_file("./song.mp3", 128)
    end
    {:upgrade, :protocol, :cowboy_websocket}
 end

in the websocket_init idk what i need to do

def websocket_init(_TransportName, req, _opts)
    {:ok, req, :undefined_state }
  end

and in websocket_handle i’ll handle the “Play” button, on click in the client side i’ll need to subscribe the the topic :radio

def websocket_handle({:text, content}, req, state) do
  :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio)
  {:reply, {:text, reply}, req, state}
end

and my problem is in the client side (javacsript) how to decode the data so it will be played in the audio tag

websocket.onmessage = function(evt) { /* decode audio */ };

Having issues with cowboy.webscocket
in the process that subscribes to the :radio topic I call websocket_handle

  defp send_chunk_to_connection(req, state) do
    receive do
      {:radio_data, content} ->
        IO.inspect "[ #{inspect self()} ] [ #{inspect content} ]"
        websocket_handle({:radio_data, content}, req, state)
        send_chunk_to_connection(req, state)
    end
  end

currently I read a text file

  def websocket_handle({:radio_data, content}, req, state) do
    { :ok, message } = JSEX.encode(%{ radio: content })
    IO.puts "radio data #{message} [ #{inspect req} ]"
    { :reply, {:text, message}, req, state}
  end

and its not sending the data
in the client side I print all incoming messages although I see the log from IO.puts “radio data #{message} [ #{inspect req} ]”
websocket.onmessage = function(evt) { console.log(evt.data); };

"[ #PID<0.274.0> ] [ \"222222222\\n\" ]"
radio data {"radio":"222222222\n"} [ {:http_req, #Port<0.4694>, :ranch_tcp, :keepalive, #PID<0.271.0>, "GET", :"HTTP/1.1", {{127, 0, 0, 1}, 50831}, "localhost", :undefined, 8080, "/websocket", :undefined, "", :undefined, [], [{"host", "localhost:8080"}, {"connection", "Upgrade"}, {"pragma", "no-cache"}, {"cache-control", "no-cache"}, {"upgrade", "websocket"}, {"origin", "http://localhost:8080"}, {"sec-websocket-version", "13"}, {"user-agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.82 Safari/537.36"}, {"accept-encoding", "gzip, deflate, sdch"}, {"accept-language", "en-US,en;q=0.8,he;q=0.6"}, {"sec-websocket-key", "/PtoG/ZYCGEz6zS4a6Gr3w=="}, {"sec-websocket-extensions", "permessage-deflate; client_max_window_bits"}], [{"sec-websocket-extensions", [{"permessage-deflate", ["client_max_window_bits"]}]}, {"upgrade", ["websocket"]}, {"connection", ["upgrade"]}], :undefined, [websocket_version: 13, websocket_compress: false], :waiting, "", :undefined, false, :waiting, [], "", :undefined} ]

Have a github for this yet so we can see the code? :slight_smile:

not yet
still with trying to figure out how to handle WebSockets
this not-working-example is reading from a text file 10 bytes each time
and suppose to send to all connected websocket

defmodule WebsocketHandler do
  @behaviour :cowboy_websocket_handler

  @fpath "./test.txt"

  def init({_tcp, _http}, _req, _opts) do
    IO.puts "init({tcp, http}, _req, _opts)"
    {:ok, _pid} = PubSub.start_link()
    spawn fn ->
      stream_from_file(@fpath, 10)
    end
    {:upgrade, :protocol, :cowboy_websocket}
  end

  def websocket_init(_TransportName, req, _opts) do
    IO.puts "init.  Starting timer. PID is #{inspect(self())}"
    :erlang.start_timer(1000, self(), [])
    :ok = PubSub.subscribe(spawn(fn -> loop(req) end), :radio)
    {:ok, req, :undefined_state }
  end

  def websocket_handle({:text, content}, req, state) do
    { :ok, %{ "message" => message} } = JSEX.decode(content)
    { :ok, reply } = JSEX.encode(%{ reply: String.reverse(message)})
    IO.puts("Message: #{message} [ #{state} ]")
    {:reply, {:text, reply}, req, state}
  end
  
  def websocket_handle(_data, req, state) do
    IO.puts "websocket_handle(_data, req, state)"
    {:ok, req, state}
  end

  def websocket_info({:info, info}, req, state) do
    IO.puts "websocket_info({:info, info}, req, state) #{info}"
    { :ok, message } = JSEX.encode(%{ radio: info})
    { :reply, {:text, message}, req, state}
  end

  def websocket_info(_info, req, state) do
    IO.puts "websocket_info(_info, req, state)"
    :erlang.start_timer(1000, self(), [])
    { :ok, message } = JSEX.encode(%{ radio: "yo"}) # TODO send lines here
    { :reply, {:text, message}, req, state}
  end

  def websocket_terminate(_reason, _req, _state) do
    :ok
  end

  defp loop(req) do
    receive do
      {:radio_data, data} ->
        IO.puts "received [ #{data} ]"
        websocket_info({:info, data}, req, :undefined_state)
        loop(req)
    end
  end

  defp stream_from_file(fpath, bytes) do
    File.stream!(fpath, [], bytes)
    |> Enum.each(fn chunk ->
      PubSub.publish(:radio, {:radio_data, chunk})
      :timer.sleep(1_000)
    end)
    stream_from_file(fpath, bytes)
  end

end

explicit invoking websocket_info in loop function doesn’t work although the log written