Maybe it's actually possible to use a single GenServer as a "sink" (and not introduce another library or database). This would be a single process rather than a process per element, which would reduce the overhead by quite a bit. Since computation_2 isn't the most intense, the GenServer sink can handle both the computations and the incoming requests. I haven't tested this yet; I can try it later today or tomorrow, but it'd be in this form:
defmodule ComputationSink do
  use GenServer

  def init(_) do
    {:ok,
     %{
       # This is who the final result will be sent to.
       caller: nil,
       inputs: [],
       results: []
     }}
  end

  # Here we enqueue more inputs.
  def handle_cast({:put_element, element}, state) do
    {:noreply, %{state | inputs: [element | state.inputs]}}
  end

  # This is the final call. We self-send :compute to process the
  # remaining elements.
  def handle_call(:get_results, from, state) do
    send(self(), :compute)
    {:noreply, %{state | caller: from}}
  end

  # If the caller is nil this means that the inputs have not fully been sent,
  # and that there could be more inputs to compute.
  def handle_info(:compute, %{inputs: [], caller: nil} = state) do
    {:noreply, state}
  end

  # Handle the exhausted inputs.
  # Forward the stored results to the process that initially
  # made the call. "Recursive base case"
  def handle_info(:compute, %{inputs: [], results: results, caller: caller} = state)
      when caller != nil do
    GenServer.reply(caller, results)
    {:noreply, %{state | caller: nil, results: []}}
  end

  # Otherwise apply the computation and self-send a message to continue with processing.
  # This creates a blocking interface with non-blocking internals.
  def handle_info(:compute, state) do
    [current_element | rest] = state.inputs
    send(self(), :compute)

    {:noreply,
     %{state | results: [computation_2(current_element) | state.results], inputs: rest}}
  end
end
And then we can use this interface like so:
{:ok, pid_sink} = GenServer.start_link(ComputationSink, [])

1..1000
|> Stream.map(&computation_1/1)
|> Stream.each(fn element ->
  GenServer.cast(pid_sink, {:put_element, element})
end)
|> Stream.run()

# Blocking call even though the GenServer is still processing elements c:
results = GenServer.call(pid_sink, :get_results)
This is actually my absolute favorite pattern in Erlang / Elixir and I wrote about it here: My Favorite Erlang Pattern
EDIT: you could also turn the cast into a call and reply slowly or quickly depending on the size of the inputs; this would be your backpressure mechanism. You just need to store the pid of the process attempting to enqueue, much like how it is done for the pid of the process awaiting the result. This would sadly affect the rate at which the computation_1s can run.
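A minimal sketch of that backpressure variant, untested and assuming a hypothetical @max_pending threshold (computation_2 is the same function as above):

```elixir
defmodule BackpressureSink do
  use GenServer

  # Assumed threshold; above this, producers are held until the queue drains.
  @max_pending 100

  def init(_) do
    {:ok, %{caller: nil, producers: [], inputs: [], results: []}}
  end

  # Enqueue via call instead of cast. While the queue is short we reply
  # immediately; once it grows too long we hold the reply, which blocks the
  # producer (and its computation_1) until the sink catches up.
  def handle_call({:put_element, element}, from, state) do
    state = %{state | inputs: [element | state.inputs]}
    send(self(), :compute)

    if length(state.inputs) < @max_pending do
      {:reply, :ok, state}
    else
      {:noreply, %{state | producers: [from | state.producers]}}
    end
  end

  def handle_call(:get_results, from, state) do
    send(self(), :compute)
    {:noreply, %{state | caller: from}}
  end

  def handle_info(:compute, %{inputs: [], caller: nil} = state), do: {:noreply, state}

  def handle_info(:compute, %{inputs: [], results: results, caller: caller} = state)
      when caller != nil do
    GenServer.reply(caller, results)
    {:noreply, %{state | caller: nil, results: []}}
  end

  def handle_info(:compute, state) do
    [current | rest] = state.inputs
    send(self(), :compute)

    # Release any held producers once the queue is below the threshold again.
    producers =
      if length(rest) < @max_pending do
        Enum.each(state.producers, &GenServer.reply(&1, :ok))
        []
      else
        state.producers
      end

    {:noreply,
     %{state | inputs: rest, results: [computation_2(current) | state.results], producers: producers}}
  end
end
```

One thing to keep in mind with this sketch: a held producer is still sitting in GenServer.call, so its default 5-second timeout applies; you'd want to raise it (or pass :infinity) if the queue can stay full for long.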