Is there an idiomatic way to write greedy streams in Elixir?

Yeah, that first stream call is supposed to represent a pipeline :S

I do not want async_stream: the computation is heavy, and I want it to run as fast as possible but one at a time. There is a single spawn here, so order is preserved.

The second stream is supposed to consume it all but I agree if there was a take(10) down below it would be nice to use it. Though it would require more than one additional process.

Edit: hmm, I wonder if async_stream is greedy within its max concurrency limit. I guess not, because it still requires something to pull from the stream.

How fast is operation 2 relative to operation 1?

It’s shorter :slight_smile:

Would pushing the second computation into a function to make it dependent on the first computation not accomplish the goal?

1..1000
|> Stream.map(&perform_compute/1)

def perform_compute(value) do
  value
  |> computation_1()
  |> computation_2()
end

(I can’t delete this but I realize it does not accomplish what you want as it’s still serial)

Right, whether this is good or not depends on the ratio of the speed of computation_1 to computation_2. If computation 2 is 100x faster then this is fine. If it’s only twice as fast then this is less good. Unfortunately all we have to go off of is “It’s shorter” :person_shrugging: .
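
To put rough numbers on that ratio, here is a small sketch that compares the serial total with the finish time of a two-stage pipeline, where computation 2 of one item overlaps computation 1 of the next. The module and function names are made up for illustration, and it assumes computation 2 takes a constant time per item.

```elixir
defmodule PipelineEstimate do
  # Hypothetical helper: total time when every item runs computation 1 and
  # then computation 2 before the next item starts.
  def serial(c1_times, c2_time) do
    Enum.sum(c1_times) + length(c1_times) * c2_time
  end

  # Hypothetical helper: finish time when computation 1 runs back to back in
  # one process and computation 2 runs in another, one item at a time.
  def pipelined(c1_times, c2_time) do
    {_c1_done, c2_done} =
      Enum.reduce(c1_times, {0, 0}, fn c1, {c1_done, c2_done} ->
        c1_done = c1_done + c1
        # Computation 2 starts once its input is ready and the previous
        # computation 2 has finished.
        {c1_done, max(c1_done, c2_done) + c2_time}
      end)

    c2_done
  end
end

PipelineEstimate.serial([2, 2, 2], 1)    # => 9
PipelineEstimate.pipelined([2, 2, 2], 1) # => 7
```

When computation 1 dominates, the pipelined time approaches the sum of computation 1 plus a single computation 2, so the gain is bounded by the total time of computation 2.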

Exactly.

Sorry I was in a hurry, I should not have posted just that :slight_smile:

I created this topic to discuss a general common pattern of concurrency between two streams, and not to discuss my specific case.

As I have written, in my case computation 2 is steady, it has constant time. Computation 1 is variable, sometimes it is faster than c2, sometimes it takes more time, way more time.

The sum of all computation 2 is shorter than the sum of all computation 1.

Computation 2 is not instant though, it takes time too.

Say computation 2 is 1 second, computation 1 ranges from 0.5 to 10 seconds.

I felt the need to not wait for computation 2 before starting the next computation 1, which is why I asked whether a generic pattern or a concurrency handling function is already available.

I have around 500 items, which means I’ll have 500 seconds of computation 2. This is quite long so I wanted to run computations 1 in a greedy fashion, but only one at a time because of high memory consumption.
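
For reference, a minimal sketch of what I mean, assuming plain processes and message passing: one producer process runs computation 1 greedily, one item at a time, while the calling process applies computation 2 to the results as they arrive. `GreedyPipeline` and its arguments are hypothetical names, not an existing API.

```elixir
defmodule GreedyPipeline do
  # Hypothetical helper: `computation_1` and `computation_2` are passed in as
  # one-argument functions.
  def run(enumerable, computation_1, computation_2) do
    parent = self()
    ref = make_ref()

    # Single producer: computation 1 never waits for computation 2.
    producer =
      Task.async(fn ->
        Enum.each(enumerable, fn item ->
          send(parent, {ref, {:item, computation_1.(item)}})
        end)

        send(parent, {ref, :done})
      end)

    results = collect(ref, computation_2, [])
    Task.await(producer, :infinity)
    results
  end

  # Messages from a single sender arrive in order, so order is preserved.
  defp collect(ref, computation_2, acc) do
    receive do
      {^ref, {:item, value}} -> collect(ref, computation_2, [computation_2.(value) | acc])
      {^ref, :done} -> Enum.reverse(acc)
    end
  end
end

GreedyPipeline.run(1..3, &(&1 + 1), &(&1 * 2)) # => [4, 6, 8]
```

The caveat is that there is no backpressure: if computation 2 falls behind, finished results of computation 1 pile up in the caller's mailbox.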

Given that streams can have varying runtime characteristics, the common / general pattern you are looking for either does not exist, or it exists in libraries, which you are somewhat surprisingly adamant about not using.

Given your use case, and if you never want to lose a single job, Flow might not be your best fit; Oban seems better because:

  1. Your first stream’s jobs are not instant (you said 0.5 to 10 seconds), and it would be a waste to do the calculation but lose it because the runtime crashed or was restarted, e.g. during deployment.
  2. You don’t want to overload the machine that executes the jobs from the second stream and want them executed one by one – but again, jobs that are “enqueued” should not be lost (IMO anyway) so again, Oban seems like a good fit: you make this second job queue with maximum concurrency of one job at a time and let the system happily chug along until all jobs are completed.
  3. You also get retries in case of intermittent failures.

That’s what I would recommend for production usage.
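
As a rough sketch of that setup, assuming Oban is added as a dependency with an Ecto repo behind it: a queue limited to one job at a time, plus a worker with retries. `:my_app`, `MyApp.Repo`, `MyApp.Computation2Worker`, `MyApp.Computations` and the `"item"` argument are placeholder names.

```elixir
# config/config.exs: a dedicated queue with a concurrency limit of 1.
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [computation_2: 1]

# lib/my_app/computation_2_worker.ex
defmodule MyApp.Computation2Worker do
  # max_attempts gives retries on intermittent failures.
  use Oban.Worker, queue: :computation_2, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"item" => item}}) do
    # Placeholder for the real computation 2.
    MyApp.Computations.computation_2(item)
    :ok
  end
end

# Enqueue one job per result of computation 1:
# %{item: result} |> MyApp.Computation2Worker.new() |> Oban.insert()
```

Job arguments are stored as JSON, so the results of computation 1 would need to be serializable for this to work.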

Maybe it’s actually possible to use a single GenServer as a “sink” (and not introduce another library or DATABASE :slight_smile: ). This would be a single process and not a process-per element, and would reduce the overhead by quite a bit. Since computation_2 isn’t the most intense the GenServer sink can handle both computations and incoming requests. I haven’t tested this yet, I can try it later today or tomorrow but it’d be in the form:

defmodule ComputationSink do
  use GenServer

  def init(_) do
    {:ok,
     %{
       # This is who the final result will be sent to.
       caller: nil,
       inputs: [],
       results: []
     }}
  end

  # Here we enqueue more inputs (appended, so they are processed in arrival
  # order) and kick off processing right away.
  def handle_cast({:put_element, element}, state) do
    send(self(), :compute)
    {:noreply, %{state | inputs: state.inputs ++ [element]}}
  end

  # This is the final call. We store the caller and trigger another compute to
  # process the remaining elements; the reply is sent later.
  def handle_call(:get_results, from, state) do
    send(self(), :compute)
    {:noreply, %{state | caller: from}}
  end

  # If the caller is nil this means that the inputs have not fully been sent,
  # and that there could be more inputs to compute.
  def handle_info(:compute, %{inputs: [], caller: nil} = state) do
    {:noreply, state}
  end

  # Handle the exhausted inputs.
  # Forward the stored results to the process that initially
  # made the call. "Recursive base case"
  def handle_info(:compute, %{inputs: [], results: results, caller: caller} = state)
      when caller != nil do
    # Results were prepended, so reverse them to restore the input order.
    GenServer.reply(caller, Enum.reverse(results))
    {:noreply, %{state | caller: nil, results: []}}
  end

  # Otherwise apply the computation and self-send a message to continue with processing.
  # This creates a blocking interface with non-blocking internals.
  def handle_info(:compute, state) do
    [current_element | rest] = state.inputs
    send(self(), :compute)

    {:noreply,
     %{state | results: [computation_2(current_element) | state.results], inputs: rest}}
  end
end

And then we can use this interface here:

{:ok, pid_sink} = GenServer.start_link(ComputationSink, [])

1..1000
|> Stream.map(&computation_1/1)
|> Stream.each(fn element ->
  GenServer.cast(pid_sink, {:put_element, element})
end)
|> Stream.run()

# Blocking call even though the GenServer is still processing elements c:
# The default 5 second call timeout is likely too short here, hence :infinity.
results = GenServer.call(pid_sink, :get_results, :infinity)

This is actually my absolute favorite pattern in Erlang / Elixir and I wrote about it here: My Favorite Erlang Pattern

EDIT: you could also turn the cast into a call and reply slowly or quickly depending on the size of the inputs; this will be your backpressure mechanism. You just need to store the pid of the process attempting to enqueue, much like how it is done for the pid of the process requesting the result. This would sadly affect the rate at which computation_1s can run.
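
A sketch of that call-based variant, untested against the real workload and assuming a single producer process: the sink holds back its reply to `put/2` while its buffer is full, which blocks the producer. `BoundedSink`, `put/2`, `results/1` and the `max` buffer size are names invented for this example, and `fun` stands in for computation_2.

```elixir
defmodule BoundedSink do
  use GenServer

  def start_link(fun, max), do: GenServer.start_link(__MODULE__, {fun, max})
  def put(pid, element), do: GenServer.call(pid, {:put, element}, :infinity)
  def results(pid), do: GenServer.call(pid, :results, :infinity)

  @impl true
  def init({fun, max}) do
    {:ok, %{fun: fun, max: max, queue: :queue.new(), blocked: nil, caller: nil, results: []}}
  end

  @impl true
  def handle_call({:put, element}, from, state) do
    # One :compute message per enqueued element.
    send(self(), :compute)
    state = %{state | queue: :queue.in(element, state.queue)}

    if :queue.len(state.queue) < state.max do
      {:reply, :ok, state}
    else
      # Buffer full: delay the reply so the producer waits.
      {:noreply, %{state | blocked: from}}
    end
  end

  def handle_call(:results, from, state) do
    send(self(), :compute)
    {:noreply, %{state | caller: from}}
  end

  @impl true
  def handle_info(:compute, state) do
    case :queue.out(state.queue) do
      {{:value, element}, rest} ->
        state = %{state | queue: rest, results: [state.fun.(element) | state.results]}
        {:noreply, unblock(state)}

      {:empty, _queue} ->
        {:noreply, maybe_reply(state)}
    end
  end

  # Release a producer that was waiting for room in the buffer.
  defp unblock(%{blocked: nil} = state), do: state

  defp unblock(%{blocked: from} = state) do
    GenServer.reply(from, :ok)
    %{state | blocked: nil}
  end

  # Answer the pending :results call once everything has been processed.
  defp maybe_reply(%{caller: nil} = state), do: state

  defp maybe_reply(%{caller: caller} = state) do
    GenServer.reply(caller, Enum.reverse(state.results))
    %{state | caller: nil, results: []}
  end
end
```

Usage would look like `{:ok, pid} = BoundedSink.start_link(&computation_2/1, 10)`, then `BoundedSink.put(pid, element)` inside the stream and `BoundedSink.results(pid)` at the end. Because the sink is a single process, a `put/2` call also waits while an element is being computed, which is the slowdown mentioned above.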
