Asynchronously handling batches of messages in Broadway

Hey all,

I’ve got a case where I’m using Broadway and receive a batch of messages in handle_batch. I want to perform an action on each item in the batch and update the message state when individual items fail (e.g. if three messages hit parsing errors, I want to mark those three with Broadway.Message.failed/2 and ack the rest). I couldn’t do this in handle_message because the messages first had to be formatted and grouped for an earlier batch-wide action.

I can see a fairly simple solution here: run the work through Task.async_stream, then Enum.map over the results with a function that checks for :error and marks the corresponding messages as failed.
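
Roughly what I have in mind, as a minimal sketch rather than a finished implementation. The module name BatchWorker, the process/4 function and its options are made up for illustration. I’m also assuming the per-message function returns :ok, {:ok, value} or {:error, reason}, and I pass the "mark as failed" function in as an argument so the sketch runs without Broadway loaded.

```elixir
defmodule BatchWorker do
  @moduledoc """
  Hypothetical helper: runs `work_fun` on every message concurrently and
  marks the ones that fail. `mark_failed` is a 2-arity function
  (message, reason); in a Broadway pipeline it would be
  `&Broadway.Message.failed/2`.
  """

  def process(messages, work_fun, mark_failed, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online())
    timeout = Keyword.get(opts, :timeout, 5_000)

    results =
      messages
      |> Task.async_stream(
        fn message ->
          # Tasks are linked to the caller, so an unrescued raise would
          # take the batch process down. Turn it into an error tuple.
          try do
            work_fun.(message)
          rescue
            exception -> {:error, exception}
          end
        end,
        max_concurrency: max_concurrency,
        timeout: timeout,
        # Yield {:exit, :timeout} for a slow task instead of exiting the caller.
        on_timeout: :kill_task
      )
      |> Enum.to_list()

    # Results come back in input order (ordered: true is the default),
    # so zipping pairs every message with its own result.
    messages
    |> Enum.zip(results)
    |> Enum.map(fn
      {message, {:ok, {:error, reason}}} -> mark_failed.(message, reason)
      {message, {:exit, reason}} -> mark_failed.(message, reason)
      {message, {:ok, _success}} -> message
    end)
  end
end

# Assumed usage inside a Broadway module (MyApp.Parser.run/1 is hypothetical
# and receives the whole %Broadway.Message{}):
#
#   def handle_batch(_batcher, messages, _batch_info, _context) do
#     BatchWorker.process(messages, &MyApp.Parser.run/1, &Broadway.Message.failed/2)
#   end
```

Every message is returned from process/4, failed or not, which matches what handle_batch is expected to hand back. Exits and throws inside the work function are not rescued here and would need a catch clause if they can happen.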

What I’m wondering is: am I missing an opportunity for something even lazier/more OTP-ish? Should the thing handling these messages be an explicit GenServer, with some kind of callback that dispatches the messages to GenServer instances and bundles up the responses? It feels like Task.async_stream does this without the fuss, but sometimes when I use Task.async_stream for these kinds of tasks I feel like I’m missing a trick.

To me, Task.async_stream is the perfect middle ground between multiplexing work across all CPU cores and keeping the code very simple. In my eyes you aren’t missing anything.

But have a look at Flow for something in-between Task.async_stream and GenStage / Broadway.
