Hey all,
I’ve got a case where I’m using Broadway and have a batch of messages yielded in handle_batch
. I want to perform an action on each item in the batch, and I’d like to update the message state in the case of individual errors (e.g. if three messages run into parsing errors, I want to mark those three with Message.failed()
and ack the rest). This was not an action I could have performed in handle_message
because it was first necessary to format and then group these for a previous batch-wide action.
I can see a fairly simple solution here using Task.async_stream
resolved by Enum.map
-ing a function that checks :error
state and marks messages failed as necessary.
What I’m wondering is, am I missing an opportunity for something even lazier/more OTP-ish? Should the thing handling these messages be an explicit gen-server and should I have some kind of callback to dispatch all these messages to instances of the genserver and bundle up the responses? It feels like Task.async_stream
is doing this without the fuss, but sometimes when I use Task.async_stream
for these kinds of tasks I feel like I’m missing a trick.