I am currently building a Broadway pipeline using Broadway SQS (BroadwaySQS.Producer) and an SQS FIFO queue.
My SQS queue contains messages of three types:
- type_a
- type_b
- type_c
When a message is added to the queue, I add a custom message attribute "type", a string set to one of the types above.
I'm using a FIFO queue because it is important that messages are processed in order, regardless of their type. All messages have the same MessageGroupId to ensure they are processed in order within the FIFO queue.
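For reference, this is roughly how I enqueue messages. This is a sketch assuming ExAws.SQS; the queue name, body, and deduplication id are placeholders:

```elixir
# Sketch, assuming ExAws.SQS. Queue name, body, and dedup id are placeholders.
"my-queue.fifo"
|> ExAws.SQS.send_message(
  Jason.encode!(%{id: 123}),
  message_group_id: "default",
  message_deduplication_id: Integer.to_string(System.unique_integer([:positive])),
  message_attributes: [
    %{name: "type", data_type: :string, value: "type_a"}
  ]
)
|> ExAws.request!()
```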
I have a handle_message callback clause for each message "type" and put each message into one of three batchers (one per type):
```elixir
def handle_message(_, %{metadata: %{message_attributes: %{"type" => %{value: "type_a"}}}} = message, _) do
  IO.puts("message of type_a")

  message
  |> Message.put_batcher(:type_a)
end

def handle_message(_, %{metadata: %{message_attributes: %{"type" => %{value: "type_b"}}}} = message, _) do
  IO.puts("message of type_b")

  message
  |> Message.put_batcher(:type_b)
end

def handle_message(_, %{metadata: %{message_attributes: %{"type" => %{value: "type_c"}}}} = message, _) do
  IO.puts("message of type_c")

  message
  |> Message.put_batcher(:type_c)
end
```
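For context, a minimal version of my pipeline definition looks like this. The queue URL and concurrency settings here are illustrative, not exact:

```elixir
defmodule MyPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # Queue URL is a placeholder
        module: {BroadwaySQS.Producer, queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/my-queue.fifo"},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ],
      # One batcher per message type
      batchers: [
        type_a: [concurrency: 1],
        type_b: [concurrency: 1],
        type_c: [concurrency: 1]
      ]
    )
  end
end
```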
Messages are being received in the expected order (verified with IO.puts statements).
i.e. If I enqueue
- message type_a
- message type_b
- message type_c
- message type_c
- message type_a
The IO.puts statement in each handle_message clause prints the messages in this order in my console.
However, in this example, my handle_batch
callback is firing in the following order:
- handle_batch for :type_b
- handle_batch for :type_a
- handle_batch for :type_c
This creates a problem since the first message put in my FIFO queue (message type_a) is actually being processed after the second message (message type_b).
Is there a way to force my handle_batch callbacks to fire in the order I want, ensuring that message batches are processed based on the order of messages in my queue? i.e.
- handle_batch for :type_a
- handle_batch for :type_b
- handle_batch for :type_c
Perhaps this is not possible with Broadway, or I should be using a different pattern?
Do I need to create my own GenStage producer implementation?
Thanks!
Update:
I guess I could use a single batcher, split the messages up inside one handle_batch callback, and process the different "type" messages there. Something like:
```elixir
def handle_message(_, message, _) do
  message
  |> Message.update_data(fn data -> data end) # placeholder: update data as needed
end
```
```elixir
def handle_batch(_, messages, _, _) do
  messages_by_type =
    Enum.group_by(
      messages,
      fn m -> m.metadata.message_attributes["type"].value end,
      fn m -> m.data end
    )

  if Map.has_key?(messages_by_type, "type_a") do
    messages_by_type
    |> Map.fetch!("type_a")
    # process batch of type a
  end

  if Map.has_key?(messages_by_type, "type_b") do
    messages_by_type
    |> Map.fetch!("type_b")
    # process batch of type b
  end

  if Map.has_key?(messages_by_type, "type_c") do
    messages_by_type
    |> Map.fetch!("type_c")
    # process batch of type c
  end

  messages
end
```
The only problem here is that if processing of the type_a messages succeeds but the type_b batch throws an error, the type_a messages will also be marked as failed (and redelivered), even though they were processed successfully. There has to be a better solution.
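One refinement I'm considering: chunk the batch by type in arrival order (Enum.chunk_by preserves the FIFO interleaving, unlike Enum.group_by) and mark only the messages of a failed chunk with Broadway.Message.failed/2, so successfully processed chunks are still acknowledged. Here process_chunk/2 is a stand-in for my real per-type processing:

```elixir
def handle_batch(_, messages, _, _) do
  messages
  # chunk_by keeps consecutive same-type messages together, in arrival order
  |> Enum.chunk_by(fn m -> m.metadata.message_attributes["type"].value end)
  |> Enum.flat_map(fn [first | _] = chunk ->
    type = first.metadata.message_attributes["type"].value

    case process_chunk(type, chunk) do
      :ok ->
        chunk

      {:error, reason} ->
        # Only this chunk's messages are marked failed; earlier chunks are acked
        Enum.map(chunk, &Message.failed(&1, reason))
    end
  end)
end

# Stand-in for the real per-type processing; returns :ok or {:error, reason}.
defp process_chunk(_type, _messages), do: :ok
```

With the enqueue order a, b, c, c, a this yields chunks [a], [b], [c, c], [a] processed in that order, and a failure in one chunk no longer fails the others.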