Hello,
I have grpc
stream that is streaming price_value updates for specific symbol (e.g. EUR).
I have implemented one solution using poolboy
where I spawn worker for each symbol and listen to stream inside the worker.
Now, the more I read and research I feel this is not the good way.
Is the genstage
answer for this? I was thinking that for each symbol I can spin up producer that will listen to stream and then emit events to consumers which will eventually feed the data into mqtt.
Welcome to the forum Admir ! We hope you find this place helpful and friendly.
If you are considering GenStage, perhaps you should take a look at Flow, which was created to deal with streams in a concurrent / parallel fashion and is built on top of GenStage:
Thank you very much for warm welcome!
I considered flow
but I do not really need any data processing, maybe I am missing on use case of flow.
I am more interested is GenServer
good enough to do something like this:
def handle_cast({:subscribe, chan}, [symbol, _] = state) do
case Source.get_stream_bid_ask_update(chan, [symbol]) do
{:ok, stream} -> Enum.each(
stream,
fn msg ->
handle_stream msg
end
)
{:error, err} ->
crash(self(), err)
end
{:noreply, state}
end
handle_stream/1
is publishing message to another GenServer which is sending values to mqtt