Is there an idiomatic way to write greedy streams in Elixir?

Hello,

Is there an idiomatic way to write greedy streams in Elixir?

Given this example:

1..1000
|> Stream.map(&computation_1/1)
|> Stream.map(&computation_2/1)
|> ...

If I understand streams correctly, this means that for each item computation_1 runs, then computation_2, and then we move on to the next item.
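That per-item interleaving is easy to observe with a small self-contained sketch (the `log` helper and the `:c1`/`:c2` tags are hypothetical, just to record call order):

```elixir
# log is a helper that records which stage saw which item, then
# passes the value through unchanged.
log = fn tag, x ->
  send(self(), {tag, x})
  x
end

1..3
|> Stream.map(&log.(:c1, &1))
|> Stream.map(&log.(:c2, &1))
|> Stream.run()

# Drain the six recorded messages in arrival order.
order =
  for _ <- 1..6 do
    receive do
      msg -> msg
    end
  end

IO.inspect(order)
# [c1: 1, c2: 1, c1: 2, c2: 2, c1: 3, c2: 3]
```

Each item is pulled all the way through both stages before the next item starts, which is exactly the lock-step behavior described above.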

What I would like is for computation_1 to be run as fast as possible without needing to be pulled from downstream. This is useful when computation_1 has variable times and computation_2 has steady times.

I could use Enum:

1..1000
|> Enum.map(&computation_1/1)
|> Stream.map(&computation_2/1)
|> ...

This is very greedy, but it blocks computation_2. It will not start until all computation_1 calls have been made.

So, is there a well-known pattern somewhere for that? In my specific case I’d rather not resort to async_stream because of memory consumption.

Edit: hmmm, while proofreading my post I realize that what I want is essentially concurrent; there is no single-process solution. So I guess I can just run the top stream in a process, send all results to a second process, and receive them as a stream from there.

3 Likes

I think Task.async_stream/3 might be something you could use.

NVM, saw you mentioning it as not the optimal solution.

1 Like

A nice compromise solution – or a good prototype – would be to insert Stream.chunk_every(1000) in the middle and see if it helps.
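For illustration, the chunked pipeline could look like the sketch below. The chunk size of 100 and the stand-in computations are assumptions, not from the thread; the point is that Enum.map inside each chunk runs computation_1 eagerly in batches, smoothing out its variable timing:

```elixir
# Hypothetical stand-ins for the thread's computations.
computation_1 = fn x -> x * 2 end
computation_2 = fn x -> x + 1 end

result =
  1..1000
  |> Stream.chunk_every(100)
  # Each chunk is a list, so Enum.map runs computation_1 eagerly on
  # 100 items at a time; flat_map re-flattens into a stream of items.
  |> Stream.flat_map(fn chunk -> Enum.map(chunk, computation_1) end)
  |> Stream.map(computation_2)
  |> Enum.to_list()

result == Enum.map(1..1000, fn x -> x * 2 + 1 end)
# true
```

This trades a bounded amount of buffering (one chunk) for less lock-step pulling, without any extra processes.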

2 Likes

If your real-world processing gets more advanced, I think you could also take a look at GenStage; it will let you build a much more readable and configurable processing pipeline.

2 Likes

Not optimal, but definitely an improvement in some cases. If I have one slow computation for every 10 computations, then a chunk size around 10 should help.

@D4no0 yes, that’s the kind of two-process setup I was talking about in my edit. I’m not sure pulling in that library would always be desirable, but if it is, it would make for a simple-to-write solution!

1 Like

I agree it’s not optimal; I simply don’t know anything about your problem. It seems like something that would accelerate computation_2 a bit.

If we’re talking truly optimal, I’d just have a Kafka / NATS queue, send the results of the first computation to it, and have computation_2 pull from that queue with the maximum speed and parallelism possible, i.e. you would code a solution that’s hard-bottlenecked on the speed with which items from computation_1 can be emitted.

I think it depends on the use case, but I would highly recommend reading the documentation, as it should contain the list of things it solves compared to self-rolled processing pipelines.

1 Like

I simply don’t know anything about your problem

Oh I know, I meant it was actually a simple and good enough solution. Simplicity is a virtue :slight_smile:

My current use case is generating an ex_unit test from external resources (computation_1) and syncing that file to disk or an external API.

This is not something where you want external services such as Kafka.

Honestly, there is not much to optimize here; I created this topic for the general case. We could reduce the scope a little. I think the real question boils down to: given two steps in a stream chain, how can we run both steps in parallel? While computation_2 runs for item i, how do we make computation_1 run for item i+1?

1 Like

Then a very normal task pool, whether a library or homegrown, will do the job just fine?

Yeah, I’m running homegrown :smiley: One process runs the top stream and sends every result to the parent process.

The parent process starts its stream by looping over receive.

You can do even better than that. If you use e.g. opq or poolex, you can distribute the results of computation_1 to a job pool, and then it will run a job for each completed computation – in parallel.

The way you’re describing your usage, the distribution of tasks for downstream processing is serial (a single process running receive in a loop). It can be made parallel.

EDIT: you can probably even use Phoenix.PubSub. :thinking:

2 Likes

Check out Flow. It provides a way to do this sort of “staged”, multi-process streaming. Flow — Flow v1.2.4

9 Likes

Thank you :slight_smile:

I was more searching for a way to do that with the Stream module. I’m not ready to pull in a library just for this task.

But I guess this is not natively supported.

That library is released by the Elixir core team to cover exactly your use case; it’s the middle layer between Stream and GenStage. I completely forgot about it.

1 Like

If I remember correctly it is even built on top of GenStage!

1 Like

Damn, you are right!

For some reason I always thought that Flow was released before GenStage.

I think this task is a lot more difficult than you’re saying. The main challenges are around managing buffers and minimizing stalls. If you do it as you’re describing, you’ll OOM with large inputs, and you’ll also limit yourself to a single core while it works on that phase.

There was extensive discussion around the use of Stream for this purpose around 2015 (it was actually the subject of my first ElixirConf talk), and ultimately Flow was the answer. As @D4no0 notes, Flow is less “a library” and more of an “external stdlib”; I definitely recommend it for this purpose.

4 Likes

If you are adamant about hand-rolling this for your needs, then you’ll very likely reinvent Flow at some point. :person_shrugging:

But if you have the time and the will then it might be a precious learning experience.

2 Likes

Well, I just need the first stream to be run as quickly as possible, so it’s actually very simple and boils down to this:

parent = self()

# Producer: run the first stream greedily and push every result
# to the parent's mailbox.
spawn_link(fn ->
  source_of_items()
  |> Stream.map(&computation_1/1)
  |> Enum.each(fn result ->
    send(parent, {:result, result})
  end)

  send(parent, :done)
end)

# Consumer: rebuild a stream from the mailbox and run the second step.
Stream.unfold(nil, fn _ ->
  receive do
    {:result, result} -> {result, nil}
    :done -> nil
  end
end)
|> Stream.map(&computation_2/1)
|> Stream.run()

The first stream isn’t doing anything for you; you could simply Enum.each over the source, call the computation, and send the result, for a bit less overhead.
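That simplification might look like the following sketch (the source range and computation are hypothetical stand-ins):

```elixir
# Hypothetical stand-in for the thread's first computation.
computation_1 = fn x -> x * 2 end
parent = self()

spawn_link(fn ->
  # No intermediate stream needed: compute and send directly.
  Enum.each(1..5, fn item ->
    send(parent, {:result, computation_1.(item)})
  end)

  send(parent, :done)
end)

results =
  Stream.unfold(nil, fn _ ->
    receive do
      {:result, result} -> {result, nil}
      :done -> nil
    end
  end)
  |> Enum.to_list()

IO.inspect(results)
# [2, 4, 6, 8, 10]
```

The consumer side is unchanged; only the producer loses the redundant Stream.map layer.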

And actually, looking at it more closely, this seems like a less optimal version of Task.async_stream. Particularly if you use the ordered: false option, it’s going to do the same thing (compute operation 1 in other processes) but without the memory bloat of doing it all ahead of time, whether or not operation 2 can consume it.
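As a sketch of that suggestion (the range, the stand-in computations, and the concurrency of 4 are assumptions):

```elixir
# Hypothetical stand-ins for the thread's computations.
computation_1 = fn x -> x * 2 end
computation_2 = fn x -> x + 1 end

results =
  1..100
  # Run computation_1 in a bounded pool of processes; ordered: false
  # hands results downstream as soon as each one finishes, so demand
  # from computation_2 caps how far computation_1 runs ahead.
  |> Task.async_stream(computation_1, ordered: false, max_concurrency: 4)
  |> Stream.map(fn {:ok, result} -> computation_2.(result) end)
  |> Enum.sort()

results == Enum.map(1..100, fn x -> x * 2 + 1 end)
# true (sorted, since ordered: false may emit out of order)
```

Unlike the spawn_link version, back-pressure is built in: the mailbox cannot grow without bound because Task.async_stream only keeps max_concurrency items in flight.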

4 Likes