How to create a Flow from a non-materialized source?

Hello,

I want to connect my flow to a few different downstream flows. However, I can’t instantiate the downstream flows on their own, because every Flow constructor requires knowledge of the upstream source, which must be either already-materialized stages (Flow.from_stages) or GenStage modules given as child specs (Flow.from_specs).
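For example, both of the existing entry points need something concrete up front (producer_pid and MyProducer below are just stand-ins for an already-running stage and one of my GenStage modules):

flow_a = Flow.from_stages([producer_pid])     # subscribe to stages that are already running
flow_b = Flow.from_specs([{MyProducer, []}])  # start producers from GenStage child specs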

If there were a way to construct a non-materialized flow (something like Flow.new), then I could write:

file_parser = File.stream!(file) |> Flow.from_enumerable() # this is my existing flow

database_writer = Flow.new() |> Flow.filter(..) |> Flow.map(..) |> Flow.partition() |> Flow.reduce(..)
data_summarizer = Flow.new() |> Flow.flat_map(..) |> Flow.partition() |> Flow.reduce(..)
stats_collector = Flow.new() |> Flow.emit_and_reduce(..) |> Flow.on_trigger(..)

overall = file_parser |> Flow.through_flows(database_writer, data_summarizer, stats_collector)

In the overall flow, events coming out of the upstream flow are copied into each of the downstream flows. Eventually, I want to connect the downstream flows together into an even more complex graph, like this:

file_parser = File.stream!(file) |> Flow.from_enumerable() # this is my existing flow

database_writer = Flow.new() |> Flow.filter(..) |> Flow.map(..) |> Flow.partition() |> Flow.reduce(..)
data_summarizer = Flow.new() |> Flow.flat_map(..) |> Flow.partition() |> Flow.reduce(..)
stats_collector = Flow.new() |> Flow.emit_and_reduce(..) |> Flow.on_trigger(..)

data_summarizer = data_summarizer |> Flow.into_flows(database_writer)
stats_collector = stats_collector |> Flow.into_flows(database_writer)

overall = file_parser |> Flow.through_flows(database_writer, data_summarizer, stats_collector)

Is this currently possible? If not, do I just need to create GenStage modules to implement this?

Thanks for your consideration.

P.S. I also submitted a feature request for this at https://github.com/plataformatec/flow/issues/72

@josevalim has explained why this isn’t supported and has also suggested that I reorganize my flow to be unidirectional. :heart_eyes: In essence, I would have to perform two steps in each of my “downstream flows”:

  1. pass the original input through to the next “downstream flow”
  2. do the actual work for the “downstream flow” and emit its results

file_parser() |> data_summarizer() |> stats_collector() |> database_writer()

This way, all of the original output events from the file_parser make it all the way through the chain, and so do the additional events generated by each of my “downstream flows”. Everything ends up being written to the database by the database_writer “flow”. :ok_hand: Fantastic!
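To make that concrete, here is a minimal sketch of the pass-through idea. The tags (:line, :summary, :stat) and the bodies of summarize/1, measure/1, and write_to_db/1 are just placeholders for my real logic:

defmodule Pipeline do
  # Tag every raw line so later stages can tell original input
  # apart from derived events.
  def file_parser(file) do
    file
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(&{:line, &1})
  end

  # Step 1: pass the original {:line, _} events through untouched.
  # Step 2: do the real work and emit the result as a new event.
  def data_summarizer(flow) do
    Flow.flat_map(flow, fn
      {:line, line} -> [{:line, line}, {:summary, summarize(line)}]
      other -> [other]
    end)
  end

  def stats_collector(flow) do
    Flow.flat_map(flow, fn
      {:line, line} -> [{:line, line}, {:stat, measure(line)}]
      other -> [other]
    end)
  end

  # Everything, original and derived, arrives here and gets written.
  def database_writer(flow) do
    Flow.map(flow, &write_to_db/1)
  end

  defp summarize(line), do: byte_size(line)
  defp measure(line), do: String.length(line)
  defp write_to_db(event), do: IO.inspect(event)
end

"data.txt"
|> Pipeline.file_parser()
|> Pipeline.data_summarizer()
|> Pipeline.stats_collector()
|> Pipeline.database_writer()
|> Flow.run()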


Hey :grin:

I think I’m trying to do something similar to what you did (see my recent question). Each of your downstream flows transforms the events, so how can you pass the original input through? You mention “pass the original input through to the next ‘downstream flow’”, so what does that look like with your original example?

Also, if you look at my example, I’m using into_specs to create supervised children of the original flow process, which seems like it should do exactly what I (we? :grin:) want. However, I don’t know how to make a flow that doesn’t start with a producer (except by using from_stages, I guess, but that doesn’t seem to be working…)
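For reference, this is roughly what my example boils down to (MyConsumer is a trivial stand-in for my real consumer, and I may well be holding into_specs wrong):

defmodule MyConsumer do
  use GenStage

  def start_link(arg), do: GenStage.start_link(__MODULE__, arg)

  def init(arg), do: {:consumer, arg}

  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end

# into_specs starts the flow as its own process and supervises the
# given {child_spec, subscription_opts} consumers under it
{:ok, _pid} =
  File.stream!("data.txt")
  |> Flow.from_enumerable()
  |> Flow.into_specs([{{MyConsumer, []}, []}])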