GenServer/Process to receive gRPC Stream Data?

Hey everyone,

I want to receive data from a gRPC stream.

So far I have only written plain functions and played around in iex, where I was able to receive and decode data. Now it is time to “applicationify” that work and add lifecycle management and processes. But somehow gRPC seems to behave differently in a GenServer than in the iex session? :thinking:

I found a similar post in the forum, "Elixir-grpc can't receive stream response. Unexpected when waiting for headers", which ends with:

Looks like GRPC.Stub.recv can’t be used in GenServers :thinking:

But that seems weird to me.

Setup in iex

Basically, the stripped down version looks something like this:

  # wrapper around GRPC.Stub.connect() that handles auth etc
  {:ok, channel} = SalesforceEvents.Grpc.connect()

  grpc_stream = Eventbus.V1.PubSub.Stub.subscribe(channel)
  GRPC.Stub.send_request(grpc_stream, Messages.request_data())
  {:ok, data_stream} = GRPC.Stub.recv(grpc_stream, timeout: 60_000)
  
  Enum.each(data_stream, &IO.inspect/1)
  # shell blocks, protobuf-decoded data is printed as it comes in

Setup with GenServer

Same as above, but in a GenServer.

The Enum.each that handles the incoming stream data runs in a separately spawned process:

@impl true
def init(_args) do
  # wrapper around GRPC.Stub.connect() that handles auth etc
  {:ok, channel} = SalesforceEvents.Grpc.connect()

  grpc_stream = Eventbus.V1.PubSub.Stub.subscribe(channel)
  GRPC.Stub.send_request(grpc_stream, Messages.request_data())
  {:ok, data_stream} = GRPC.Stub.recv(grpc_stream, timeout: 60_000)

  receiver_process =
    spawn_link(fn -> Enum.each(data_stream, &IO.inspect/1) end)

  state = %{
    channel: channel,
    grpc_stream: grpc_stream,
    data_stream: data_stream,
    receiver_process: receiver_process
  }

  {:ok, state}
end

@impl true
def handle_info(info, state) do
  # log every message that lands in the GenServer's mailbox
  Logger.debug(inspect(info))
  {:noreply, state}
end

Expected Behaviour

Received gRPC messages are decoded and printed to the console, just like in iex.

Actual Behaviour

Instead of the messages being printed, handle_info receives raw messages like the following:

08:45:41.824 [debug] {:gun_data, #PID<0.339.0>, #Reference<0.2404288951.1709178882.257606>, :nofin, <<0, 0, 0, 1, 198, 10, 145, 3, …>>}

Why does this behave differently (nothing is printed, messages are not decoded)?
How can I receive data from a gRPC stream within a process getting results similar to what I had in iex?

Interestingly, it works when I move

  grpc_stream = Eventbus.V1.PubSub.Stub.subscribe(channel)
  GRPC.Stub.send_request(grpc_stream, Messages.request_data())
  {:ok, data_stream} = GRPC.Stub.recv(grpc_stream, timeout: 60_000)

into the spawned process.

Do the subscribe and recv functions need to be called from the same process?

Why is that?
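
A possible explanation, sketched with plain processes and no gRPC at all: a message sits in the mailbox of the process it was sent to, and a receive in another process never sees it. If the adapter behind GRPC.Stub delivers stream data to the process that started the request (an assumption on my side, though the :gun_data tuples arriving in handle_info point that way), the spawned process would wait on an empty mailbox. Everything in this sketch (MailboxDemo, start_source, subscribe, collect, split, together) is a hypothetical stand-in, not part of elixir-grpc:

```elixir
defmodule MailboxDemo do
  # Hypothetical stand-in for a connection process: it delivers chunks to
  # whichever pid called subscribe/1, i.e. it replies to the caller.
  def start_source do
    spawn(fn ->
      receive do
        {:subscribe, pid} -> Enum.each(1..3, &send(pid, {:chunk, &1}))
      end
    end)
  end

  # Registers the calling process as the receiver of the chunks.
  def subscribe(source), do: send(source, {:subscribe, self()})

  # Collects chunks from the mailbox of the calling process until none
  # arrive within 100 ms.
  def collect(acc \\ []) do
    receive do
      {:chunk, n} -> collect([n | acc])
    after
      100 -> Enum.reverse(acc)
    end
  end

  # Subscribe in the parent, receive in a spawned child (the layout that
  # printed nothing for me). Returns {child_result, parent_result}.
  def split do
    subscribe(start_source())
    child = Task.async(fn -> collect() end)
    child_got = Task.await(child)
    {child_got, collect()}
  end

  # Subscribe and receive in the same spawned process (the layout that works).
  def together do
    task =
      Task.async(fn ->
        subscribe(start_source())
        collect()
      end)

    Task.await(task)
  end
end

IO.inspect(MailboxDemo.split())    # => {[], [1, 2, 3]}
IO.inspect(MailboxDemo.together()) # => [1, 2, 3]
```

In split the child times out with an empty list while the chunks pile up in the parent's mailbox, which looks a lot like my GenServer's handle_info getting the raw data. In together the same process subscribes and receives, so it gets everything.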