"The best" way to listen to a gRPC stream and process it


I have a gRPC stream that sends price_value updates for a specific symbol (e.g. EUR).

I have implemented one solution using poolboy, where I spawn a worker for each symbol and listen to the stream inside the worker.

Now, the more I read and research, the more I feel this is not a good approach.

Is GenStage the answer for this? I was thinking that for each symbol I could spin up a producer that listens to the stream and emits events to consumers, which would eventually feed the data into MQTT.

Welcome to the forum, Admir! We hope you find this place helpful and friendly.

If you are considering GenStage, perhaps you should take a look at Flow, which was created to process streams concurrently and in parallel and is built on top of GenStage.

Thank you very much for the warm welcome!

I considered Flow, but I do not really need any data processing. Maybe I am missing Flow's use case.

I am more interested in whether a GenServer is good enough for something like this:

def handle_cast({:subscribe, chan}, [symbol, _] = state) do
  case Source.get_stream_bid_ask_update(chan, [symbol]) do
    {:ok, stream} ->
      # Enum.each/2 blocks this GenServer for as long as the stream stays open.
      Enum.each(stream, fn msg -> handle_stream(msg) end)

    {:error, err} ->
      crash(self(), err)
  end

  {:noreply, state}
end


handle_stream/1 publishes each message to another GenServer, which sends the values to MQTT.
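
One thing to watch with the callback above is that iterating the stream inside handle_cast keeps the GenServer busy until the stream ends, so it cannot answer any other message in the meantime. A possible variation, only as a rough sketch, is to keep one GenServer per symbol but move the blocking read into a linked Task. Everything named here is an assumption for illustration: `SymbolListener`, the `open_stream` function (a stand-in for `Source.get_stream_bid_ask_update(chan, [symbol])`), and the `publisher` pid (a stand-in for the GenServer that talks to MQTT).

```elixir
defmodule SymbolListener do
  @moduledoc """
  Sketch: one GenServer per symbol. The blocking stream read runs in a
  linked Task, so the GenServer itself stays free to handle other messages.
  """
  use GenServer

  # `open_stream` is a hypothetical zero-arity function returning
  # {:ok, enumerable} | {:error, reason}. It stands in for the real
  # gRPC call. `publisher` is a hypothetical pid that receives
  # {:price, symbol, msg} messages.
  def start_link({symbol, open_stream, publisher}) do
    GenServer.start_link(__MODULE__, {symbol, open_stream, publisher})
  end

  @impl true
  def init({symbol, open_stream, publisher}) do
    state = %{symbol: symbol, open_stream: open_stream, publisher: publisher, task: nil}
    # Defer the subscription so init/1 returns right away.
    {:ok, state, {:continue, :subscribe}}
  end

  @impl true
  def handle_continue(:subscribe, state) do
    case state.open_stream.() do
      {:ok, stream} ->
        # Linked Task: if the reader crashes, this GenServer goes down too,
        # and a supervisor can restart the pair.
        {:ok, task} =
          Task.start_link(fn ->
            Enum.each(stream, fn msg ->
              send(state.publisher, {:price, state.symbol, msg})
            end)
          end)

        {:noreply, %{state | task: task}}

      {:error, reason} ->
        {:stop, {:subscribe_failed, reason}, state}
    end
  end

  # Still answered while the Task is reading the stream.
  @impl true
  def handle_call(:symbol, _from, state), do: {:reply, state.symbol, state}
end
```

To try it without a real gRPC connection, a plain list can play the role of the stream: `SymbolListener.start_link({"EUR", fn -> {:ok, [1.10, 1.11]} end, self()})` makes the calling process receive `{:price, "EUR", 1.10}` and then `{:price, "EUR", 1.11}`. Whether this is enough, or whether GenStage's back-pressure is worth the extra moving parts, probably depends on how fast the MQTT side can keep up with the updates.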