Flow into/from GenStage

I’m currently playing around with Flow and GenStage and wanted to know if something like this is possible:

  def start_link(file) do
    File.stream!(file)
    |> Flow.from_enumerable()
    |> Flow.partition()
    |> Flow.into_stages([ProducerConsumer])
    |> elem(1)
    |> Flow.from_stage()
    # more Flow stuff like |> Flow.filter
    |> Flow.into_stages([Consumer])
  end

This code does NOT work of course. My idea behind this was to have a clean and easy to understand flow and a bunch of GenStage modules which do the hard and complicated stuff.

Something like this could work, but the devil, as usual, is in the details.

First of all, there are some issues with the flow above, but I assume that’s because you didn’t want to include too much detail. For example, I don’t think calling Flow.into_stages/2 immediately after Flow.partition/1 makes sense.

Also consider whether using Flow is really necessary. You’re already using GenStage for other tasks. While Flow is a simpler, higher-level way of coordinating multiple stages of consumers, you might find that it’s unnecessary overhead if you don’t need that kind of coordination.

I assume you want to supervise this, so you’ll need to split it into separate modules too. So instead of

  |> Flow.into_stages([ProducerConsumer])
  |> elem(1)
  |> Flow.from_stage()

you would have a supervisor with the following children:

  children = [
    ProducerConsumer,
    Consumer,
    {MyFlow1, {file, ProducerConsumer}},
    {MyFlow2, {ProducerConsumer, Consumer}}
  ]

Then MyFlow1 and MyFlow2 could be modules that satisfy a behaviour defining child_spec/1 and start_link/1, so that their main job would be to implement the flow-specific aspects.
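
A minimal sketch of what such a module might look like, assuming ProducerConsumer is already started and registered under its module name (the children list above starts it first). The restart: :transient setting is an assumption of this sketch, chosen so the supervisor does not re-read the file after the flow finishes normally. Mix.install is only there so the snippet can run as a standalone script; in a Mix project the dependency would go in mix.exs instead.

```elixir
# Assumption: the :flow package from Hex, version 1.x.
Mix.install([{:flow, "~> 1.0"}])

defmodule MyFlow1 do
  # Child spec so the module can be listed as {MyFlow1, {file, consumer}}.
  def child_spec(arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [arg]},
      # Assumption: do not restart once the file has been fully processed.
      restart: :transient
    }
  end

  # `consumer` is the pid or registered name of an already running stage.
  def start_link({file, consumer}) do
    file
    |> File.stream!()
    |> Flow.from_enumerable()
    # Flow.map/2, Flow.filter/2 and friends would go here.
    |> Flow.into_stages([consumer])
  end
end
```

MyFlow2 would follow the same shape, starting from the running ProducerConsumer instead of a file.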

We have a similar kind of setup and so far it has worked out well. Let me know if you need more help with the specifics of the behaviour and the flow modules.

I would also be interested in Flow getting better first class support for flows/graphs that incorporate external GenStages (or maybe just GenStage MFAs if control over materialization is necessary).

These days I end up manually composing the few custom GenStages that I need, plus a custom Enum GenStage that is a passthrough :producer_consumer + side effects.
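
A rough sketch of what a passthrough :producer_consumer with side effects could look like. The module name SideEffectStage and the idea of passing the side effect as a one-argument function are assumptions for illustration, not the poster's actual code.

```elixir
# Assumption: the :gen_stage package from Hex, version 1.x.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule SideEffectStage do
  use GenStage

  # `side_effect` is a one-argument function called once per event.
  def start_link(side_effect, opts \\ []) do
    GenStage.start_link(__MODULE__, side_effect, opts)
  end

  @impl true
  def init(side_effect) do
    {:producer_consumer, side_effect}
  end

  @impl true
  def handle_events(events, _from, side_effect) do
    Enum.each(events, side_effect)
    # Emit the events unchanged so downstream stages still receive them.
    {:noreply, events, side_effect}
  end
end
```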

The system works, but feels backwards. I’d much rather use a high level Flow/Graph DSL. And would welcome any enhancements to Flow that would make it easier to BYO GenStage in a complete execution graph (not just as flow source/sink).

Note: Some may argue that the need for custom GenStages ought to be rare. Maybe. But the need is still there. There are times you want/need an async boundary. There are times you want/need to deal with differences in flow rates. There are times you want/need to manage state and would rather do it in a GenServery way instead of a functional reducer way.

I’d love for Flow to incorporate more support for these types of data flows. The GenStage implementation of these things tends to then influence the DSL. More mature flow DSLs support the notion of explicit async boundaries, sub-flows, explicit buffers, timers and other rate control mechanisms, among other things. All of these can be implemented via a graph of GenStages. Optimizations can then be made to avoid extra stages and communication when unnecessary. For example, a series of Flow.map transformations could be made as a series of :producer_consumer GenStages, but usually it is more efficient to compose all those functions together. Flow makes that optimization already today.
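
The difference between one stage per transformation and a single composed function can be illustrated with plain Elixir. This is only an analogy for the optimization described above, not how Flow is implemented internally; the Fusion module and its fuse/1 function are hypothetical names.

```elixir
defmodule Fusion do
  # Compose a list of one-argument functions into a single function,
  # applied left to right.
  def fuse(funs) do
    fn value -> Enum.reduce(funs, value, fn f, acc -> f.(acc) end) end
  end
end

steps = [&(&1 + 1), &(&1 * 2), &Integer.to_string/1]

# One pass per step: Enum.map stands in for a separate :producer_consumer
# stage, so the whole batch is handed over three times.
staged = Enum.reduce(steps, [1, 2, 3], fn f, events -> Enum.map(events, f) end)

# Fused: a single pass applying one composed function to each event.
fused = Enum.map([1, 2, 3], Fusion.fuse(steps))

# Both approaches give ["4", "6", "8"].
true = staged == fused
```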

I got the following to run. However, @lackac, would you be so kind as to give me your opinion on it? (Still a GenStage/Flow novice here.)

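def start_link(file) do
  # First flow: read the file and feed ProducerConsumer, which must
  # already be running and registered under its module name.
  File.stream!(file)
  |> Flow.from_enumerable()
  # |> Flow.filter() ...
  |> Flow.into_stages([ProducerConsumer])

  # Second flow: use ProducerConsumer as the producer and feed Consumer.
  # Only this second result is returned from start_link/1.
  ProducerConsumer
  |> Flow.from_stage()
  # more Flow stuff like |> Flow.filter
  |> Flow.into_stages([Consumer])
end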

Yes, this should work. However, I am curious to know why you need to drop Flow in favor of GenStage?

If your ProducerConsumer is wrapping a Port, you’d need a GenStage. Or you want to write a GenStage that emits other GenStages (stream of streams). I do this for TCP connection processing within GenStage graphs. Or you need to support differences in rate (extrapolate, expand, conflate). Or explicit buffers. Plenty of reasons to need GenStages as part of your Flow.

@ream88 does it right by joining the flows via the GenStage, but this pattern isn’t obvious. When I was just starting, I thought it would be a good idea to simply pipe everything together to form a larger composite flow.

Source
|> Flow.from_stage()
#|> Flow.map() ...
|> Flow.into_stages([ProducerConsumer])
|> Flow.from_stage()
|> Flow.into_stages([ProducerConsumer2])
|> Flow.from_stage()
|> Flow.into_stages([Sink])

In fact, I don’t think this even worked like I hoped, because I had to unpack the pid from the Flow.into_stages return value.

While the pipe operator makes this look like a single flow with async components, it doesn’t behave that way. Demand doesn’t originate from the Sink and then propagate to the Source. Instead, multiple independent flows are materialized, activated, and then stitched together.

The problem with this is that internal subscription switching isn’t immediate, so there’s a good chance that GenStages will process some messages that are never visible to the rest of the flow because the downstream flow subscribed late. There may be other related gotchas with respect to flow behavior in the face of crashes and recovery.

In short, attempting to mix GenStages into a Flow to make a larger Flow just didn’t work like I’d hoped. I’d like Flow to be more like the Akka Streams DSL. Both in behavior and richness.

My idea was to reuse one of my GenStage producer-consumer inside another flow.

I think the Flow DSL could support GenStage chaining through a via function, which would help enable a single, larger flow materialization.

Hypothetically:

Flow.source([enumerable])
|> Flow.via(Module1) # term or {MFA}, etc, not pid
|> Flow.map()
|> Flow.via(Module2)
|> Flow.filter()
|> Flow.sink(Module3)
# |> Flow.run() # or Flow.run_into(Module3)

But sources and sinks are just a matter of perspective, and flows are just descriptions/blueprints until they’re materialized, so you could construct flows, copy the blueprints around, and put them together or reuse them later.

def businessWorkflowC() do
  businessWorkflowA()
  |> Flow.filter()
  |> Flow.via(businessWorkflowB()) # returns a partial linear blueprint
end
...
|> Flow.via(businessWorkflowC())
|> Flow.filter()
|> Flow.sink(collectable) # materialize here

Hopefully this is clear enough. Without types, it is hard to explain the notion of a Flow definition, partial FlowGraphs, Source shapes, Sink shapes, etc.
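
Part of the blueprint idea can be approximated with functions that take a flow and return a flow, since a Flow does nothing until it is enumerated or started. The sketch below uses only existing Flow functions (no hypothetical Flow.via); the Workflows module and its function names are made up for illustration, and it does not cover embedding custom GenStages.

```elixir
# Assumption: the :flow package from Hex, version 1.x.
Mix.install([{:flow, "~> 1.0"}])

defmodule Workflows do
  # Each function only extends the flow description; nothing runs yet.
  def workflow_a(flow), do: Flow.map(flow, &String.trim/1)
  def workflow_b(flow), do: Flow.filter(flow, &(&1 != ""))

  # A larger blueprint assembled from the smaller ones.
  def workflow_c(flow), do: flow |> workflow_a() |> workflow_b()
end

result =
  ["a\n", "\n", "b\n"]
  |> Flow.from_enumerable()
  |> Workflows.workflow_c()
  # Enumerating the flow materializes it; sort because stage output
  # order is not guaranteed.
  |> Enum.sort()

IO.inspect(result)
```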

I agree. Can you please open up an issue? I would call it Flow.into_producer_consumer/2 or similar. Should they already be started or should we start them as part of the flow?

I think you’ll want Flow.into_producer_consumer/2 to support both flow-started and externally started stages.

I think the most common scenario will be flow-started. You’d want the resources used by the flow to be managed by the flow.

But you’ll also want to support the case where this GenStage is shared across flows for whatever reason. In which case, you’ll need to accept an already started pid.

You need both.

The Flow started case could also be used to abuse the GenStage behavior and allow Flow to simply compose callbacks in the calling process, instead of requiring an async boundary.

This would allow developers to treat this as a component model without necessarily inheriting the async overhead. To really flesh this out you’d need to add further async() demarcations to the Flow DSL. But more things are possible if you aren’t a process already.

I think both start_producer_consumer and into_producer_consumer will be welcome then. We can probably rename from_stages to from_producers and support start_producers as well as into_consumers and start_consumers. What do you think? Can you open up an issue with this summary of this discussion?

I think I’d try and keep the function name the same and have the behavior be polymorphic on the 2nd parameter type.

I could see value in passing in:

  • some tuple (we’d have to think about the format but I think GenServers and Registries may already have a useful convention)
  • a pid
  • a Flow

I can file the issue. Do you want me to file now and continue the discussion on GitHub, or finish up the conversation here and then file the issue with the digest from the thread?

The issue is that the name in itself can be an atom (then we already conflict between module name or process_name), a tuple, etc. Plus, semantically they are all very different. One starts the processes under the same tree, the other does not. So I think they deserve different names. Let’s tidy this up on GitHub though.
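
The ambiguity can be made concrete with a small, purely illustrative helper. StageRef and interpretations/1 are hypothetical names, not part of Flow or GenStage; the sketch only lists the ways a single argument could be read if one function had to accept both running stages and stages to start.

```elixir
defmodule StageRef do
  # A pid can only mean an already running process.
  def interpretations(pid) when is_pid(pid), do: [:running_pid]

  # A bare atom may be a registered process name or a module to start.
  def interpretations(atom) when is_atom(atom) do
    [:registered_name, :module_to_start]
  end

  # GenServer-style global and via names.
  def interpretations({:global, _term}), do: [:global_name]
  def interpretations({:via, module, _term}) when is_atom(module), do: [:via_name]

  # {atom, atom} may be {name, node} or {module, arg}.
  def interpretations({a, b}) when is_atom(a) and is_atom(b) do
    [:name_on_node, :module_with_arg]
  end

  # Any other {atom, term} tuple reads as {module, arg}.
  def interpretations({a, _arg}) when is_atom(a), do: [:module_with_arg]
end

IO.inspect(StageRef.interpretations(ProducerConsumer))
IO.inspect(StageRef.interpretations({ProducerConsumer, :some_arg}))
```

Separate function names remove the need to guess, which is the argument made above.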

Feel free to tweak title. Naming things is hard.

I never thought I would be involved in shaping the future of flow (and/or Elixir I guess :wink:). Thx everybody here for the help, and @josevalim for this awesome language! I’m closing this now!

Looks like I’m coming back late to the discussion, but it’s great to see these ideas. start_consumers is something we will be able to use and will simplify our current implementation as well. I also liked the changes in 0.14. Thanks, @josevalim and @CptnKirk for hashing this out.

This feature is now in master: https://github.com/elixir-lang/flow/issues/52

Flow 0.14 incorporates the GenStage integration enhancements requested in this thread: http://blog.plataformatec.com.br/2018/07/whats-new-in-flow-v0-14/

Thanks @josevalim