First-Class gRPC Streams in Elixir: A new composable API
For a long time, the Elixir community has lacked a truly idiomatic way to handle gRPC streaming. While unary RPCs have always been well-supported in the elixir-grpc library, stream handling felt incomplete — too low-level and too imperative for a language that thrives on functional and reactive principles.
This gap was finally addressed in issue #270, which led to a major improvement: first-class support for gRPC streams using a functional and declarative API that fits seamlessly into the Elixir ecosystem.
This new API is available starting from
elixir-grpc
version 0.10.0 — with the latest release being 0.10.2.
It’s worth highlighting the irony: while Elixir is inherently concurrent and reactive — thanks to the BEAM and its actor-based model — its gRPC streaming capabilities lagged behind those found in other ecosystems.
Languages like Java (via Project Reactor or RxJava), Kotlin (with coroutines and Flows), and JavaScript (using RxJS or async iterators) have long embraced reactive paradigms for working with gRPC streams. These models allowed developers to work with streams as composable, functional data flows.
Until recently, Elixir developers lacked similar expressive power. The new API introduced in version 0.10.0
finally brings that same level of expressiveness and control to the Elixir world — and in a way that feels natural, pure, and declarative.
A Functional API for Streaming
The new streaming API is designed around stream composition and functional data flows, relying on GRPC.Stream
to model and process streaming inputs and outputs. This new abstraction enables a developer experience that feels intuitive, powerful, and aligned with some Elixir’s design principles.
Here’s an example of the new API in action:
defmodule HelloworldStreams.Server do
@moduledoc false
use GRPC.Server, service: Stream.EchoServer.Service
alias HelloworldStreams.Utils.Transformer
alias GRPC.Stream, as: GRPCStream
alias Stream.HelloRequest
alias Stream.HelloReply
@spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
def say_unary_hello(request, _materializer) do
GRPCStream.unary(request)
|> GRPCStream.ask(Transformer)
|> GRPCStream.map(fn %HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}
end)
|> GRPCStream.run()
end
@spec say_server_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
def say_server_hello(request, materializer) do
create_output_stream(request)
|> GRPCStream.from()
|> GRPCStream.run_with(materializer)
end
defp create_output_stream(msg) do
Stream.repeatedly(fn ->
index = :rand.uniform(10)
%HelloReply{message: "[#{index}] I'm the Server for #{msg.name}"}
end)
|> Stream.take(10)
|> Enum.to_list()
end
@spec say_bid_stream_hello(Enumerable.t(), GRPC.Server.Stream.t()) :: any()
def say_bid_stream_hello(request, materializer) do
output_stream =
Stream.repeatedly(fn ->
index = :rand.uniform(10)
%HelloReply{message: "[#{index}] I'm the Server ;)"}
end)
GRPCStream.from(request, join_with: output_stream)
|> GRPCStream.map(fn
%HelloRequest{} = hello ->
%HelloReply{message: "Welcome #{hello.name}"}
output_item ->
output_item
end)
|> GRPCStream.run_with(materializer)
end
end
As seen in the example above, composing different stages in the streaming pipeline is straightforward. Each step in the transformation can be expressed clearly, making the overall flow easier to follow and reason about.
To support this, the GRPC.Stream
module offers a set of functions designed to operate directly on the stream, such as map
, filter
, flat_map
, partition
, reduce
, uniq
and so on. These utilities provide the flexibility needed to build expressive and efficient streaming logic while maintaining clarity and composability throughout the pipeline.
Conclusion
The introduction of a first-class streaming API represents a meaningful step forward for the Elixir gRPC ecosystem. It provides a more consistent and idiomatic way to implement streaming services using well-established functional constructs.
This new approach makes it easier to model real-time interactions, handle bi-directional communication, and process data streams using composable and readable code.
Further details and examples can be found in the official elixir-grpc repository and hex.