"The best" way to listen to grpc stream and process it


I have grpc stream that is streaming price_value updates for specific symbol (e.g. EUR).

I have implemented one solution using poolboy where I spawn worker for each symbol and listen to stream inside the worker.

Now, the more I read and research I feel this is not the good way.

Is the genstage answer for this? I was thinking that for each symbol I can spin up producer that will listen to stream and then emit events to consumers which will eventually feed the data into mqtt.

If you are considering GenStage, perhaps you should take a look at Flow, which was created to deal with streams in a concurrent / parallel fashion and is built on top of GenStage:

Thank you very much for warm welcome!

I considered flow but I do not really need any data processing, maybe I am missing on use case of flow.

I am more interested is GenServer good enough to do something like this:

 def handle_cast({:subscribe, chan}, [symbol, _] = state) do
    case Source.get_stream_bid_ask_update(chan, [symbol]) do
      {:ok, stream} -> Enum.each(
                         fn msg ->
                           handle_stream msg
      {:error, err} ->
        crash(self(), err)

    {:noreply, state}


handle_stream/1 is publishing message to another GenServer which is sending values to mqtt