I’ve built a batch processor using GenStage, and I’m now wondering how to isolate exceptions/crashes triggered by processing single items.
Let’s say I have this in a GenStage consumer (every 5 seconds it asks for 250 results):
def handle_events(results, _from, state) do
  results
  |> Task.async_stream(&process_result/1, max_concurrency: 50)
  |> Enum.to_list()

  # A consumer emits no events of its own, so the event list is empty.
  {:noreply, [], state}
end
def process_result(result) do
  # triggers many operations which may or may not raise exceptions
end
What I’m trying to avoid is having an exception/crash in process_result
bring down the GenServer process and/or affect processing of the rest of the batch.
What is an effective/idiomatic way of doing this (spawn, try/catch, trapping exits)?
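
To make the goal concrete, here is a minimal sketch of one possible direction: running each item in an unlinked task via Task.Supervisor.async_stream_nolink, so that a crash shows up as an {:exit, reason} entry in the stream instead of propagating to the caller. The BatchSketch module, the stub process_result/1 that raises on 3, and the supervisor argument are all hypothetical stand-ins, not part of my actual code, and I am not claiming this is the idiomatic answer.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for the real process_result/1:
  # raises for one input and succeeds for the others.
  def process_result(3), do: raise("boom")
  def process_result(n), do: n * 2

  # Runs every item in a task that is NOT linked to the caller.
  # A task that raises is emitted as {:exit, reason}; the remaining
  # items are still processed and emitted as {:ok, value}.
  def run(supervisor, results) do
    supervisor
    |> Task.Supervisor.async_stream_nolink(results, &process_result/1,
      max_concurrency: 50
    )
    |> Enum.to_list()
  end
end

# Assumption: in a real application the Task.Supervisor would be started
# under the supervision tree rather than ad hoc like this.
{:ok, sup} = Task.Supervisor.start_link()
outcomes = BatchSketch.run(sup, [1, 2, 3, 4])

# Split successes from failures without crashing the calling process.
{ok, failed} = Enum.split_with(outcomes, &match?({:ok, _}, &1))
IO.inspect(length(ok), label: "succeeded")
IO.inspect(length(failed), label: "failed")
```

In the consumer, handle_events/3 would call something like BatchSketch.run/2 and then return {:noreply, [], state}, deciding per item whether to log, retry, or drop the failures.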