It fully processes the data, but when Zipstage sends { :stop, :shutdown, _ } and terminates, the Flow pipeline (Flow.Coordinator) doesn’t shut down/terminate. I’ve tried, probably erroneously, to add cancel: :permanent with no effect.
The problem is in Loadstage (the into_spec) pertaining to some very specific process and gen_stage semantics related to subscription and termination. The answer is not at all apparent in the module documentation for either flow or gen_stage. I did find what I needed by examining the flow module Flow.MapReducer in the source code.
The primary points being:
In init() call Process.flag(:trap_exit, true) to inhibit unmanaged exits see erlang manual
implement handle_subscribe to track producers
implement handle_cancel and when the producer list drops to 0, call GenStage.async_info( self(), :stop )
implement a handle_info( :stop... to generate a { :stop, :normal, _ } tuple
But these points have subtleties, so it is important to read Flow.MapReducer to account for them. It is a very simple module compared to the rest of Flow and provides a solid template.
You may ask, why not just use Flow.reduce: I needed a stage that would receive the demand unit of events as a whole and I didn’t see another way to get them.