I’m using Broadway and have a batch of messages yielded in handle_batch. I want to perform an action on each item in the batch and update the message state when an individual item errors (e.g. if three messages hit parsing errors, I want to mark those three with Message.failed/2 and ack the rest). I couldn’t do this in handle_message because the messages first had to be formatted and then grouped for an earlier batch-wide action.
I can see a fairly simple solution here: run the action with Task.async_stream, then Enum.map over the results with a function that checks for an :error state and marks the matching messages as failed.
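For concreteness, here is a minimal sketch of the Task.async_stream approach as I understand it. The module name BatchStep and the process_item function are hypothetical, and I’m assuming process_item returns {:ok, value} or {:error, reason}. The sketch relies on Task.async_stream keeping results in input order (its default), so they can be zipped back onto the original messages.

```elixir
defmodule BatchStep do
  alias Broadway.Message

  # Hypothetical helper: runs `process_item` on each message's data concurrently
  # and returns the messages, with failures marked via Message.failed/2.
  # Assumes `process_item` returns {:ok, value} or {:error, reason}.
  def run(messages, process_item, opts \\ []) do
    # :kill_task turns a timeout into an {:exit, :timeout} result instead of
    # exiting the calling process.
    stream_opts = Keyword.merge([timeout: 5_000, on_timeout: :kill_task], opts)

    results =
      messages
      |> Task.async_stream(
        fn %Message{data: data} ->
          # The tasks are linked to the caller, so a raise would take the
          # caller down too; convert it into an error tuple instead.
          try do
            process_item.(data)
          rescue
            exception -> {:error, exception}
          end
        end,
        stream_opts
      )
      |> Enum.to_list()

    # Results arrive in input order by default, so zipping is safe.
    messages
    |> Enum.zip(results)
    |> Enum.map(fn
      {message, {:ok, {:ok, _value}}} -> message
      {message, {:ok, {:error, reason}}} -> Message.failed(message, reason)
      {message, {:exit, reason}} -> Message.failed(message, reason)
    end)
  end
end
```

In handle_batch this would be called as BatchStep.run(messages, &MyParser.parse/1), where MyParser.parse/1 is a placeholder for the real per-item action, and the returned list is what handle_batch gives back to Broadway.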
What I’m wondering is whether I’m missing an opportunity for something even lazier or more OTP-ish. Should the thing handling these messages be an explicit GenServer, with some kind of callback that dispatches the messages to GenServer instances and bundles up the responses? It feels like Task.async_stream does this without the fuss, but sometimes when I use Task.async_stream for these kinds of tasks I feel like I’m missing a trick.