Elixir-grpc - first-class gRPC Streams in Elixir

First-Class gRPC Streams in Elixir: A New Composable API

For a long time, the Elixir community has lacked a truly idiomatic way to handle gRPC streaming. While unary RPCs have always been well-supported in the elixir-grpc library, stream handling felt incomplete — too low-level and too imperative for a language that thrives on functional and reactive principles.

This gap was finally addressed in issue #270, which led to a major improvement: first-class support for gRPC streams using a functional and declarative API that fits seamlessly into the Elixir ecosystem.

🚀 This new API is available starting from elixir-grpc version 0.10.0, with the latest release being 0.10.2.

It’s worth highlighting the irony: while Elixir is inherently concurrent and reactive — thanks to the BEAM and its actor-based model — its gRPC streaming capabilities lagged behind those found in other ecosystems.

Languages like Java (via Project Reactor or RxJava), Kotlin (with coroutines and Flows), and JavaScript (using RxJS or async iterators) have long embraced reactive paradigms for working with gRPC streams. These models let developers treat streams as composable, functional data flows.

Until recently, Elixir developers lacked similar expressive power. The new API introduced in version 0.10.0 finally brings that same level of expressiveness and control to the Elixir world — and in a way that feels natural, pure, and declarative.

A Functional API for Streaming

The new streaming API is designed around stream composition and functional data flows, relying on GRPC.Stream to model and process streaming inputs and outputs. This abstraction enables a developer experience that feels intuitive, powerful, and aligned with Elixir’s design principles.

Here’s an example of the new API in action:

defmodule HelloworldStreams.Server do
  @moduledoc false
  use GRPC.Server, service: Stream.EchoServer.Service

  alias HelloworldStreams.Utils.Transformer
  alias GRPC.Stream, as: GRPCStream

  alias Stream.HelloRequest
  alias Stream.HelloReply

  @spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
  def say_unary_hello(request, _materializer) do
    GRPCStream.unary(request)
    |> GRPCStream.ask(Transformer)
    |> GRPCStream.map(fn %HelloReply{} = reply ->
      %HelloReply{message: "[Reply] #{reply.message}"}
    end)
    |> GRPCStream.run()
  end

  @spec say_server_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
  def say_server_hello(request, materializer) do
    create_output_stream(request)
    |> GRPCStream.from()
    |> GRPCStream.run_with(materializer)
  end

  defp create_output_stream(msg) do
    Stream.repeatedly(fn ->
      index = :rand.uniform(10)
      %HelloReply{message: "[#{index}] I'm the Server for #{msg.name}"}
    end)
    |> Stream.take(10)
    |> Enum.to_list()
  end

  @spec say_bid_stream_hello(Enumerable.t(), GRPC.Server.Stream.t()) :: any()
  def say_bid_stream_hello(request, materializer) do
    output_stream =
      Stream.repeatedly(fn ->
        index = :rand.uniform(10)
        %HelloReply{message: "[#{index}] I'm the Server ;)"}
      end)

    GRPCStream.from(request, join_with: output_stream)
    |> GRPCStream.map(fn
      %HelloRequest{} = hello ->
        %HelloReply{message: "Welcome #{hello.name}"}

      output_item ->
        output_item
    end)
    |> GRPCStream.run_with(materializer)
  end
end

As seen in the example above, composing different stages in the streaming pipeline is straightforward. Each step in the transformation can be expressed clearly, making the overall flow easier to follow and reason about.
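One consequence of this style is that a stage is just an ordinary function, so it can be exercised without a running gRPC server. The sketch below pulls the two-clause function from say_bid_stream_hello into a named function and runs it over a plain list with the standard library's Stream module. The Demo.* modules are hypothetical stand-ins for the generated protobuf structs, and the idea that the joined stream carries both request and reply items is an assumption inferred from the pattern match in the example above.

```elixir
defmodule Demo.HelloRequest do
  # Hypothetical stand-in for the generated protobuf request struct.
  defstruct name: ""
end

defmodule Demo.HelloReply do
  # Hypothetical stand-in for the generated protobuf reply struct.
  defstruct message: ""
end

defmodule Demo.Stages do
  alias Demo.{HelloReply, HelloRequest}

  # Same two-clause shape as the function passed to GRPCStream.map/2:
  # client requests become replies, anything else passes through unchanged.
  def to_reply(%HelloRequest{} = hello), do: %HelloReply{message: "Welcome #{hello.name}"}
  def to_reply(output_item), do: output_item
end

alias Demo.{HelloReply, HelloRequest, Stages}

# A mixed list standing in for client input joined with server output.
mixed = [
  %HelloRequest{name: "Ada"},
  %HelloReply{message: "[3] I'm the Server ;)"},
  %HelloRequest{name: "Joe"}
]

replies =
  mixed
  |> Stream.map(&Stages.to_reply/1)
  |> Enum.to_list()

# Prints ["Welcome Ada", "[3] I'm the Server ;)", "Welcome Joe"]
IO.inspect(Enum.map(replies, & &1.message))
```

Keeping the stage as a named function also makes it easy to reuse the same transformation across several handlers.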

To support this, the GRPC.Stream module offers a set of functions that operate directly on the stream, such as map, filter, flat_map, partition, reduce, and uniq. These utilities provide the flexibility needed to build expressive and efficient streaming logic while maintaining clarity and composability throughout the pipeline.
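To give a feel for how these operators chain, here is a minimal sketch that filters and maps a plain list through GRPC.Stream outside of any handler. It assumes the 0.10.x series, that filter/2 takes a predicate in the same way map/2 takes a mapper, and that to_flow/1 is available to expose the underlying Flow for local inspection; treat those details as assumptions and check them against the docs for your installed version. The input list and the arithmetic are made up purely for illustration.

```elixir
# Fetches the library for a one-off script (requires network access).
Mix.install([{:grpc, "~> 0.10.0"}])

alias GRPC.Stream, as: GRPCStream

result =
  [1, 2, 3, 4, 5, 6]
  |> GRPCStream.from()
  # Keep only even numbers (assumes filter/2 accepts a predicate).
  |> GRPCStream.filter(&(rem(&1, 2) == 0))
  # Transform each remaining item.
  |> GRPCStream.map(&(&1 * 10))
  # Assumption: to_flow/1 returns the underlying Flow, which is Enumerable.
  |> GRPCStream.to_flow()
  # Sort, because parallel stages do not guarantee the original ordering.
  |> Enum.sort()

IO.inspect(result)
```

In a real handler the pipeline would end with run_with(materializer), as in the server example earlier, rather than being collected into a list.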

Conclusion

The introduction of a first-class streaming API represents a meaningful step forward for the Elixir gRPC ecosystem. It provides a more consistent and idiomatic way to implement streaming services using well-established functional constructs.

This new approach makes it easier to model real-time interactions, handle bi-directional communication, and process data streams using composable and readable code.

Further details and examples can be found in the official elixir-grpc repository and in the package documentation on Hex.
