While exploring Oban I became aware of some of the risks involved in serialization, in particular serializing anonymous functions (see Risks of serializing structs and captured function). The related/belated follow-up thought that occurred to me was “is it safe to send streams to separate nodes for execution?” Streams are essentially captured functions, right? So isn’t there a risk that, if a stream is created on one node and then sent to another node for execution, it might fail to execute because the receiving node doesn’t have the same modules available, or has a slightly updated module, causing the function identifier to change? Or does this issue only come up if the thing is serialized?
My specific use case is data processing – in a nutshell, one process prepares streams and sends them to another process, where they are run with specific concurrency and rate-limiting boundaries enforced. It seems to work fine when running on a single node, but I wanted a reality check… could this all fall apart if two nodes were involved or a hot-code update were applied?
They can be, but they don’t need to be. Any Enumerable.t can be “a stream” and be used with the Stream API. Given your other explanations, though, I’d expect the streams you build to be a bunch of callbacks.
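To illustrate that point, here is a quick sketch: plain data structures that implement Enumerable work with the Stream API just as lazily-built streams do, no `Stream.*` constructor required.

```elixir
# A Range used with the Stream API
1..100 |> Stream.map(&(&1 * 2)) |> Enum.take(3)
#=> [2, 4, 6]

# A plain list used with the Stream API
[%{id: 1}, %{id: 2}] |> Stream.map(& &1.id) |> Enum.to_list()
#=> [1, 2]
```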
I’m not sure you can trust this boundary - the captured functions aren’t, IMO, the biggest risk; it’s what the functions reference that might not make the transition. For instance, things like ports or local PIDs won’t work.
When there are multiple nodes involved, is there a shared registry or something so that any node could look up a PID or a port?
Maybe related… would it be possible to write a function that could inspect a value to see whether or not it’s safe to ship across process boundaries? E.g. something that could detect ports/PIDs? (Actually… maybe something as simple as JSON encoding would be a decent check on this.) Or is this going against the grain?
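A rough sketch of that idea, for what it’s worth: walk the term and report anything that typically doesn’t survive crossing a node boundary or being serialized (PIDs, ports, references, functions). `ShipCheck` and its function names are made up for the example. JSON encoding would flag PIDs and ports too, but it also rejects values like tuples that transfer between nodes without issue.

```elixir
defmodule ShipCheck do
  # Returns true if no "node-local" values were found anywhere in the term.
  def safe_to_ship?(term), do: unsafe_parts(term) == []

  # Values that are tied to the local node / runtime instance.
  def unsafe_parts(t) when is_pid(t) or is_port(t) or is_reference(t) or is_function(t),
    do: [t]

  def unsafe_parts(term) when is_list(term),
    do: Enum.flat_map(term, &unsafe_parts/1)

  def unsafe_parts(term) when is_tuple(term),
    do: term |> Tuple.to_list() |> Enum.flat_map(&unsafe_parts/1)

  def unsafe_parts(%_{} = struct),
    do: struct |> Map.from_struct() |> Map.values() |> Enum.flat_map(&unsafe_parts/1)

  def unsafe_parts(term) when is_map(term),
    do: term |> Map.to_list() |> Enum.flat_map(&unsafe_parts/1)

  def unsafe_parts(_other), do: []
end

# ShipCheck.safe_to_ship?(%{id: 1, name: "x"})  #=> true
# ShipCheck.safe_to_ship?(%{owner: self()})     #=> false
```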
My general advice for this sort of thing is to use inter-node communication to coordinate work, but not do the work, if that makes sense. Each node should do its own work on the data it fetches, and not try to recruit other nodes to help with that within a given “unit” of work. If your overall processing pipeline has logical steps or chunks within it that produce artifacts, then you can sometimes have a node work on stuff up until a chunk is done, and then the next “step” could go in a queue or similar that gets run on another node. But within a given step it’s going to be both easier and generally faster to just do the work in one node.
Yes, there is a risk. If you send a started but unfinished stream to another node, the stream will start from the beginning.
There is no general rule; streams are unsafe even when they’re not distributed (e.g. when you exit the stream during traversal with something like throw, which is very common, and even the Elixir core team does this).
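For reference, this is the kind of “exit mid-traversal with throw” pattern being referred to; the traversal stops as soon as the value is found rather than running to the end:

```elixir
first_over_100 =
  try do
    1..1_000_000
    |> Stream.map(&(&1 * 3))
    |> Enum.each(fn x -> if x > 100, do: throw({:found, x}) end)

    :not_found
  catch
    {:found, x} -> x
  end

# first_over_100 == 102
```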
So I’d suggest using some other primitive, like a process. For example, you could create a proxy process that supports migration from node to node and proxies a GenStage producer.
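A hedged sketch of that proxy idea, using a plain GenServer rather than the GenStage producer suggested above (which would be the more robust choice): the stream never leaves this process, and consumers on any node only ever receive plain-list chunks. `StreamProxy` and `stream_to/3` are names made up for this example, and the server blocks while it streams, which a real implementation would want to avoid.

```elixir
defmodule StreamProxy do
  use GenServer

  def start_link(stream), do: GenServer.start_link(__MODULE__, stream)

  # Ask the proxy to push the stream's data to `consumer` (a PID that may
  # live on another node) as plain-list chunks.
  def stream_to(proxy, consumer, chunk_size \\ 100) do
    GenServer.cast(proxy, {:stream_to, consumer, chunk_size})
  end

  @impl true
  def init(stream), do: {:ok, stream}

  @impl true
  def handle_cast({:stream_to, consumer, chunk_size}, stream) do
    # Enumeration happens here, on the node that built the stream;
    # only the resulting data crosses the node boundary.
    stream
    |> Stream.chunk_every(chunk_size)
    |> Enum.each(&send(consumer, {:chunk, &1}))

    send(consumer, :done)
    {:noreply, stream}
  end
end
```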
Good question. The short answer for one use case is to adhere to DRY principles: e.g. wanting to reuse GenStage components that do the processing, especially when the processing needs to happen with shared rate-limiting/concurrency limits. For example, stream X does one set of API operations, stream Y does another set of operations against the same API, and they get processed in the same GenStage consumer because all operations share the same usage limit defined by that API.
It sounds like, if the execution flow is expected to be distributed across multiple nodes, it’s better to send messages with “simple” payloads, e.g. a plain list instead of a stream.
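Something like the following sketch, i.e. materialize the stream on the node that built it and only ship data. The node name `:worker@host` and the registered process name `DataWorker` are placeholders for this example.

```elixir
payload =
  1..100
  |> Stream.map(&(&1 * &1))
  |> Enum.to_list()           # evaluated here, before crossing nodes

send({DataWorker, :worker@host}, {:process, payload})
```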