Hello,
I want to connect my flow to a few different downstream flows. However, I’m unable to instantiate the downstream flows, because every Flow constructor requires knowledge of the upstream source (which must either be already materialized, for Flow.from_stages, or written as a GenStage module, for Flow.from_specs).
If there were a way to construct a non-materialized flow (something like Flow.new), then I could write:
file_parser = File.stream!(file) |> Flow.from_enumerable() # this is my existing flow
database_writer = Flow.new() |> Flow.filter(..) |> Flow.map(..) |> Flow.partition() |> Flow.reduce(..)
data_summarizer = Flow.new() |> Flow.flat_map(..) |> Flow.partition() |> Flow.reduce(..)
stats_collector = Flow.new() |> Flow.emit_and_reduce(..) |> Flow.on_trigger(..)
overall = file_parser |> Flow.through_flows(database_writer, data_summarizer, stats_collector)
In the overall flow, events coming out of the upstream flow would be copied into each of the downstream flows. Eventually, I want to connect the downstream flows to one another to form a more complex graph, like this:
file_parser = File.stream!(file) |> Flow.from_enumerable() # this is my existing flow
database_writer = Flow.new() |> Flow.filter(..) |> Flow.map(..) |> Flow.partition() |> Flow.reduce(..)
data_summarizer = Flow.new() |> Flow.flat_map(..) |> Flow.partition() |> Flow.reduce(..)
stats_collector = Flow.new() |> Flow.emit_and_reduce(..) |> Flow.on_trigger(..)
data_summarizer = data_summarizer |> Flow.into_flows(database_writer)
stats_collector = stats_collector |> Flow.into_flows(database_writer)
overall = file_parser |> Flow.through_flows(database_writer, data_summarizer, stats_collector)
Is this currently possible? If not, do I just need to create GenStage modules to implement this?
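For context, here is a rough sketch of the GenStage-based workaround I have in mind. It is only a sketch under several assumptions: the file name and the filter/map/reduce bodies are placeholders, the source is started with GenStage.from_enumerable using GenStage.BroadcastDispatcher so that every subscriber receives a copy of each event, and demand is held back with `demand: :accumulate` until both flows have been started. I have not checked how subscription timing or shutdown behave when the file ends, so please treat it as an illustration of the shape rather than a working solution.

```elixir
# Assumes the :gen_stage and :flow dependencies are available.
# "data.txt" is a placeholder path.
{:ok, source} =
  GenStage.from_enumerable(
    File.stream!("data.txt"),
    # Copy every event to all subscribers instead of splitting events among them.
    dispatcher: GenStage.BroadcastDispatcher,
    # Hold events back until the downstream flows below have been started.
    demand: :accumulate
  )

# Each downstream flow is built against the already-started source,
# which is exactly the coupling I would like to avoid.
database_writer =
  Flow.from_stages([source])
  |> Flow.filter(&(&1 != "\n"))
  # Placeholder transformation; a database write would go here.
  |> Flow.map(&String.trim/1)

data_summarizer =
  Flow.from_stages([source])
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    # Count occurrences of each word within the partition.
    Map.update(acc, word, 1, &(&1 + 1))
  end)

{:ok, _writer} = Flow.start_link(database_writer)
{:ok, _summarizer} = Flow.start_link(data_summarizer)

# Release the accumulated demand once both flows have been started.
GenStage.demand(source, :forward)
```

This covers only the first fan-out step. Connecting the downstream flows to one another (the second example above) is the part I do not see how to express without writing the intermediate stages by hand.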
Thanks for your consideration.
P.S. I also submitted a feature request for this at https://github.com/plataformatec/flow/issues/72