Grains - Process Data Flow Orchestration

We released 1.0 of Grains, a library for declaratively defining data flows built from chained processes.

In our production use of Elixir, we found a recurring pattern: we split the work into concurrent processes, and data flows through those processes. Each process implements a small-ish part of the pipeline, which keeps different kinds of work separate. This already works very well with plain Elixir and Erlang. What we were missing was a way to define the high-level architecture of those processes. Recovering the architecture from the source code can be difficult: it is not always clear which process sends messages where. Architecture diagrams can help, of course, but they tend to bit-rot in an evolving system. This is where Grains enters the game: build a high-level architecture from small-ish grains.

Building an Architecture

For example, let's say we want three processes A, B, and C to implement a computational pipeline. The data flow should be A -> B -> C: A moves data to B, and B moves data to C. In Grains, we would describe this data flow like this:

recipe =
  Grains.Recipe.new(
    :OurDataFlow, # An atom as a description of this flow
    %{
      :A => :B,
      :B => :C
    }
  )

The recipe is therefore a directed graph whose edges indicate where data flows.
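Because a recipe is just a map from a grain to the grain it feeds, ordinary Elixir can already answer questions such as "who sends data to C?". The module below is a hypothetical helper written for illustration, not part of the Grains API. It assumes the map shape shown above and, as an unconfirmed assumption about Grains, also accepts a list of successors per grain.

```elixir
defmodule RecipeSketch do
  # Hypothetical helpers (NOT part of Grains) that treat a recipe-style
  # map %{from => to} as a directed graph.

  # Flatten the map into {from, to} edge tuples. List.wrap/1 lets a value be
  # a single atom or (assumption) a list of successors.
  def edges(flow) do
    for {from, tos} <- flow, to <- List.wrap(tos), do: {from, to}
  end

  # Where a grain pushes its data; [] if it has no outgoing edge.
  def successors(flow, node), do: flow |> Map.get(node) |> List.wrap()

  # Which grains push data into `node`.
  def predecessors(flow, node) do
    for {from, to} <- edges(flow), to == node, do: from
  end
end
```

For the A -> B -> C recipe, RecipeSketch.predecessors(%{:A => :B, :B => :C}, :C) returns [:B], and RecipeSketch.successors/2 on :C returns [], since C is the end of the pipeline.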

To initialize the processes, each grain in the recipe must be given a module and its start arguments:

grains =
    Grains.new(%{
        :A => {ModuleA, args_for_A, special_args_for_grains},
        :B => ...,
        :C => ...
    })

We then combine the recipe with the grains and bake bread:

bread = Grains.start_supervised(recipe, grains)

bread is a supervisor, which can be placed in a regular supervision tree.

For a full example, see the heat example on GitLab.
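To see what the recipe and grains take off our hands, here is a rough plain-OTP sketch of the same A -> B -> C chain. StageSketch and FlowSketch are hypothetical names; "push" is modeled as a GenServer.cast to the successor's registered name. This is an assumption about the general idea, not a description of how Grains.start_supervised works internally.

```elixir
defmodule StageSketch do
  # Hypothetical stand-in for a grain, written with plain GenServer (no Grains).
  # Each stage is registered under its recipe name, forwards every message to
  # its successor (if any), and records what it received for inspection.
  use GenServer

  def start_link({name, successor}) do
    GenServer.start_link(__MODULE__, successor, name: name)
  end

  # Asynchronous push into a stage.
  def push(stage, message), do: GenServer.cast(stage, {:push, message})

  # Messages this stage has handled so far, oldest first.
  def received(stage), do: GenServer.call(stage, :received)

  @impl true
  def init(successor), do: {:ok, %{successor: successor, received: []}}

  @impl true
  def handle_cast({:push, message}, state) do
    if state.successor, do: push(state.successor, message)
    {:noreply, %{state | received: [message | state.received]}}
  end

  @impl true
  def handle_call(:received, _from, state) do
    {:reply, Enum.reverse(state.received), state}
  end
end

defmodule FlowSketch do
  # Start one supervised StageSketch per node of a recipe-style map.
  # Nodes without an outgoing edge (here :C) get `nil` as successor.
  def start_supervised(flow) do
    nodes = Enum.uniq(Map.keys(flow) ++ Map.values(flow))

    children =
      for node <- nodes do
        Supervisor.child_spec({StageSketch, {node, Map.get(flow, node)}}, id: node)
      end

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

After FlowSketch.start_supervised(%{:A => :B, :B => :C}) and StageSketch.push(:A, :hello), the message travels asynchronously, and once it has arrived StageSketch.received(:C) returns [:hello]. Even in this small sketch, the wiring lives in one map instead of being spread across GenServer.cast calls, which is the kind of overview the recipe aims to provide.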

Implementing a Grain

A grain is a simple GenServer with a use Grains.GenGrain directive (GenGrain is built upon GenServer):

defmodule ExampleGrain do
  use Grains.GenGrain

  # init works just like GenServer.init/1; here the start arguments
  # are used directly as the initial state
  def init(args) do
    {:ok, args}
  end

  # Messages are pushed into grains...
  def handle_push(message, _from, state) do
    push(message) # ...and also pushed out of grains
    {:noreply, state}
  end

  # Successors can request data from upstream grains by pull()-ing
  def handle_pull(_from, state) do
    {:reply, :message_reply, state}
  end

  def handle_call(_call, _from, state) do
    # A grain may be used as any other GenServer
    {:reply, :ok, state}
  end
end
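The pull direction can be pictured with a plain GenServer as well: the downstream side makes a synchronous call, and the upstream side answers with the next item, much like handle_pull/2 replies above. PullSketch is a hypothetical module for illustration only; it does not use Grains, and its {:ok, item} / :empty reply shape is an assumption.

```elixir
defmodule PullSketch do
  # Hypothetical plain-GenServer illustration of pull semantics (NOT Grains):
  # the upstream process holds a list of pending items, and a downstream
  # caller fetches the next one with a synchronous call.
  use GenServer

  def start_link(items), do: GenServer.start_link(__MODULE__, items)

  # Called by the downstream side; blocks until upstream replies.
  def pull(upstream), do: GenServer.call(upstream, :pull)

  @impl true
  def init(items), do: {:ok, items}

  @impl true
  def handle_call(:pull, _from, [next | rest]), do: {:reply, {:ok, next}, rest}
  def handle_call(:pull, _from, []), do: {:reply, :empty, []}
end
```

Starting it with PullSketch.start_link([1, 2]) and pulling three times yields {:ok, 1}, {:ok, 2}, and then :empty. In contrast to push, the consumer controls the pace here, because nothing moves until it asks.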

Additional Features

Grains also comes with extensive testing support and tooling:

  • Special test grains (Grains.Support.Publisher, Grains.Support.Subscriber, …) that simplify black-box testing of grains
  • Functionality to inject and read messages between grains
  • Automatic generation of Mermaid graphs from a recipe with Grains.Recipe.to_mermaid/1,2
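For a feel of what such a diagram looks like, the hypothetical helper below renders a recipe-style map as Mermaid flowchart text. It is not Grains.Recipe.to_mermaid/1,2, whose actual output format may differ; it only shows why a recipe maps so directly onto a diagram.

```elixir
defmodule MermaidSketch do
  # Hypothetical re-implementation for illustration only; the real
  # Grains.Recipe.to_mermaid/1,2 may produce different text.
  # `direction` is a Mermaid flowchart direction such as "LR" or "TD".
  def to_mermaid(flow, direction \\ "LR") do
    lines =
      for {from, tos} <- Enum.sort(flow), to <- List.wrap(tos) do
        "  #{from} --> #{to}"
      end

    Enum.join(["graph #{direction}" | lines], "\n")
  end
end
```

For %{:A => :B, :B => :C}, this produces the three lines graph LR, A --> B and B --> C, which Mermaid renders as a left-to-right chain. Because the text is generated from the recipe itself, it cannot drift out of date the way a hand-drawn diagram can.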

Future Work

We have been using Grains in production for more than a year, and it has proven stable and maintainable. Grains is still evolving: testing facilities, special grains (such as the periodic grain) and other additions will find their way into the library over time. Find us on GitLab! We look forward to suggestions, requests and, of course, merge requests!


We just released v1.3.0. Recent changes since v1.0.0 (changelog):

  • v1.1.0/v1.1.1 added a helper to simplify testing a grain
  • v1.2.0 added a helper for “debug reply chains”, which floods messages through the flow graph to guarantee that all prior messages have been delivered
  • v1.3.0 adds “ghost edges”. Such edges are non-functional from the point of view of grains and exist only for documentation. For example, if two processes communicate using GenServer.call/2 but never via push/pull, they are connected by a ghost edge.
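The principle behind a reply chain can be sketched in plain Elixir. The BEAM delivers messages from one process to another in the order they were sent, so if each stage forwards a synchronous flush to its successor, and only does so after handling everything queued before the flush, then the flush returning means every earlier push has been processed along the chain. ChainSketch is a hypothetical, linear-chain illustration of that principle, not the Grains implementation of debug reply chains.

```elixir
defmodule ChainSketch do
  # Hypothetical illustration (NOT Grains' implementation) of a reply chain
  # that confirms earlier pushes were processed. It relies on the guarantee
  # that messages between one pair of processes arrive in send order.
  use GenServer

  def start_link({name, successor}),
    do: GenServer.start_link(__MODULE__, successor, name: name)

  def push(stage, message), do: GenServer.cast(stage, {:push, message})

  # Walks the chain synchronously; returns :ok once every stage has handled
  # everything that was sent to it before the flush.
  def flush(stage), do: GenServer.call(stage, :flush)

  # Messages this stage has handled, oldest first.
  def seen(stage), do: GenServer.call(stage, :seen)

  @impl true
  def init(successor), do: {:ok, {successor, []}}

  @impl true
  def handle_cast({:push, msg}, {successor, seen}) do
    if successor, do: push(successor, msg)
    {:noreply, {successor, [msg | seen]}}
  end

  @impl true
  def handle_call(:flush, _from, {successor, _seen} = state) do
    # Any push cast to `successor` was sent earlier by this same process,
    # so the successor handles it before this nested :flush call.
    if successor, do: flush(successor)
    {:reply, :ok, state}
  end

  def handle_call(:seen, _from, {_successor, seen} = state),
    do: {:reply, Enum.reverse(seen), state}
end
```

This naive version only covers a linear chain: with fan-out, a stage would have to flush every successor, and a cycle in the graph would deadlock the nested calls.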