Actually, yes, abstracting this higher is possible and really straightforward:
defmodule TaggedFlow do
  def map(flow, tag, transform) do
    Flow.map(flow, fn tagged_data ->
      case tagged_data do
        # Only elements carrying the requested tag are transformed
        {^tag, data} -> transform.(data)
        # Everything else passes through untouched
        _ -> tagged_data
      end
    end)
  end

  # And each/3, filter/3, flat_map/3, reduce etc.
end
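As a rough sketch of how two of the other wrappers mentioned in that comment might look: `each/3` is built on `Flow.map/2` here (so it does not rely on a dedicated `Flow.each` being available in your Flow version), and `filter/3` is built on `Flow.filter/2`. The helpers `each_element/3` and `keep_element?/3` are names made up for this example, and the choice to always keep elements with a non-matching tag is an assumption. It is shown as a separate module for brevity; in practice these would sit next to `map/3`.

```elixir
defmodule TaggedFlow do
  # Run `fun` for its side effects on elements tagged `tag`.
  # Every element continues downstream unchanged.
  def each(flow, tag, fun) do
    Flow.map(flow, &each_element(&1, tag, fun))
  end

  # Drop elements tagged `tag` for which `pred` returns a falsy value.
  # Elements with any other tag are always kept (assumed behaviour).
  def filter(flow, tag, pred) do
    Flow.filter(flow, &keep_element?(&1, tag, pred))
  end

  # Hypothetical per-element helpers. The repeated `tag` variable in the
  # function head only matches when the element's tag equals the argument.
  def each_element({tag, data} = tagged, tag, fun) do
    fun.(data)
    tagged
  end

  def each_element(other, _tag, _fun), do: other

  def keep_element?({tag, data}, tag, pred), do: pred.(data)
  def keep_element?(_other, _tag, _pred), do: true
end
```

Keeping the per-element logic in plain functions also means it can be unit tested without starting a flow.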
Then we can go:
Flow.from_enumerable(A.data())
|> Flow.map(&B.do_stuff/1)
|> TaggedFlow.map(:ok, &C.do_stuff/1)
|> TaggedFlow.each(:ok, &D.do_stuff/1)
|> TaggedFlow.each(:error, &E.do_stuff/1)
|> TaggedFlow.each(:error, &F.do_stuff/1)
|> Flow.run()
That way the actual work modules need no modifications for passthrough or tag pattern matching: each one receives the unwrapped data for its tag and never sees elements carrying a different tag.
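To make that concrete, here is a small sketch of what a work module could look like. The body of `C.do_stuff/1` is invented for illustration (the real one is whatever your module does), and `Enum.map/2` stands in for `Flow.map/2` only so the snippet runs without the Flow dependency. The per-element `case` logic is the same as in `TaggedFlow.map/3`.

```elixir
defmodule C do
  # Hypothetical work function: takes the unwrapped payload and returns
  # a freshly tagged result. It contains no passthrough clauses.
  def do_stuff(n) when is_integer(n) and n >= 0, do: {:ok, n * 2}
  def do_stuff(n), do: {:error, {:invalid, n}}
end

# Same per-element logic as TaggedFlow.map/3, applied with Enum.map/2
# as a stand-in for Flow.map/2.
tagged_map = fn elements, tag, transform ->
  Enum.map(elements, fn
    {^tag, data} -> transform.(data)
    other -> other
  end)
end

results = tagged_map.([{:ok, 1}, {:error, :boom}, {:ok, -3}], :ok, &C.do_stuff/1)
# results == [{:ok, 2}, {:error, :boom}, {:error, {:invalid, -3}}]
```

The `{:error, :boom}` element never reaches `C.do_stuff/1`, and an `:ok` element that fails inside it simply comes out retagged as `:error` for the later `:error` stages to pick up.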