Taking control of what Task.async_stream returns

I have a list of events coming from an event bus. Let’s assume they look like this:

%ProcucerEvents{
  id: 1,
  events: [
    %Event{id: 1, payload: %{#something}},
    %Event{id: 2, payload: %{#something}},
    %Event{id: 3, payload: %{#something}},
    %Event{id: 4, payload: %{#something}},
    %Event{id: 5, payload: %{#something}}
  ]
}

I need to process each event separately through a pipeline of actions, which might look like this:

@pipeline [Decode, Perform, Encode, Send]

def handle_event(%ProcucerEvents{events: events}, pipeline \\ @pipeline) do
  events
  |> Task.async_stream(&process_event(&1, pipeline))
  |> Enum.to_list()
end

def process_event(event, pipeline) do
  # apply/3 expects the arguments as a list
  Enum.reduce(pipeline, event, fn action, acc -> apply(action, :call, [acc]) end)
end

My question is how to take control of the list returned by Task.async_stream so that I can push a failure reason into the response, or at least detect that one of the pipeline steps failed. For example, if the Decode step returns {:error, :something_happened}, Task.async_stream still returns a successful tuple: [ok: :something_happened, ok: ...]
I can reduce the returned list and keep only the successful results based on the returned key, but I’m asking whether there’s a cleaner way to do it. I tried setting the trap_exit flag, but I couldn’t figure out how to make it work.
I’d appreciate any suggestions.

IMO the place to handle this is in your reducer. Each step of your pipeline should return {:ok, acc} or {:error, reason}.

def process_event(event, pipeline) do
  Enum.reduce_while(pipeline, event, fn action, acc ->
    case apply(action, :call, [acc]) do
      # happy case, continue processing
      {:ok, acc} ->
        {:cont, acc}

      # sad case, halt processing
      {:error, reason} ->
        {:halt, {:error, reason}}
    end
  end)
end

This doesn’t necessarily change the result of your async stream: the outer :ok only indicates that the async operation completed without crashing, not that your pipeline succeeded.
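
To make that concrete, here is a rough sketch of how the stream results could be split into successes and failures. The module name PipelineSketch and the steps RejectOdd and Double are made up for illustration (they stand in for Decode, Perform and so on), and I’m assuming every step returns {:ok, value} or {:error, reason}. Unlike the reducer above, this variant also wraps the happy path in {:ok, value} so both outcomes have the same shape.

```elixir
defmodule PipelineSketch do
  # Hypothetical steps, standing in for Decode/Perform/Encode/Send.
  defmodule RejectOdd do
    def call(n) when rem(n, 2) == 0, do: {:ok, n}
    def call(n), do: {:error, {:odd, n}}
  end

  defmodule Double do
    def call(n) when is_integer(n), do: {:ok, n * 2}
  end

  # Runs the steps sequentially for one event and stops at the first error.
  def process_event(event, pipeline) do
    Enum.reduce_while(pipeline, {:ok, event}, fn action, {:ok, acc} ->
      case action.call(acc) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end

  # Processes events in parallel, then groups the results by outcome.
  def handle_events(events, pipeline) do
    events
    |> Task.async_stream(&process_event(&1, pipeline))
    |> Enum.reduce(%{ok: [], error: []}, fn
      # The task finished and the pipeline succeeded.
      {:ok, {:ok, value}}, acc -> Map.update!(acc, :ok, &[value | &1])
      # The task finished, but a pipeline step returned an error.
      {:ok, {:error, reason}}, acc -> Map.update!(acc, :error, &[reason | &1])
      # Only emitted if the caller traps exits or uses on_timeout: :kill_task.
      {:exit, reason}, acc -> Map.update!(acc, :error, &[{:exit, reason} | &1])
    end)
    |> Map.new(fn {key, values} -> {key, Enum.reverse(values)} end)
  end
end
```

Calling PipelineSketch.handle_events([1, 2, 3, 4], [PipelineSketch.RejectOdd, PipelineSketch.Double]) should give %{ok: [4, 8], error: [{:odd, 1}, {:odd, 3}]}, since async_stream keeps the input order by default.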

Reading into your example, are you sure that all the steps in the pipeline should be done in parallel? To me it doesn’t sound like you’d want to do sending before encoding.

My point is, it doesn’t look like Task.async_stream is the right tool for the job.

Sorry, @dimitarvp, maybe it wasn’t clear. I meant that the events should be processed in parallel, but the actions in the pipeline will still run sequentially for each event.

@zachallaun Thank you for your response. That’s exactly what I was doing; the example in the initial post was just simplified.

Ah, sorry, I misread the code (the function capture got me). My bad.

Doesn’t it return {:ok, {:error, :something_happened}} though?
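
A quick way to check is something like the snippet below. The failing function is a made-up stand-in for a pipeline whose first step returns an error; it is not the original Decode module.

```elixir
# Hypothetical stand-in for a pipeline whose first step fails.
failing = fn _event -> {:error, :something_happened} end

results =
  [1, 2]
  |> Task.async_stream(failing)
  |> Enum.to_list()

# Each element is the task's return value wrapped in an :ok tuple,
# i.e. {:ok, {:error, :something_happened}}.

# Stripping the outer :ok leaves the pipeline's own result tuples.
unwrapped = Enum.map(results, fn {:ok, inner} -> inner end)
```

So the outer :ok comes from the task itself, and the inner tuple is whatever the function returned.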