We released version 1.0 of Grains, a library for declaratively defining data flows composed of chained processes.
In our production use of Elixir, we found a repeating pattern: we split the work into concurrent processes, and data flows through those processes. Each process implements a small-ish part of the pipeline, which keeps the different work items separate. This already works very well with plain Elixir and Erlang. What we were missing was a way to define the high-level architecture of those processes. Figuring out the architecture from the source code can be difficult: it is not always clear which process sends messages where. Of course, architecture diagrams can help here, but they tend to bit-rot in an evolving system. This is where Grains enters the game: build a high-level architecture from small-ish grains.
Building an Architecture
For example, let's say we want three processes A, B, and C to implement a computational pipeline. The data flow should be A -> B -> C, so A moves data to B, and B moves data to C. In Grains, we would describe this data flow like this:
    recipe =
      Grains.Recipe.new(
        :OurDataFlow, # An atom as a description of this flow
        %{
          :A => :B,
          :B => :C
        }
      )
So, the recipe is a directed graph, where the edges indicate where data flows.
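To make the graph reading concrete, here is a minimal sketch in plain Elixir that follows the edges of such a map from a start node. It does not use Grains at all: RecipeSketch and path_from/2 are hypothetical names made up for this illustration, and the sketch assumes an acyclic flow with one successor per node.

```elixir
defmodule RecipeSketch do
  # Hypothetical helper, not part of Grains: follows the edges of a
  # map-based graph from a start node until a node has no successor.
  # Assumes the graph is acyclic and each node has a single successor.
  def path_from(edges, start) do
    do_path(edges, start, [start])
  end

  defp do_path(edges, current, acc) do
    case Map.fetch(edges, current) do
      {:ok, next} -> do_path(edges, next, [next | acc])
      :error -> Enum.reverse(acc)
    end
  end
end

# The same edge map as in the recipe above.
edges = %{:A => :B, :B => :C}
IO.inspect(RecipeSketch.path_from(edges, :A))
```

Starting at :A, the walk visits :B and then :C, which is exactly the A -> B -> C flow the recipe describes.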
To initialize the processes, each grain in the recipe must be started:
    grains =
      Grains.new(%{
        :A => {ModuleA, args_for_A, special_args_for_grains},
        :B => ...,
        :C => ...
      })
We then combine the recipe with the grains and bake bread:
    bread = Grains.start_supervised(recipe, grains)
Here, bread is a supervisor, which can be used in a regular supervision tree.
For a full example, see the heat example on GitLab.
Implementing a Grain
A grain is a simple GenServer with a use Grains.GenGrain directive (GenGrain is built upon GenServer):
    defmodule ExampleGrain do
      use Grains.GenGrain

      # init works just like GenServer.init/1
      def init(args) do
        state = State.from(args)
        {:ok, state}
      end

      # Messages are pushed into grains...
      def handle_push(message, _from, state) do
        push(message) # ...and also pushed out of grains
        {:noreply, state}
      end

      # Successors can request data from upstream grains by pull()-ing
      def handle_pull(_from, state) do
        {:reply, :message_reply, state}
      end

      # A grain may be used like any other GenServer
      def handle_call(_call, _from, state) do
        {:reply, :ok, state}
      end
    end
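For comparison, the hand-wired pattern that a recipe replaces might look roughly like the following in plain GenServer terms. This is only a sketch of the general idea, not of how Grains is implemented: StageSketch, the {:push, message} tuple, and the per-stage functions are all hypothetical and chosen for illustration.

```elixir
defmodule StageSketch do
  # Hypothetical plain-GenServer stage, not Grains code: applies a function
  # to each incoming message and forwards the result to the next process.
  use GenServer

  def start_link({fun, next}), do: GenServer.start_link(__MODULE__, {fun, next})

  @impl true
  def init({fun, next}), do: {:ok, %{fun: fun, next: next}}

  @impl true
  def handle_info({:push, message}, state) do
    # The successor is baked into the process state.
    send(state.next, {:push, state.fun.(message)})
    {:noreply, state}
  end
end

# Wire A -> B -> C by hand, back to front; the calling process is the sink.
{:ok, c} = StageSketch.start_link({fn x -> x * 2 end, self()})
{:ok, b} = StageSketch.start_link({fn x -> x + 1 end, c})
{:ok, a} = StageSketch.start_link({fn x -> x end, b})

send(a, {:push, 1})

receive do
  {:push, result} -> IO.inspect(result)
after
  1_000 -> IO.puts("timeout")
end
```

Note how the topology only exists implicitly in the order of the start_link calls. That is the information a recipe states explicitly.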
Additional Features
Grains comes with extensive testing support:
- Special test grains (Grains.Support.Publisher, Grains.Support.Subscriber, …) simplifying black-box testing of grains
- Functionality to inject and read messages between grains
- Automatic generation of Mermaid graphs from a recipe with Grains.Recipe.to_mermaid/1,2
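To give a feel for what a Mermaid rendering of a recipe could look like, here is a small stand-alone sketch that turns an edge map into Mermaid flowchart text. It is not the library's to_mermaid implementation, and the actual output of Grains may differ: MermaidSketch and render/1 are hypothetical names used only for this example.

```elixir
defmodule MermaidSketch do
  # Hypothetical helper, not the library's to_mermaid: renders a map of
  # edges as Mermaid flowchart text, one "from --> to" line per edge.
  def render(edges) do
    lines =
      edges
      |> Enum.sort()
      |> Enum.map(fn {from, to} -> "  #{from} --> #{to}" end)

    Enum.join(["graph TD" | lines], "\n")
  end
end

IO.puts(MermaidSketch.render(%{:A => :B, :B => :C}))
```

Because the diagram is derived from the same data that wires the processes, it cannot drift away from the running system the way a hand-drawn diagram can.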
Future Work
We have been using Grains in production for more than a year, and it has proven stable and maintainable. Grains is still evolving: testing facilities, special grains (such as the periodic grain), and other additions will find their way into the library over time. Find us on GitLab! We are looking forward to suggestions, requests, and of course merge requests!