I’d like to be able to define compute pipelines that can be modified at run time. Is this possible using Genstage? I’ve already posted on Stack Overflow, but the idea basically is to be able to, say, switch one producer-consumer out for another one during runtime. I don’t know how this would be done?
I have added a bounty to this question (350 points)
How much control do you want over the switching over ?
If you define your GenServer def MyServer.myfunc(x), do: x - 7 end
and load a new module with def MyServer.myfunc(x), do: x - 4 end
dynamically at runtime BEAM is smart enough to run all the new inputs thru the new function automatically. This is a function of hot-code loading.
You could also use Kernel.apply/3
reading module and function_name from a cache.
May I ask what you are trying to achieve? Because at first glance it seems you are using GenStage pipelines to model code-based concerns while stages should be added when you need to model runtime concerns (such as failures and/or back-pressure).
In other words, I would solve this problem by having a stage that knows how to call subtract 7
or subtract 4
accordingly, without touching the stage organization. In the same way any code you write may swap between two function calls by using conditionals.
There is a chance all of your stages above could be a simple stage.
I am a quant analyst in finance. We deal with big firehose of raw data, that needs progressively to be “dimensionally reduced” to make sense in graphical ways. IE we need to visualize the data for the small amount of “gold nuggets” in the sand.
There are various ways of doing this, but most involved some dimensionality reduction followed by machine learning. Yet all of these techniques have multiple different, sometimes optional, stages and layers and being finance, with large amounts of money involved, clients and sales want to know how the algorithms work. “What happens if you do this?”, “did you use the correlation matrix or the covariance matrix?”, “what is the relationship to the S&P500?”, “Can you remove this one sector from the analysis?”, “Can I see what this looks like for Brazil Bovespa input instead of Dow” Etc etc etc.
So we are exploring Elixir for building a platform where users can graphically build their own calculation and visualization pipelines. And these they will he able to “what if” by plugging in different algorithms in real time and see how this changes the results. We are somewhat inspired by luna-lang https://www.luna-lang.org/.
We like Elixir because even if we won’t do heavy calcs in it, we can easily call out to Python/Numpy/Torch/Dask/Ray etc, but more importantly it gives us ways to scale to lots of clients, all the while managing quite a lot of complexity, and we love Phoenix channels for getting all of this stuff to and from the browser or whichever end client.
So basically, while I see the “anti-pattern” of one algo per genstage, I do want to go this way because it is a requirement of the system that analysts and clients in a scalable and fine grained way be able to modify big pipelines, the way they want, in real time.
So basically, I provide them with a bunch of algo/visualization lego blocks and they can build their own analysis. Keep in mind that finance is high pressure and decisions can change minute-by-minute which is why we’d like this stuff to be real time live. Even a few minutes pause for a re-compilation of a static pipeline might be too much.
Maybe you can write a simple “stage processor” which would receive a data structure like
[
{:add, amount},
{:times, amount},
{:subtract, amount}
...
]
and then run it through an “interpreter”?
def interpret({:add, amount}, acc) do
acc + amount
end
# etc
Then in your processes (like phoenix channels) you’d just keep a reference to the data structure of instructions and re-interpret it on each modification. You can add a diffing algo and reuse unchanged computations as well probably.
as per my question, we may have very complex pipelines with numerous branches. They will always be acyclic, but they may well have many many child nodes per parent node in some cases. This makes the above workflow unworkable I think. Also we may need nesting, and we may very well need parallelism, and finally in the above, we would lose the ability to “share” nodes between trees in the background (mutliple clients might actually not need to recompute the same thing if they are all doing similar things).
That said, a certain measure of this pattern will almost certainy be used inside nodes, for certain stages. We don’t wish every single tiny line of code in an algo to be a separate stage. We want to break it down into logical boundaries of real-time composability, and also, into areas where node sharing and parallellism might be important.
as per my question, we may have very complex pipelines with numerous branches. They will always be acyclic, but they may well have many many child nodes per parent node in some cases.
Then you should probably post a more “real-world” example rather than the one in OP.
More broadly, perhaps I can offer the following motivation:
There are numerous GenStage-like technologies out there. Kafka streams, Apache Flink, Python Dask, most recently and interestingly, Apache BEAM backed by Google. All basically handle the back pressure issue that GenStage does, and often offer more.
All do DAG workflows. None do runtime dynamic workflows.
It seems to me clear that Elixir is ideally suited, given hot code loading, supervisor trees. multinode scaling, and cheap preemptive processes (unlike anybody else who are mainly cooperative and, when preemptive, relying on expensive Linux processes, so we can eliminate Golang or Akka or any jvm tech) to offer a massive comparative advantage on an otherwise crowded space, uniquely to offer runtime-mutable workflow trees.
Certainly my finance use case would find this to be an enormous advantage. I am almost certain that other domains would similarly find this to be a killer feature. And my intuition is that this maps much more easily to BEAM than to any other VM or runtime.
Thanks! This sounds a very interesting problem. I would still like to challenge you to think about this problem first functionally and then introduce processes. Most of the ability to modify the computations live can happen at the module/function level. For example, you mentioned hot code swapping, and that’s the level it operates. I believe if you focus first on the functional problem and then introduce the stages, the final solution may be simpler.
That’s also what Flow does. It abstract the computation from the stage placement so it can find the best process placement based on the operations you describe. In fact, Flow does its best to skip introduce stages, and does so only when necessary (for example because you need to partition data to be processed always by the same stage).
Functional incremental computations may be related to what you are trying to do. This video here gives a pretty clear conceptual overview of how to architect something in a functional language.
It’s something I would be interested in exploring. As @josevalim suggests, it appears that the building blocks are plain functional patterns. Once those are well defined, a process model can be layered on top.
In a way the computation graph is like a spreadsheet. You have some inputs that are run through a series of functional transformations to yield an output. The initial calculation is just calling the functions. Then if the input changes by a little bit (value is added, removed, etc), there are ways to incrementally update the new value without recalculating everything.
In the contrived example below. If your function is to sum over a large list of numbers, reduce has an interesting property where it can pick up where it left off. If [1, 2, 3, 4]
are the initial values and [5, 6, 7]
are added to the list, I can incrementally get the next value instead of starting from scratch and summing [1, 2, 3, 4, 5, 6, 7]
.
current_value = Enum.reduce([1, 2, 3, 4], fn (a, b) ->
a + b
end)
next_value = Enum.reduce([5, 6, 7], current_value, fn (a, b) ->
a + b
end)
reduce
looks to be a building block for the Enumerable
protocol for Stream and Enum, so I wonder if there is a way to lean on existing patterns to implement something like this.
The stackoverflow question still has the same simple example which would work perfectly well with the approach I outlined above.
As @venkatd noted, you can also use the same approach as in incrementally if you need to preserve a topological ordering, but that as well wouldn’t require you to use processes / genstage (just add an extra “order” field to every tuple and track the current order in the interpreter function).
If you are looking for a more expressive (than spark) dataflow processing framework, check out naiad / timely, it allows for
loops (structured loops for feedback in the dataflow)! But since you mentioned that there won’t be any cycles in your graphs, that might be unnecessary, it still has a lot of nice ideas in it. And judging by timely’s implementation, I’d only introduce processes to track progress like timely does with timestamps, so since timely does that only to support cyclic graph processing, I’d probably avoid using processes / genstage at all (for DAGs).
I agree with others that starting functional and then determining the stages later on would be a good approach. You might find out that it’s not worth running every compuational step in a separate process.
That said, based on your description I can see at least a theoretical possibility for dynamically (re)building the pipeline. I have zero experience with GenStage, but it looks like choosing the consumer at runtime might be possible with a custom dispatcher. Another option is to rebuild the pipeline (or some parts of it). Whenever the new pipeline configuration is received, you could stop the obsolete consumers, start the new ones, and subscribe them to the desired producers.
In the worst case, you could always roll your own implementation with GenServers. This would give you full runtime flexibility to achieve whatever you want. However, based on a quick glance at docs, it seems that GenStage provides enough configurability to make that possible.
Yes Yaron Minsky is an amazing guy. I have tried to figure out what he’s doing in the Incremental library, quickly. Probably I should take a deeper look.
I will probably be open sourcing the dynamic pipeline and when I have made some progress I will post a notice here.
Given this I would propose either a graph interpreter in Elixir or a graph2Elixir code transformer.
@josevalim @sasajuric @idi527 @tty @venkatd
Guys thank you for your valuable guidance on this. I think I will try something more “functional” first before seeing how to apply the GenStages, along the lines of @idi527 suggestions. I am thinking of open sourcing it so if I get somewhere worthwhile I will post here again.
Notwithstanding all the great advice, I’d still like to know how to cancel a GenStage subscription as I still seem to unable to get GenStage.cancel/3 to work. Would someone be able to show me an example based on the code my my Stack Overflow Question? How for example would I cancel the subscription from a1 to p? Or c to a1?
Please ignore my last message. I have (finally) figured out how to do it. Genstage.cancel({p, link1}, :whatever)