Flow (0.15.0) does not terminate

I have what seems to be a simple Flow pipeline:

    Flow.from_specs( [ { Zipstage,
                         [ dat_file, zip_file ]
                       } ],
      cancel: :permanent
    )
    |> Flow.map( &from_line/1 )
    |> Flow.filter( fn nil -> false; _ -> true end )
    |> Flow.into_specs( [
      { Loadstage, [] },
      { Loadstage, [] },
      { Loadstage, [] },
    ], cancel: :permanent )

It fully processes the data, but when Zipstage returns { :stop, :shutdown, _ } and terminates, the Flow pipeline (Flow.Coordinator) doesn’t shut down. I’ve tried, probably erroneously, adding cancel: :permanent, with no effect.

What am I missing?

Thanks!
Dan

The problem is in Loadstage (the into_specs stage) and comes down to some very specific process and GenStage semantics around subscription and termination. The answer is not at all apparent from the module documentation for either Flow or GenStage. I found what I needed by reading the source of the Flow.MapReducer module.

The primary points are:

  1. In init/1, call Process.flag(:trap_exit, true) so that exit signals arrive as messages instead of terminating the stage (see the Erlang manual)
  2. Implement handle_subscribe to track producers
  3. Implement handle_cancel and, when the number of tracked producers drops to 0, call GenStage.async_info( self(), :stop )
  4. Implement a handle_info( :stop... clause that returns a { :stop, :normal, _ } tuple
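
For anyone landing here later, the four points above might look roughly like this in a consumer stage. This is only a minimal sketch, not a copy of Flow.MapReducer: the state shape (a map with a producers key), the start_link/1 entry point, the catch-all handle_info clause and the empty handle_events body are my own assumptions for illustration.

    defmodule Loadstage do
      use GenStage

      # Assumed entry point; the argument is the [] given in the spec.
      def start_link(args), do: GenStage.start_link(__MODULE__, args)

      @impl true
      def init(_args) do
        # Point 1: turn exit signals into messages so the stage decides when to stop.
        Process.flag(:trap_exit, true)
        # Assumed state shape: subscription ref => producer pid.
        {:consumer, %{producers: %{}}}
      end

      # Point 2: remember every producer that subscribes to this stage.
      @impl true
      def handle_subscribe(:producer, _opts, {pid, ref}, state) do
        {:automatic, %{state | producers: Map.put(state.producers, ref, pid)}}
      end

      # Point 3: forget the cancelled producer; once none are left, queue a :stop message.
      @impl true
      def handle_cancel(_reason, {_pid, ref}, state) do
        producers = Map.delete(state.producers, ref)
        if map_size(producers) == 0, do: GenStage.async_info(self(), :stop)
        {:noreply, [], %{state | producers: producers}}
      end

      # Placeholder: events arrives as the list delivered for the current demand.
      @impl true
      def handle_events(_events, _from, state) do
        {:noreply, [], state}
      end

      # Point 4: stop normally once the queued :stop message arrives.
      @impl true
      def handle_info(:stop, state), do: {:stop, :normal, state}

      # With trap_exit set, {:EXIT, pid, reason} messages also land here; ignore them.
      def handle_info(_msg, state), do: {:noreply, [], state}
    end

The stop goes through async_info and a message rather than being returned straight from handle_cancel, which is the same route Flow.MapReducer takes.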

But these points have subtleties, so it is important to read Flow.MapReducer to account for them. It is a very simple module compared to the rest of Flow and provides a solid template.

See https://github.com/dashbitco/flow/blob/v0.15.0/lib/flow/map_reducer.ex

You may ask why I didn’t just use Flow.reduce. I needed a stage that would receive each demand-sized batch of events as a whole, and I didn’t see another way to get them.