I have a list of events coming from an event bus. Let's assume they have a form like this:
%ProcucerEvents{
  id: 1,
  events: [
    %Event{id: 1, payload: %{}}, # something
    %Event{id: 2, payload: %{}}, # something
    %Event{id: 3, payload: %{}}, # something
    %Event{id: 4, payload: %{}}, # something
    %Event{id: 5, payload: %{}}  # something
  ]
}
I need to process each event separately through a pipeline of actions, which might look like this:
@pipeline [Decode, Perform, Encode, Send]

def handle_event(%ProcucerEvents{events: events}, pipeline \\ @pipeline) do
  events
  |> Task.async_stream(&process_event(&1, pipeline))
  |> Enum.to_list()
end

def process_event(event, pipeline) do
  # apply/3 expects the arguments as a list, so the accumulator is wrapped in one
  Enum.reduce(pipeline, event, fn action, acc -> apply(action, :call, [acc]) end)
end
My question is how to take control of the list returned by Task.async_stream so that I can push a failure reason to the response, or at least catch the failure of one of the pipeline steps. For example, if the Decode step returns {:error, :something_happened}, Task.async_stream still wraps the result in a successful tuple, e.g. [ok: {:error, :something_happened}, ok: ...].
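To make the behaviour concrete, here is a minimal sketch that reproduces it in isolation. The `WrapDemo` module and its `process_event/1` are hypothetical stand-ins for the real pipeline, not code from my project; the point is only that the outer `:ok` says the task finished, while the inner tuple carries the actual outcome.

```elixir
defmodule WrapDemo do
  # Hypothetical stand-in for a pipeline run: odd ids fail, even ids succeed.
  def process_event(id) when rem(id, 2) == 1, do: {:error, :something_happened}
  def process_event(id), do: {:ok, id}

  def run(ids) do
    ids
    |> Task.async_stream(&process_event/1)
    |> Enum.to_list()
  end
end

results = WrapDemo.run([1, 2])

# The task for id 1 returned an error tuple, yet the stream reports {:ok, ...}
# for it, because the task process itself completed normally.
IO.inspect(results)
# prints [ok: {:error, :something_happened}, ok: {:ok, 2}]
```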
I can reduce the returned list to keep only the successful results based on the returned key, but I'm asking whether there's a cleaner way to do it. I tried setting the trap_exit flag, but I couldn't figure out how to make it work.
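For reference, the filtering approach could be sketched roughly as follows. This is an assumption-laden sketch, not a settled design: it assumes every step's `call/1` returns `{:ok, value}` or `{:error, reason}`, and the `PipelineSketch` module, its two steps, and the plain-map events are hypothetical. It uses `Enum.reduce_while/3` to stop at the first failing step, then `Enum.split_with/2` to separate successes from failures.

```elixir
defmodule PipelineSketch do
  # Hypothetical steps; each call/1 is assumed to return {:ok, value} or {:error, reason}.
  defmodule Decode do
    def call(%{payload: :bad}), do: {:error, :something_happened}
    def call(event), do: {:ok, event}
  end

  defmodule Send do
    def call(event), do: {:ok, Map.put(event, :sent, true)}
  end

  @pipeline [Decode, Send]

  # Returns {successes, failures}, each in the original event order.
  def handle_events(events, pipeline \\ @pipeline) do
    events
    |> Task.async_stream(&process_event(&1, pipeline))
    |> Enum.map(fn
      # The task finished; unwrap whatever the pipeline returned.
      {:ok, result} -> result
      # Emitted only if the caller traps exits or uses on_timeout: :kill_task.
      {:exit, reason} -> {:error, {:exit, reason}}
    end)
    |> Enum.split_with(&match?({:ok, _}, &1))
  end

  # Runs the steps in order and halts at the first {:error, reason}.
  def process_event(event, pipeline) do
    Enum.reduce_while(pipeline, {:ok, event}, fn step, {:ok, acc} ->
      case step.call(acc) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end
end

events = [%{id: 1, payload: :good}, %{id: 2, payload: :bad}]
{ok, failed} = PipelineSketch.handle_events(events)
```

With this shape the failure reason travels in the task's return value, so no exit trapping is needed for ordinary step errors; it would only matter for tasks that crash or time out.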
I'd appreciate any suggestions.