Oooh this is so cool!
For some of my projects, I built what is essentially a “durable reactive graph” library (Journey). It is not exactly what you are building (I think, since this is very different from n8n in many respects), but it has some similarities that you might find useful, at least conceptually. It lets you define a graph of inputs and computations, with direct or conditional dependencies:
import Journey.Node
graph = Journey.new_graph(
  "demo graph",
  "v1",
  [
    input(:x),
    input(:y),
    # :sum is unblocked when :x and :y are provided.
    compute(:sum, [:x, :y], fn %{x: x, y: y} -> {:ok, x + y} end),
    # :large_value_alert is unblocked when :sum is provided and is greater than 40.
    compute(
      :large_value_alert,
      [sum: fn sum_node -> sum_node.node_value > 40 end],
      fn %{sum: sum} -> {:ok, "🚨, at #{sum}"} end,
      f_on_save: fn _execution_id, _result ->
        # (e.g. send a pubsub notification to the LiveView process, to update the UI)
        :ok
      end
    )
  ]
)
You can then create and run executions of that graph (set their input values and read their computed values):
execution = Journey.start_execution(graph)
execution = Journey.set_value(execution, :x, 12)
execution = Journey.set_value(execution, :y, 2)
Journey.get_value(execution, :sum, wait_any: true)
{:ok, 14}
Journey.get_value(execution, :large_value_alert)
{:error, :not_set}
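:large_value_alert stays unset here because :sum is 14, which does not pass the "greater than 40" condition. Conceptually, that gating boils down to something like the plain-Elixir sketch below. This is only an illustration of the idea, not Journey's actual implementation; GatingSketch and unblocked?/2 are made-up names.

```elixir
defmodule GatingSketch do
  # Hypothetical helper, NOT part of Journey: returns true only when every
  # upstream node has a value and that value satisfies its predicate.
  def unblocked?(values, conditions) do
    Enum.all?(conditions, fn {name, predicate} ->
      case Map.fetch(values, name) do
        # Mirror the shape used in the graph definition (sum_node.node_value).
        {:ok, value} -> predicate.(%{node_value: value})
        # No value yet, so the downstream node stays blocked.
        :error -> false
      end
    end)
  end
end

condition = [sum: fn sum_node -> sum_node.node_value > 40 end]

# 14 is not greater than 40, so the alert node stays blocked.
blocked_small = GatingSketch.unblocked?(%{sum: 14}, condition)
# 42 passes the condition, so the alert node would be unblocked.
unblocked_large = GatingSketch.unblocked?(%{sum: 42}, condition)
# :sum has no value at all yet, so the alert node stays blocked.
blocked_missing = GatingSketch.unblocked?(%{}, condition)

IO.inspect({blocked_small, unblocked_large, blocked_missing})
```

With the graph above, setting :x and :y so that their sum exceeds 40 is what would let :large_value_alert compute.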
This might differ from your use case in some ways (?). Journey takes care of a number of things behind the scenes that I needed for my use cases (e.g. Phoenix applications that guide users through a multi-step, conditional flow).
(For my use cases, I needed persistence (so the value of every node is persisted as soon as it is set or computed), reliability of computations (so the functions in those compute nodes are subject to retry policies), and horizontal scalability of computations (those functions will be picked up and called on any of the replicas of my application). I also needed careful mutability, plus one-time and recurring scheduling (so Journey also has mutate(), schedule_once() and schedule_recurring() types of nodes, in addition to the input() and compute() that you see in this example), and durability (so that I can always Journey.load() an execution by its id, even after all kinds of outages / redeployments / page reloads, at any time in the future, and it will continue as if nothing happened).)
I also needed some niceties, which may or may not be relevant for your case – things like visualizing the graph:
Journey.Tools.generate_mermaid_graph(graph)
Here is the Mermaid that gets generated:
graph TD
%% Graph
subgraph Graph["🧩 'demo graph', version v1"]
execution_id[execution_id]
last_updated_at[last_updated_at]
x[x]
y[y]
sum["sum<br/>(anonymous fn)"]
large_value_alert["large_value_alert<br/>(anonymous fn)"]
x --> sum
y --> sum
sum --> large_value_alert
end
%% Styling
classDef inputNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000000
classDef computeNode fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000000
classDef scheduleNode fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000000
classDef mutateNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#000000
%% Apply styles to actual nodes
class y,x,last_updated_at,execution_id inputNode
class large_value_alert,sum computeNode
To help me understand the state of things (e.g. “hey did Mario get their reminder email?”), Journey has some introspection tooling that tells me the state of an execution. In this example (a snapshot taken after :x was set but before :y was), you can see that :sum has not been computed because :y is yet to be provided:
iex(8)> Journey.Tools.summarize_as_text(execution.id) |> IO.puts
Execution summary:
- ID: 'EXECAT36JBH8VTM82150REVX'
- Graph: 'demo graph' | 'v1'
- Archived at: not archived
- Created at: 2025-09-04 05:06:04Z UTC | 54 seconds ago
- Last updated at: 2025-09-04 05:06:39Z UTC | 19 seconds ago
- Duration: 35 seconds
- Revision: 1
- # of Values: 3 (set) / 6 (total)
- # of Computations: 2
Values:
- Set:
- last_updated_at: '1756962399' | :input
set at 2025-09-04 05:06:39Z | rev: 1
- x: '12' | :input
set at 2025-09-04 05:06:39Z | rev: 1
- execution_id: 'EXECAT36JBH8VTM82150REVX' | :input
set at 2025-09-04 05:06:04Z | rev: 0
- Not set:
- large_value_alert: <unk> | :compute
- sum: <unk> | :compute
- y: <unk> | :input
Computations:
- Completed:
- Outstanding:
- sum: ⬜ :not_set (not yet attempted) | :compute
:and
├─ ✅ :x | &provided?/1 | rev 1
└─ 🛑 :y | &provided?/1
- large_value_alert: ⬜ :not_set (not yet attempted) | :compute
🛑 :sum | &-normalize_gated_by/1-fun-0-/1
I have been using Journey in Phoenix applications, and since every customer flow is a Journey execution, it is easy for Journey to generate a bit of “flow analytics” / “flow funnel” data. This example, with just 3 executions in the system, is not particularly rich ; ) and it’s too new to have any retrospective “flow ends here” data, but think of a web analytics funnel and you get the idea:
iex(18)> Journey.Insights.FlowAnalytics.flow_analytics(graph.name, graph.version) |> Journey.Insights.FlowAnalytics.to_text()|> IO.puts
Graph: 'demo graph'
Version: 'v1'
Analyzed at: 2025-09-04T05:10:16.228250Z
EXECUTION STATS:
----------
Total executions: 3
Average duration: 23 seconds
Median duration: 20 seconds
NODE STATS (4 nodes):
----------
Node Name: 'x'
Type: input
Reached by: 3 executions (100.0%)
Average time to reach: 19 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)
Node Name: 'sum'
Type: compute
Reached by: 2 executions (66.7%)
Average time to reach: 17 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)
Node Name: 'y'
Type: input
Reached by: 2 executions (66.7%)
Average time to reach: 17 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)
Node Name: 'large_value_alert'
Type: compute
Reached by: 1 executions (33.3%)
Average time to reach: 13 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)
, etc.
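The “Reached by” percentages in that output are just each node's count divided by the total number of executions. A tiny plain-Elixir sketch of that arithmetic, using the counts from the output above (FunnelSketch and reached_percentages/2 are made-up names for illustration, not part of Journey):

```elixir
defmodule FunnelSketch do
  # Hypothetical helper, NOT part of Journey: for each node, the share of all
  # executions that reached it, as a percentage rounded to one decimal place.
  def reached_percentages(total_executions, reached_counts) do
    Map.new(reached_counts, fn {node, count} ->
      {node, Float.round(count / total_executions * 100, 1)}
    end)
  end
end

# Counts taken from the NODE STATS output above (3 executions in total).
percentages =
  FunnelSketch.reached_percentages(3, %{x: 3, y: 2, sum: 2, large_value_alert: 1})

# x => 100.0, y => 66.7, sum => 66.7, large_value_alert => 33.3
IO.inspect(percentages)
```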
I don’t know how this aligns with your particular use case, but this has been quite useful for building Phoenix applications that take a user through a flow that needs to be durable and reliable. I no longer need to keep reimplementing the same plumbing, and my applications have become thin, lightweight, and well-structured.
I am excited to see where your project takes you! ; )