AstonJ

AstonJ

Catch-all thread

This is the catch-all thread, please see this thread for info.

Most Liked

sleipnir

sleipnir

First-Class gRPC Streams in Elixir: A new composable API

For a long time, the Elixir community has lacked a truly idiomatic way to handle gRPC streaming. While unary RPCs have always been well-supported in the elixir-grpc library, stream handling felt incomplete — too low-level and too imperative for a language that thrives on functional and reactive principles.

This gap was finally addressed in issue #270, which led to a major improvement: first-class support for gRPC streams using a functional and declarative API that fits seamlessly into the Elixir ecosystem.

:rocket: This new API is available starting from elixir-grpc version 0.10.0 — with the latest release being 0.10.2.

It’s worth highlighting the irony: while Elixir is inherently concurrent and reactive — thanks to the BEAM and its actor-based model — its gRPC streaming capabilities lagged behind those found in other ecosystems.

Languages like Java (via Project Reactor or RxJava), Kotlin (with coroutines and Flows), and JavaScript (using RxJS or async iterators) have long embraced reactive paradigms for working with gRPC streams. These models allowed developers to work with streams as composable, functional data flows.

Until recently, Elixir developers lacked similar expressive power. The new API introduced in version 0.10.0 finally brings that same level of expressiveness and control to the Elixir world — and in a way that feels natural, pure, and declarative.

A Functional API for Streaming

The new streaming API is designed around stream composition and functional data flows, relying on GRPC.Stream to model and process streaming inputs and outputs. This new abstraction enables a developer experience that feels intuitive, powerful, and aligned with some Elixir’s design principles.

Here’s an example of the new API in action:

defmodule HelloworldStreams.Server do
  @moduledoc false
  use GRPC.Server, service: Stream.EchoServer.Service

  alias HelloworldStreams.Utils.Transformer
  alias GRPC.Stream, as: GRPCStream

  alias Stream.HelloRequest
  alias Stream.HelloReply

  @spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
  def say_unary_hello(request, _materializer) do
    GRPCStream.unary(request)
    |> GRPCStream.ask(Transformer)
    |> GRPCStream.map(fn %HelloReply{} = reply ->
      %HelloReply{message: "[Reply] #{reply.message}"}
    end)
    |> GRPCStream.run()
  end

  @spec say_server_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
  def say_server_hello(request, materializer) do
    create_output_stream(request)
    |> GRPCStream.from()
    |> GRPCStream.run_with(materializer)
  end

  defp create_output_stream(msg) do
    Stream.repeatedly(fn ->
      index = :rand.uniform(10)
      %HelloReply{message: "[#{index}] I'm the Server for #{msg.name}"}
    end)
    |> Stream.take(10)
    |> Enum.to_list()
  end

  @spec say_bid_stream_hello(Enumerable.t(), GRPC.Server.Stream.t()) :: any()
  def say_bid_stream_hello(request, materializer) do
    output_stream =
      Stream.repeatedly(fn ->
        index = :rand.uniform(10)
        %HelloReply{message: "[#{index}] I'm the Server ;)"}
      end)

    GRPCStream.from(request, join_with: output_stream)
    |> GRPCStream.map(fn
      %HelloRequest{} = hello ->
        %HelloReply{message: "Welcome #{hello.name}"}

      output_item ->
        output_item
    end)
    |> GRPCStream.run_with(materializer)
  end
end

As seen in the example above, composing different stages in the streaming pipeline is straightforward. Each step in the transformation can be expressed clearly, making the overall flow easier to follow and reason about.

To support this, the GRPC.Stream module offers a set of functions designed to operate directly on the stream, such as map, filter, flat_map, partition, reduce, uniq and so on. These utilities provide the flexibility needed to build expressive and efficient streaming logic while maintaining clarity and composability throughout the pipeline.

Conclusion

The introduction of a first-class streaming API represents a meaningful step forward for the Elixir gRPC ecosystem. It provides a more consistent and idiomatic way to implement streaming services using well-established functional constructs.

This new approach makes it easier to model real-time interactions, handle bi-directional communication, and process data streams using composable and readable code.

Further details and examples can be found in the official elixir-grpc repository and hex.

Eiji

Eiji

Ok, so few years have passed. Why it took so long? It simply didn’t took any time. I’ve got just a single question if this library is maintained. I said yes, but so far no issue, pull request or even a question was created.

Therefore I was just using this project for my personal purposes and slowly refactoring it when needed. I think the completely new code is much better now. The documentation should also be much cleaner. This is a first release on hex simply because I’m not pushing initial library code or first release candidates.

enumex have a new DSL. From now on you start with as simple code as possible and extend it by adding components. Therefore there is no convention forced to your enum module. You can use a component to integrate ecto or write your own easily.

defmodule MyApp.StaticEnums do
  use Enumex.Static,
    components: [
      # Enumex.Static.Components.AbsinthePhase,
      # Enumex.Static.Components.Constant,
      # Enumex.Static.Components.EctoType,
      # Enumex.Static.Components.Guards,
      # Enumex.Static.Components.Index,
      # Enumex.Static.Components.List,
      # Enumex.Static.Components.Sort,
      # Enumex.Static.Components.Typespecs
    ]

    enum :my_enum, ~w[first second third]a
  end
end

All compile-time state is written into ets storage which means there are no module attributes that you have to be aware of. That’s said some modules may use module attributes to integrate with other library. The simplest example here is Ecto.Schema integration (@primary_key and @schema_prefix).

What I like the most is a simple component system.

defmodule MyApp.MyComponent do
  @moduledoc """
  A component documentation
  """

  use Enumex.Component

  @doc """
  A callback documentation
  """
  @callback my_func :: :ok

  comp [component: component_module] do
    @behaviour component_module

    @impl component_module
    def my_func, do: :ok
  end
end

You can start with a simple :string migration and end up with a PostgreSQL enumerated type or even work on runtime-determined values.

defmodule MyApp.DynamicEnums do
  use Enumex.Dynamic,
    components: [
      # Enumex.Dynamic.Components.Context,
      # Enumex.Dynamic.Components.Convert,
      # Enumex.Dynamic.Components.EctoChangeset,
      # Enumex.Dynamic.Components.EctoSchema,
      # Enumex.Dynamic.Components.Typespecs
    ],
    repo: MyApp.Repo

  enum ModulePart, :enum_name do
    # additional code like extra ecto schema fields
  end
end

Now working with enums in Elixir, PostgreSQL or even SQLite is as simple as adding a component. :heart:

Anyone who depends on enumerated types can use Postgres adapter directly or Migration wrapper. With such features migrations are very easy to setup without a need to learn how to execute create type … sql. What’s best all migrations can be used safely on untrusted values. In my case I have used a test name as enum name. The adapter handles even the most weird cases. :rocket:

defmodule ExampleMigration do
  use Ecto.Migration

  alias Enumex.Static.Adapters.Postgres
  alias Enumex.Static.Migration
  alias Enumex.Value

  @first_value Value.new(ExampleEnum, :example, :firstt, 1)
  @second_value Value.new(ExampleEnum, :example, :second, 2)
  @third_value Value.new(ExampleEnum, :example, :third, 3)

  def change do
    Migration.create_enum(Postgres, [@first_value, @third_value])
    Migration.rename_value(Postgres, @first_value, %{@first_value | id: :first})
    # when rollback use `@first_value` as a default when dropping `:second` value from enumerated type
    Migration.add_value(Postgres, @second_value, @first_value)
    # when migrate use `@second_value` as a default when dropping `:third` value from enumerated type
    Migration.drop_value(Postgres, @third_value, @second_value)
  end
end
dimitarvp

dimitarvp

It’s a good practice to just copy-paste some of the onboarding docs in the forum announcement. No need to give the forum readers homework – “go read the docs” – when you can make it easier for us to decide if we’re interested in using your thing.

Where Next?

Popular in News & Updates Top

hugobarauna
Livebook v0.7 is out! This is a major release coming with significant features in the following areas: secret management visual represe...
New
zachdaniel
The second video from the Ash Primers series is out! https://www.youtube.com/watch?v=GtsL_lIis4Q This one is about the migration generato...
New
jimsynz
It’s been a long time coming, but Reactor has finally reached 1.0. For those unfamiliar, Reactor is a dynamic, concurrent, dependency-re...
New
zachdaniel
Hey everyone! What I’ll cover in this post: Major refactors The future of Ash.Flow Current state of atomics and bulk actions Whats nex...
New
zachdaniel
Hey folks! In order to give the new AshSqlite a trial run and make sure its got some real usage, AshHq’s multi-package search has been re...
New
hugobarauna
Learn how to use Livebook to build a Machine Learning app and deploy it to Hugging Face in less than 15 minutes. If you have any questio...
New
mickel8
Hello everyone! :wave: I am thrilled to announce a new version of Jellyfish Media Server - v0.2.0! :tada: Features: Added RTSP compon...
New
sorenone
We have for you a new release candidate. Take :notes: if you use partitioned queues or make heavy use of Workflows. This is the RC for yo...
New
jjcarstens
Testing code destined for hardware can be tricky, but it just got one tiny bit easier! :tty0tty was just released which is an Elixir por...
New
fhunleth
I wanted to let everyone know that the Erlang Ecosystem Foundation (EEF) has a working group dedicated to embedded systems and IoT. We ha...
New

Other popular topics Top

skosch
To my knowledge, put_in, Map.update etc. all have the one limitation of not automatically creating intermediate keys when needed (for exa...
New
stefanchrobot
What’s the safe way to decode a JSON string into a struct? I want to avoid calling String.to_atom. Jason.decode can give me a map with st...
New
dokuzbir
I want to highlight html closing tags when i click a html tag. That works in .html files but doesnt work for html.eex templates. How can...
New
jay1
Why is it that the mnesia database isn’t the most preferred database for use in Elixir/Phoenix?
New
grych
Hi folks, Few months ago I have announced the proof-of-concept of the library to manipulate the browsers DOM objects directly from Elixi...
639 52341 488
New
RisingFromAshes
I’ve read in another post that it may be possible with a router helper - but I couldn’t find an appropriate one, and tbh, I’m still just ...
New
jason.o
In the code below, if the create action is not set to accept “extra_key” as an input, it errors out with a message shown above. Is there ...
New
komlanvi
Hi everyone, I was playing with phoenix liveView but I run into an issue. I have a form and want to validate each input text when the te...
New
marick
I had some trouble figuring out how to make many-to-many associations work. Once I got it working, I wrote a blog post. Because I’m a nov...
New
lanycrost
Hi everyone! I need implement if…else if…else condition from my elixir code, and anymore of this control flow structures not work proper...
New

We're in Beta

About us Mission Statement