If I understand streams correctly, this means that for each item we run computation_1, then computation_2, and only then move on to the next item.
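For reference, a minimal sketch of the pipeline shape I mean (`items`, `computation_1/1`, and `computation_2/1` are placeholders):

```elixir
items
|> Stream.map(&computation_1/1)
|> Stream.map(&computation_2/1)
|> Stream.run()

# Lazy evaluation: each item flows through computation_1 and then
# computation_2 before the next item is even pulled, so the two
# steps never overlap.
```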
What I would like is for computation_1 to be run as fast as possible without needing to be pulled from downstream. This is useful when computation_1 has variable times and computation_2 has steady times.
This is very greedy, and it blocks computation_2: it will not start until all the computation_1 calls have finished.
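The greedy variant presumably looks something like this (a guess on my part, since the snippet isn't quoted here): `Enum.map/2` is eager, so every `computation_1` call completes before `computation_2` sees a single item:

```elixir
items
|> Enum.map(&computation_1/1)   # eager: runs for ALL items first
|> Enum.each(&computation_2/1)  # only starts once the list above is complete
```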
So, is there a well-known pattern somewhere for that? In my specific case I’d like not to resort to async_stream because of memory consumption.
Edit: hmmm, while proofreading my post I realize that what I want is essentially concurrency; there is no solution within a single process. So I guess I can just run the top stream in one process, send all results to a second process, and receive them in a stream there.
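A minimal sketch of that two-process idea (assuming `items` is a list, so the receiving stream knows how many results to expect):

```elixir
parent = self()

# Producer: runs computation_1 as fast as it can, with no backpressure;
# the parent's mailbox acts as an unbounded buffer.
spawn_link(fn ->
  Enum.each(items, fn item -> send(parent, {:result, computation_1(item)}) end)
end)

# Consumer: receive the results as a stream and feed computation_2.
Stream.repeatedly(fn ->
  receive do
    {:result, value} -> value
  end
end)
|> Stream.take(length(items))
|> Stream.map(&computation_2/1)
|> Stream.run()
```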
If your real-world processing gets more advanced, I think you could also take a look at GenStage; it will let you build a much more readable and configurable processing pipeline.
Not optimal, but definitely an improvement in some cases. If I have one slow computation for every 10 computations, then a chunk size around 10 should help.
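For example, something along these lines (a rough sketch of the chunking idea; the chunk size and knob values are placeholders):

```elixir
items
|> Stream.chunk_every(10)
|> Task.async_stream(fn chunk -> Enum.map(chunk, &computation_1/1) end,
  # 2 keeps roughly one chunk's computation_1 in flight while the
  # previous chunk is being consumed by computation_2.
  max_concurrency: 2,
  timeout: :infinity
)
|> Stream.flat_map(fn {:ok, results} -> results end)
|> Stream.map(&computation_2/1)
|> Stream.run()
```

One slow computation_1 call per chunk gets amortized across the chunk instead of stalling every single item.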
@D4no0 yes, that’s the kind of two-process setup I was talking about in my edit. Not sure pulling in that library would always be desirable, but if it is, it would make for a simple-to-write solution!
I agree it’s not optimal; I simply don’t know anything about your problem. It seems like something that would accelerate computation_2 a bit.
If we’re talking truly optimal, I’d just have a Kafka / NATS queue: send the results of the first computation to it, and have computation_2 pull from that queue with the maximum speed and parallelism possible, i.e. you would code a solution that’s hard-bottlenecked only on the speed with which items from computation_1 can be emitted.
I think it depends on the use case, but I would highly recommend reading the documentation, as it should list what it solves compared to hand-rolled processing pipelines.
Oh I know, I meant it was actually a simple and good-enough solution. Simplicity is a virtue.
My current use case is generating an ex_unit test from external resources (computation_1) and syncing that file to disk or an external API.
This is not something where you want external services such as Kafka.
Honestly, there is not much to optimize here; I created this topic for the general case. We could reduce the scope a little. I think the real question boils down to: given two steps in a stream chain, how can we run both steps in parallel? While computation_2 runs for item i, how do we make computation_1 run for item i+1?
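For that reduced question, here is a hedged sketch of a one-item lookahead (my own construction, not a library function; it needs Elixir 1.14+ for `Stream.transform/5`). It keeps exactly one `computation_1` task in flight ahead of the consumer:

```elixir
defmodule Prefetch do
  @doc """
  Runs `fun` one item ahead of the consumer: while downstream works on
  the result for item i, `fun` is already running for item i+1.
  """
  def one_ahead(enum, fun) do
    Stream.transform(
      enum,
      fn -> nil end,
      fn item, in_flight ->
        # Start fun for the current item immediately...
        task = Task.async(fn -> fun.(item) end)

        case in_flight do
          # ...on the first item there is nothing to emit yet.
          nil -> {[], task}
          # ...otherwise emit the previous result while `task` keeps running.
          prev -> {[Task.await(prev, :infinity)], task}
        end
      end,
      # Input exhausted: flush the last in-flight task.
      fn
        nil -> {[], nil}
        prev -> {[Task.await(prev, :infinity)], nil}
      end,
      fn _ -> :ok end
    )
  end
end

# items |> Prefetch.one_ahead(&computation_1/1) |> Stream.map(&computation_2/1) |> Stream.run()
```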
You can do even better than that. If you use e.g. opq or poolex, you can distribute the results of computation_1 to a job pool, which will then run a job for each completed computation, in parallel.
The way you’re describing your usage, the distribution of tasks for downstream processing is serial (a single process running receive in a loop). And it can be made parallel.
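As a stand-in sketch (using `Task.Supervisor` from the stdlib rather than opq/poolex, whose exact APIs I won’t reproduce from memory): the receive loop stays serial, but each result is handed off to a supervised task, so `computation_2` runs in parallel across results:

```elixir
defmodule Dispatcher do
  # Fans each received computation_1 result out to a supervised task.
  # Note: start_child returns {:error, :max_children} when the pool is
  # full, so a real version needs to handle that backpressure case.
  def loop(sup, fun) do
    receive do
      {:result, value} ->
        Task.Supervisor.start_child(sup, fn -> fun.(value) end)
        loop(sup, fun)

      :done ->
        :ok
    end
  end
end

# {:ok, sup} = Task.Supervisor.start_link(max_children: 10)
# Dispatcher.loop(sup, &computation_2/1)
```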
That library is released by the Elixir core team to cover exactly your use case; it’s the middle ground between Stream and GenStage. I completely forgot about it.
I think this task is a lot more difficult than you’re making it sound. The main challenges are managing buffering and minimizing stalls. If you do it as you’re describing, you’ll OOM with large inputs, and you’ll also limit yourself to a single core while it works on that phase.
There was extensive discussion around the use of Stream for this purpose back around 2015 (it was actually the subject of my first ElixirConf talk), and ultimately Flow was the answer. As @D4no0 notes, Flow is less “a library” and more of an “external stdlib”; I definitely recommend it for this purpose.
The first stream isn’t doing anything; you could simply Enum.each over the items, call the computation, and send the result, for a bit less overhead.
And actually, looking at it more closely, this seems like a less optimal version of Task.async_stream. Particularly if you use the ordered: false option, that’s going to do the same thing (compute computation_1 in other processes) but without the memory bloat of doing it all ahead of time whether or not computation_2 can consume it.
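A sketch of that suggestion, with assumed knob values; the key point is that memory is bounded by `max_concurrency` in-flight items instead of the whole input:

```elixir
items
|> Task.async_stream(&computation_1/1,
  ordered: false,                               # emit results as they finish
  max_concurrency: System.schedulers_online(),  # bounded number in flight
  timeout: :infinity
)
|> Stream.map(fn {:ok, result} -> computation_2(result) end)
|> Stream.run()
```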