Hey everyone,
I want to receive data from a gRPC stream.
So far I have only written plain functions and experimented in iex, where I was able to receive and decode data. Now it is time to “applicationify” that work and add processes and lifecycle management. However, the gRPC behaviour seems to differ between a GenServer and the iex session.
I found a similar post in the forum, "Elixir-grpc can't receive stream response. Unexpected when waiting for headers", which ends with this conclusion:
Looks like GRPC.Stub.recv can’t be used in GenServers
But that seems weird to me.
Setup in iex
Basically, the stripped down version looks something like this:
# wrapper around GRPC.Stub.connect() that handles auth etc
{:ok, channel} = SalesforceEvents.Grpc.connect()
grpc_stream = Eventbus.V1.PubSub.Stub.subscribe(channel)
GRPC.Stub.send_request(grpc_stream, Messages.request_data())
{:ok, data_stream} = GRPC.Stub.recv(grpc_stream, timeout: 60_000)
Enum.each(data_stream, &IO.inspect/1)
# shell blocks, protobuf-decoded data is printed as it comes in
Setup with GenServer
Same as above, but inside a GenServer. The Enum.each call that handles the incoming stream data runs in a separately spawned process:
@impl true
def init(args) do
  {:ok, channel} = SalesforceEvents.Grpc.connect()
  grpc_stream = Eventbus.V1.PubSub.Stub.subscribe(channel)
  GRPC.Stub.send_request(grpc_stream, Messages.request_data())
  {:ok, data_stream} = GRPC.Stub.recv(grpc_stream, timeout: 60_000)

  receiver_process =
    spawn_link(fn -> Enum.each(data_stream, &IO.inspect/1) end)

  state = %{
    channel: channel,
    grpc_stream: grpc_stream,
    data_stream: data_stream,
    receiver_process: receiver_process
  }

  {:ok, state}
end

def handle_info(info, state) do
  Logger.debug(inspect(info))
  {:noreply, state}
end
Expected Behaviour
Received gRPC messages are still printed to the console
Actual Behaviour
Instead of the decoded messages being printed, handle_info receives raw messages like the following:
08:45:41.824 [debug] {:gun_data, #PID<0.339.0>, #Reference<0.2404288951.1709178882.257606>, :nofin, <<0, 0, 0, 1, 198, 10, 145, 3, …>>}
Why does this behave differently (nothing is printed, messages are not decoded)?
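One possible explanation, offered as an assumption rather than confirmed library behaviour: the `:gun_data` tuples arriving in handle_info suggest that the connection process sends raw data to the process that opened the connection, while consuming the stream reads from the mailbox of whichever process enumerates it. In iex both are the shell process; in the GenServer they are two different processes. The sketch below reproduces that effect with plain processes only. `MailboxDemo`, `open_connection/0`, `recv_one/1` and the `:fake_data` message are hypothetical stand-ins and not part of elixir-grpc or gun.

```elixir
defmodule MailboxDemo do
  # Hypothetical stand-in for a connection process: it pushes data only to
  # the process that opened it (its "owner").
  def open_connection do
    owner = self()

    spawn_link(fn ->
      send(owner, {:fake_data, self(), "chunk-1"})
    end)
  end

  # Waits for one chunk in the mailbox of the *calling* process.
  def recv_one(timeout \\ 200) do
    receive do
      {:fake_data, _conn, chunk} -> {:ok, chunk}
    after
      timeout -> :timeout
    end
  end

  # Open and read in the same process (comparable to the iex session).
  def same_process do
    open_connection()
    recv_one()
  end

  # Open in the caller, read in a spawned process (comparable to the
  # GenServer with spawn_link). The reader never sees the data because it
  # was delivered to the caller's mailbox instead.
  def split_processes do
    open_connection()
    parent = self()
    spawn_link(fn -> send(parent, {:reader_result, recv_one()}) end)

    receive do
      {:reader_result, result} -> result
    end
  end
end
```

In this sketch `MailboxDemo.same_process()` gets the chunk, while `MailboxDemo.split_processes()` times out in the reader and leaves the unread `:fake_data` message in the caller's mailbox, which is where a GenServer would hand it to handle_info.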
How can I receive data from a gRPC stream inside a process and get results similar to what I had in iex?
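A minimal sketch of one way this might be structured, assuming the stream can only be consumed by the process that opened the connection (an assumption suggested by the `:gun_data` messages landing in handle_info, not something confirmed here). A single reader process both opens and consumes the stream, then forwards each item to the GenServer. `StreamReceiver`, the `:grpc_item` message and the `open_stream` function are hypothetical names. In the real application `open_stream` would presumably wrap the connect, subscribe, send_request and recv calls shown above and return `data_stream`; that part is untested.

```elixir
defmodule StreamReceiver do
  use GenServer

  # open_stream is a zero-arity function returning an enumerable. It is
  # called inside the reader process, so that process owns the connection.
  def start_link(open_stream) when is_function(open_stream, 0) do
    GenServer.start_link(__MODULE__, open_stream)
  end

  # Returns all items received so far, oldest first.
  def received(pid), do: GenServer.call(pid, :received)

  @impl true
  def init(open_stream) do
    server = self()

    # Open and consume the stream in one process; forward items to the server.
    reader =
      spawn_link(fn ->
        Enum.each(open_stream.(), fn item -> send(server, {:grpc_item, item}) end)
      end)

    {:ok, %{reader: reader, items: []}}
  end

  @impl true
  def handle_info({:grpc_item, item}, state) do
    {:noreply, %{state | items: [item | state.items]}}
  end

  @impl true
  def handle_call(:received, _from, state) do
    {:reply, Enum.reverse(state.items), state}
  end
end
```

With a plain list standing in for the stream, `StreamReceiver.start_link(fn -> [1, 2, 3] end)` starts the server, and `StreamReceiver.received(pid)` returns the forwarded items once the reader has run. Keeping the connection inside the reader also means a dropped connection only crashes the linked pair, which a supervisor can then restart.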