I’ve built a batch processor using GenStage, and I’m now wondering how to isolate exceptions/crashes triggered by processing a single item.
Let’s say I have this in a GenStage consumer (every 5 seconds it asks for 250 results):
    def handle_events(results, _from, state) do
      results
      |> Task.async_stream(&process_result/1, max_concurrency: 50)
      |> Stream.run()

      # a consumer emits no events, so it must return an empty event list
      {:noreply, [], state}
    end

    def process_result(result) do
      # triggers many operations which may or may not raise exceptions
    end
What I’m trying to avoid is having an exception/crash in process_result bring down the GenServer process and/or affect processing of the rest of the batch.
What is an effective/idiomatic way of doing this (spawn, try/catch, trapping exits)?
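To make the try/catch option concrete, here is a minimal sketch of what it might look like, written without GenStage so it runs on its own. The module name `BatchSketch`, the `safe_process/1` wrapper, the dummy `process_result/1` clauses, and the `timeout`/`on_timeout` values are all assumptions made for illustration, not part of my actual code.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for the real process_result/1: some inputs
  # raise or exit to simulate a failing item.
  def process_result(:bad), do: raise("boom")
  def process_result(:exit), do: exit(:gone)
  def process_result(n), do: n * 2

  # Wraps a single item so that raises, throws, and exits inside the task
  # become tagged tuples instead of crashing the task (and, through the
  # link, the process that called Task.async_stream).
  def safe_process(result) do
    {:ok, process_result(result)}
  rescue
    exception -> {:error, {:raised, exception}}
  catch
    kind, reason -> {:error, {kind, reason}}
  end

  # Processes the whole batch; one failing item does not affect the others.
  def process_batch(results) do
    results
    |> Task.async_stream(&safe_process/1,
      max_concurrency: 50,
      timeout: 5_000,
      # assumed choice: kill a slow task rather than exit the caller
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, outcome} -> outcome
      {:exit, reason} -> {:error, {:exit, reason}}
    end)
  end
end
```

Inside `handle_events/3`, `process_batch(results)` would take the place of the bare `Task.async_stream` pipeline. This only covers failures raised inside the task function itself; if a task can be killed from outside (for example by a process it is linked to), something like `Task.Supervisor.async_stream_nolink` under a `Task.Supervisor` may be a better fit.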