How to stream data from callback-based API?

I’m using WebSockex and Finch for a streaming client, and I want to emit every incoming message using an Elixir Stream, but how do I do that?

I imagine my code would work like this:

MyModule.new_client("wss://cool.feed.com/")
|> Stream.each(&IO.inspect/1)

I gather that new_client/1 needs to return a %Stream{} instance. It looks like Stream.resource/3 is the function to do this, but getting the next_fun to work with my client modules sounds complicated. Am I approaching this correctly? It seems I need my WebSockex callbacks to push every new message into a :queue, and my next_fun needs to call my WebSockex process to retrieve (and remove) the oldest message from the queue. If next_fun is called and the queue is empty (the next message hasn’t arrived yet), how do I complete this synchronous function call only after the next message has arrived?

It looks like the solution is for next_fun to invoke GenServer.call/3 to get the oldest unhandled message from my WebSockex process, and – when the next message hasn’t yet arrived – to return {:noreply, new_state} from the handle_call/3 callback, and then call reply/2 once it has.

handle_call/3 takes a timeout value. Will my stream close if the call times out?

It looks like the solution is for next_fun to invoke GenServer.call/3 to get the oldest unhandled message from my WebSockex process, and – when the next message hasn’t yet arrived – to return {:noreply, new_state} from the handle_call/3 callback, and then call reply/2 once it has.

handle_call/3 takes a timeout value. Will my stream close if the call times out?

UPDATE: I had to add a GenServer for this to work. Originally I thought WebSockex implemented everything from GenServer, and that I could do all of this in my existing WebSockex server. But actually, WebSockex doesn’t implement GenServer.call/2, and so instead I had to add a GenServer to serve as a buffer, which I suppose is the correct architecture for this.

2 Likes

@rm-rf-etc , do you think you could provide some sample code on how you glued everything together?