I’m using WebSockex
and Finch
for a streaming client, and I want to emit every incoming message using an Elixir Stream
, but how do I do that?
I imagine my code would work like this:
MyModule.new_client("wss://cool.feed.com/")
|> Stream.each(&IO.inspect/1)
I gather that new_client/1
needs to return a %Stream{}
instance. It looks like Stream.resource/3
is the function to do this, but getting the next_fun
to work with my client modules sounds complicated. Am I approaching this correctly? It seems I need my WebSockex
callbacks to push every new message into a :queue
, and my next_fun
needs to call my WebSockex
process to retrieve (and remove) the oldest message from the queue. If next_fun
is called and the queue is empty (the next message hasn’t arrived yet), how do I complete this synchronous function call only after the next message has arrived?
It looks like the solution is for next_fun
to invoke GenServer.call/3
to get the oldest unhandled message from my WebSockex
process, and – when the next message hasn’t yet arrived – to return {:noreply, new_state}
from the handle_call/3
callback, and then call reply/2
once it has.
handle_call/3
takes a timeout value. Will my stream close if the call times out?
It looks like the solution is for next_fun
to invoke GenServer.call/3
to get the oldest unhandled message from my WebSockex process, and – when the next message hasn’t yet arrived – to return {:noreply, new_state}
from the handle_call/3
callback, and then call reply/2
once it has.
handle_call/3
takes a timeout value. Will my stream close if the call times out?
UPDATE: I had to add a GenServer for this to work. Originally I thought WebSockex implemented everything from GenServer, and that I could do all of this in my existing WebSockex server. But actually, WebSockex doesn’t implement GenServer.call/2
, and so instead I had to add a GenServer to serve as a buffer, which I suppose is the correct architecture for this.
2 Likes
@rm-rf-etc , do you think you could provide some sample code on how you glued everything together?