I have a list of events coming from an event bus. Let’s assume they look like this:
%ProcucerEvents{
  id: 1,
  events: [
    %Event{id: 1, payload: %{}}, # payload: something
    %Event{id: 2, payload: %{}}, # payload: something
    %Event{id: 3, payload: %{}}, # payload: something
    %Event{id: 4, payload: %{}}, # payload: something
    %Event{id: 5, payload: %{}}  # payload: something
  ]
}
I need to process each event separately through a pipeline of actions, which might look like this:
@pipeline [Decode, Perform, Encode, Send]

def handle_event(%ProcucerEvents{events: events}, pipeline \\ @pipeline) do
  events
  |> Task.async_stream(&process_event(&1, pipeline))
  |> Enum.to_list()
end

def process_event(event, pipeline) do
  # apply/3 takes its arguments as a list, so the accumulator is wrapped in [acc]
  Enum.reduce(pipeline, event, fn action, acc -> apply(action, :call, [acc]) end)
end
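For context, each module in the pipeline is assumed to expose a `call/1` function that returns either the transformed event or an error tuple. A minimal sketch of one such step follows; the module name `StepSketch.Decode`, the `:raw` and `:decoded` keys, and the use of a plain map instead of the `%Event{}` struct are all made up for illustration:

```elixir
defmodule StepSketch.Decode do
  # Hypothetical step: succeeds only when the payload carries a :raw key.
  def call(%{payload: %{raw: raw}} = event) do
    %{event | payload: %{decoded: raw}}
  end

  # Anything else is reported as an error tuple instead of raising.
  def call(_event), do: {:error, :something_happened}
end

# Illustrative calls: the first clause matches, the second falls through.
decoded = StepSketch.Decode.call(%{id: 1, payload: %{raw: "abc"}})
failed = StepSketch.Decode.call(%{id: 2, payload: %{}})
```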
My question is how to take control of the list returned by Task.async_stream so that I can push a failure reason into the response, or at least catch the failure of one of the pipeline steps. For example, if the Decode step returns {:error, :something_happened}, Task.async_stream will still report it as a success: [ok: :something_happened, ok: ...]
I can reduce the returned list and keep only the successful results based on the returned key, but I’m asking whether there’s a cleaner way to do it. I tried setting the trap_exit flag, but I couldn’t figure out how to make it work.
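Here is roughly the workaround I mean, as a minimal sketch. The names `ResultSketch` and `split/1` are made up, and plain integers stand in for events. As far as I understand, `{:exit, reason}` entries only show up when the caller traps exits or when `on_timeout: :kill_task` is set, so that clause is there for completeness:

```elixir
defmodule ResultSketch do
  # Hypothetical helper: splits Task.async_stream output into
  # successful values and failure reasons.
  def split(results) do
    {oks, errors} =
      Enum.split_with(results, fn
        # The task finished, but the pipeline itself returned an error tuple.
        {:ok, {:error, _reason}} -> false
        {:ok, _value} -> true
        # The task exited or timed out.
        {:exit, _reason} -> false
      end)

    %{
      ok: Enum.map(oks, fn {:ok, value} -> value end),
      error:
        Enum.map(errors, fn
          {:ok, {:error, reason}} -> reason
          {:exit, reason} -> reason
        end)
    }
  end
end

# Stand-in for the real pipeline: the second item fails, the others succeed.
results =
  [1, 2, 3]
  |> Task.async_stream(fn
    2 -> {:error, :something_happened}
    n -> n * 10
  end)
  |> Enum.to_list()

split_results = ResultSketch.split(results)
```

It works, but every caller has to know that an `{:ok, _}` entry may still wrap an error, which is the part I would like to avoid.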
I’d appreciate any suggestions.




















