How to stream data from a callback-based API?

I’m using WebSockex and Finch for a streaming client, and I want to emit every incoming message using an Elixir Stream, but how do I do that?

I imagine my code would work like this:

MyModule.new_client("wss://cool.feed.com/")
|> Stream.each(&IO.inspect/1)
|> Stream.run()

I gather that new_client/1 needs to return a stream. It looks like Stream.resource/3 is the function to do this, but getting the next_fun to work with my client modules sounds complicated. Am I approaching this correctly? It seems I need my WebSockex callbacks to push every new message into a :queue, and my next_fun needs to call my WebSockex process to retrieve (and remove) the oldest message from the queue. If next_fun is called while the queue is empty (the next message hasn’t arrived yet), how do I make this synchronous call return only after the next message has arrived?

It looks like the solution is for next_fun to invoke GenServer.call/3 to get the oldest unhandled message from my WebSockex process. When the next message hasn’t arrived yet, the handle_call/3 callback returns {:noreply, new_state}, and the process calls GenServer.reply/2 once the message has arrived.

GenServer.call/3 takes a timeout value. Will my stream close if the call times out?
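On the timeout question: GenServer.call/3 defaults to a 5000 ms timeout, and when it expires the calling process exits. The stream is not closed gracefully, so the process consuming it crashes unless the exit is caught or :infinity is passed as the timeout. Here is a minimal sketch of one way to turn a timeout into a normal end of stream. SilentServer is a hypothetical stand-in for a buffer that never has a message ready, and the 100 ms timeout is arbitrary.

```elixir
defmodule SilentServer do
  # Hypothetical server used only to provoke a timeout: it defers every
  # reply and never sends one.
  use GenServer

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:next, _from, state), do: {:noreply, state}
end

{:ok, pid} = GenServer.start_link(SilentServer, :ok)

# next_fun for Stream.resource/3: emit one message per call, or halt the
# stream when the call times out instead of letting the exit propagate.
next_fun = fn server ->
  try do
    {[GenServer.call(server, :next, 100)], server}
  catch
    :exit, {:timeout, _call} -> {:halt, server}
  end
end

stream = Stream.resource(fn -> pid end, next_fun, fn _server -> :ok end)

# The server never replies, so the stream ends without emitting anything.
result = Enum.to_list(stream)
```

For a feed that can be quiet for long stretches, passing :infinity as the timeout is probably the simpler choice.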

UPDATE: I had to add a GenServer for this to work. Originally I thought WebSockex implemented everything from GenServer, and that I could do all of this in my existing WebSockex server. But WebSockex doesn’t support GenServer.call, so instead I had to add a GenServer to serve as a buffer, which I suppose is the correct architecture for this.
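The author's own code isn't shown, so here is only a rough sketch of what such a buffer process might look like. MessageBuffer and its push/pop/stream functions are hypothetical names. It assumes a single consumer and that the WebSockex callback forwards each message with MessageBuffer.push/2.

```elixir
defmodule MessageBuffer do
  # Hypothetical buffer between a WebSockex client and a stream consumer.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Intended to be called from the WebSockex frame callback.
  def push(buffer, msg), do: GenServer.cast(buffer, {:push, msg})

  # Blocks until a message is available.
  def pop(buffer), do: GenServer.call(buffer, :pop, :infinity)

  # Wraps pop/1 in a stream that emits one message per step and never ends.
  def stream(buffer) do
    Stream.resource(
      fn -> buffer end,
      fn buffer -> {[pop(buffer)], buffer} end,
      fn _buffer -> :ok end
    )
  end

  @impl true
  def init(:ok), do: {:ok, %{queue: :queue.new(), waiting: nil}}

  @impl true
  def handle_call(:pop, from, %{queue: queue} = state) do
    case :queue.out(queue) do
      # A message is already buffered: reply immediately.
      {{:value, msg}, rest} -> {:reply, msg, %{state | queue: rest}}
      # Nothing buffered: remember the caller and reply later.
      {:empty, _queue} -> {:noreply, %{state | waiting: from}}
    end
  end

  @impl true
  def handle_cast({:push, msg}, %{waiting: nil, queue: queue} = state) do
    {:noreply, %{state | queue: :queue.in(msg, queue)}}
  end

  def handle_cast({:push, msg}, %{waiting: from} = state) do
    # A consumer is blocked in pop/1: hand the message over directly.
    GenServer.reply(from, msg)
    {:noreply, %{state | waiting: nil}}
  end
end

{:ok, buffer} = MessageBuffer.start_link()
MessageBuffer.push(buffer, "first")
MessageBuffer.push(buffer, "second")
taken = buffer |> MessageBuffer.stream() |> Enum.take(2)
```

Only one waiting caller is tracked, so a second concurrent consumer would overwrite the first. Supporting several consumers would need a queue of waiting callers.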

@rm-rf-etc , do you think you could provide some sample code on how you glued everything together?

Today, I was searching for the answer, too, but couldn’t find one. Inspired by `Req.request(…, into: :self)`, I did it in an extremely unclean way. This assumes the stream is consumed in the same process that creates it.

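event_ref = make_ref()
parent_pid = self()

# Run the callback API in a monitored child process that forwards
# every event to the parent as a tagged message.
{child_pid, child_ref} =
  spawn_monitor(fn ->
    SomeModule.callback_api(fn event ->
      send(parent_pid, {:my_event, event_ref, event})
    end)
  end)

# Each step blocks until the next event arrives; the stream ends when
# the child exits, whatever the reason.
stream = Stream.unfold(:whatever, fn _ ->
  receive do
    {:DOWN, ^child_ref, :process, ^child_pid, _reason} -> nil
    {:my_event, ^event_ref, event} -> {event, :whatever}
  end
end)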
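One limitation of the Stream.unfold/2 version is that it has no cleanup step: if the consumer stops early, the child process keeps running. Below is a sketch of the same idea using Stream.resource/3, which spawns the producer lazily and kills it when the stream halts. CallbackStream.from_callback/1 is a hypothetical helper. It assumes `subscribe` is a function that takes an event callback and blocks while invoking it, as SomeModule.callback_api does above.

```elixir
defmodule CallbackStream do
  # Hypothetical helper, not part of any library.
  def from_callback(subscribe) when is_function(subscribe, 1) do
    Stream.resource(
      # start_fun: runs in the consuming process when enumeration begins.
      fn ->
        parent = self()
        tag = make_ref()

        {pid, mon} =
          spawn_monitor(fn ->
            subscribe.(fn event -> send(parent, {tag, event}) end)
          end)

        {pid, mon, tag}
      end,
      # next_fun: block for the next event, or halt once the producer is down.
      fn {pid, mon, tag} = acc ->
        receive do
          {^tag, event} -> {[event], acc}
          {:DOWN, ^mon, :process, ^pid, _reason} -> {:halt, acc}
        end
      end,
      # after_fun: drop the monitor and stop the producer if still running.
      fn {pid, mon, _tag} ->
        Process.demonitor(mon, [:flush])
        Process.exit(pid, :kill)
      end
    )
  end
end

# Usage with a stand-in producer that emits three events and returns.
events =
  fn emit -> Enum.each(1..3, emit) end
  |> CallbackStream.from_callback()
  |> Enum.to_list()
```

Events that were already sent but not yet consumed when the stream halts stay in the consumer's mailbox, so this is still best suited to short-lived consumer processes.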