Can I act on streams before they're finished?

Hey folks,

I’m writing a client for a gRPC server which needs to stream values back over time. I want to act on these values as they arrive by sending them to another GenServer for processing. The gRPC client library appears to return a Stream for streaming responses, which is what I think I need here.

In looking at the Stream module docs, it seems as if it doesn’t perform computation unless I call a function in Enum. I’m not sure if that’s what I want, though. If my server returned a list of responses in a single batch, I could understand not using streams. If my server sent a million responses, I could understand using a stream and deferring processing. But I seem to be in a middle ground where my server might return, say, 10 responses 5 seconds apart. I don’t want my function call to block, so using a stream makes sense, and I don’t know that I want to call the Enum functions immediately in the RPC call since I assume that a) blocks and b) is only done when the stream is complete. But I want to push responses out as they arrive, so I don’t think I want to call an Enum function on the stream and wait for it to finish.

Am I misunderstanding how streams work? Do the Enum calls that perform calculations expect the stream to be finished when they’re called? Is there a function I can call to perform side-effects on a stream without calling Enum? I don’t really care that a stream results in a certain list of values. I just want to perform a quick message send on each element without blocking for any longer than the message send takes.

Sorry I don’t have any tangible code to show. We’re reworking a legacy system to use Phoenix for its web/database integration, connected to a Java-based backend that performs a bit of heavy document conversion. I’m evaluating gRPC for this task, and our document conversion needs to stream back results page by page where it makes sense. I want the Phoenix app to display incoming results in near-real-time, but it looks like streams may make me wait until the full document is converted to perform the Enum call. So I hope I’m missing something, but we’re on a tight budget, and I don’t want to go down the gRPC/streaming route if there really is no way to access a stream before it is finished.

Thanks.

I haven’t jumped into this area but it may be worth scanning through the Flow docs - there is a discussion on windows & triggers that may be relevant to what you are trying to do:
https://hexdocs.pm/flow/Flow.html#module-data-completion-windows-and-triggers


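Stream.each will perform side effects for you, and then you can use Stream.run to ‘pull’ elements through the stream and execute it. Your function runs on each element as soon as the stream produces it, not after the whole stream has finished. Stream.run itself blocks until the stream ends, and it just returns :ok when it’s done.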
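
Here is a rough sketch of that, in case it helps. It is not tied to any particular gRPC library: `slow_pages` is a made-up stand-in for whatever stream your client returns, with a sleep to imitate pages arriving over time, and `consumer` is just the current process standing in for your GenServer.

```elixir
# Hypothetical stand-in for a streaming response: each element only
# becomes available after a short delay.
slow_pages =
  Stream.map(1..3, fn n ->
    Process.sleep(100)
    {:page, n}
  end)

# Stand-in for the GenServer that should receive each value (assumption).
consumer = self()

# Stream.each attaches the side effect lazily; nothing runs yet.
# Stream.run pulls elements through one at a time, so each message is
# sent as soon as its element is produced. The call returns :ok only
# once the stream is exhausted.
result =
  slow_pages
  |> Stream.each(fn msg -> send(consumer, msg) end)
  |> Stream.run()
```

The same per-element behaviour applies to Enum.each; the difference is only that Stream.each lets you keep composing before deciding where to run it.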


Thanks, this is essentially what I did. I also spun up a Task because the request did indeed block, but it looks like Stream.each does what I want.
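
For anyone finding this later, the Task part could look something like the sketch below. This is only an illustration under assumptions, not the exact code from the project: `slow_pages` is again a fake stream standing in for the gRPC response, and Task.async is one option among several (a supervised task may be a better fit in a real app).

```elixir
# Hypothetical stand-in for the streaming response (assumption).
slow_pages =
  Stream.map(1..3, fn n ->
    Process.sleep(100)
    {:page, n}
  end)

caller = self()

# Consume the stream in a separate process so the caller is not blocked
# while waiting for elements to arrive.
task =
  Task.async(fn ->
    slow_pages
    |> Stream.each(fn msg -> send(caller, msg) end)
    |> Stream.run()
  end)

# The caller is free to do other work here while pages trickle in.

# Optionally wait for the stream to finish; Stream.run returns :ok.
task_result = Task.await(task)
```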