Question about processing batches of messages using Broadway and SQS FIFO queue

I am currently building a Broadway pipeline using Broadway SQS (BroadwaySQS.Producer) and an SQS FIFO queue.

My SQS queue contains messages of three types:

  • type_a
  • type_b
  • type_c

When a message is added to the queue, I add a custom message attribute “type”, a string representing one of the above types.
I am using a FIFO queue since it is important that messages are processed in order, regardless of their type.
All messages share the same MessageGroupId to ensure they are processed in order within my FIFO queue.

I have a handle_message callback clause for each message “type”, and I put messages into one of three batches (one batcher per type):

  def handle_message(_, %{metadata: %{message_attributes: %{"type" => %{value: "type_a"}}}} = message, _) do
    IO.puts("message of type_a")
    message
    |> Message.put_batcher(:type_a)
  end
  
  def handle_message(_, %{metadata: %{message_attributes: %{"type" => %{value: "type_b"}}}} = message, _) do
    IO.puts("message of type_b")
    message
    |> Message.put_batcher(:type_b)
  end

  def handle_message(_, %{metadata: %{message_attributes: %{"type" => %{value: "type_c"}}}} = message, _) do
    IO.puts("message of type_c")
    message
    |> Message.put_batcher(:type_c)
  end

Messages are being received in the expected order (verified with the IO.puts statements).

i.e. If I enqueue

  1. message type_a
  2. message type_b
  3. message type_c
  4. message type_c
  5. message type_a
The IO.puts output from the handle_message callbacks appears in exactly this order in my console.

However, in this example, my handle_batch callback is firing in the following order:

  1. handle_batch for :type_b
  2. handle_batch for :type_a
  3. handle_batch for :type_c

This creates a problem: the first message put into my FIFO queue (type_a) is actually processed after the second message (type_b).

Is there a way to force my handle_batch callbacks to fire in the order I want, ensuring that batches are processed based on the order of the messages in my queue?

  1. handle_batch for :type_a
  2. handle_batch for :type_b
  3. handle_batch for :type_c

Perhaps this is not possible with Broadway, or I should be using a different pattern?
Do I need to create my own GenStage producer implementation?

Thanks!

Update:
I guess I could use a single batcher, split the messages up inside that one handle_batch callback, and process my different “type” messages there… Something like:

  def handle_message(_, message, _) do
    message
    |> Message.update_data(fn data -> data end) # update data as needed
  end

  def handle_batch(_, messages, _, _) do
    messages_by_type =
      Enum.group_by(
        messages,
        fn m -> m.metadata.message_attributes["type"].value end,
        fn m -> m.data end
      )

    if Map.has_key?(messages_by_type, "type_a") do
      messages_by_type
      |> Map.fetch!("type_a")
      # |> process batch of type_a
    end

    if Map.has_key?(messages_by_type, "type_b") do
      messages_by_type
      |> Map.fetch!("type_b")
      # |> process batch of type_b
    end

    if Map.has_key?(messages_by_type, "type_c") do
      messages_by_type
      |> Map.fetch!("type_c")
      # |> process batch of type_c
    end

    messages
  end
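One useful property of this approach: Enum.group_by preserves the relative order of elements within each group, so each per-type list comes out in queue order. A small self-contained illustration, using maps shaped like the message metadata above (data values are hypothetical):

```elixir
# Hypothetical messages shaped like the Broadway/SQS metadata above.
messages = [
  %{data: 1, metadata: %{message_attributes: %{"type" => %{value: "type_a"}}}},
  %{data: 2, metadata: %{message_attributes: %{"type" => %{value: "type_b"}}}},
  %{data: 3, metadata: %{message_attributes: %{"type" => %{value: "type_c"}}}},
  %{data: 4, metadata: %{message_attributes: %{"type" => %{value: "type_c"}}}},
  %{data: 5, metadata: %{message_attributes: %{"type" => %{value: "type_a"}}}}
]

grouped =
  Enum.group_by(
    messages,
    fn m -> m.metadata.message_attributes["type"].value end,
    fn m -> m.data end
  )

grouped["type_a"] # => [1, 5]  (order within each group is preserved)
grouped["type_c"] # => [3, 4]
```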

The only problem here is that if processing of the type_a messages succeeds but processing of the type_b batch throws an error, the type_a messages will also be marked as failed, even though they were successfully processed… There has to be a better solution.
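One way to soften that pitfall: instead of letting an exception fail the whole batch, catch the error per type and tag only that type's messages as failed (in real Broadway code, via Broadway.Message.failed/2). A plain-Elixir sketch of the idea, with hypothetical handler functions standing in for the real per-type processing:

```elixir
defmodule BatchSketch do
  # Process each type's group in a fixed order; tag only the failing
  # group's messages instead of letting one error poison the whole batch.
  # `handlers` maps type -> a (possibly raising) function; names hypothetical.
  def run(messages_by_type, handlers) do
    Enum.flat_map(["type_a", "type_b", "type_c"], fn type ->
      batch = Map.get(messages_by_type, type, [])

      try do
        handlers[type].(batch)
        Enum.map(batch, &{:ok, &1})
      rescue
        e -> Enum.map(batch, &{:failed, &1, Exception.message(e)})
      end
    end)
  end
end

ok = fn _batch -> :ok end
boom = fn _batch -> raise "boom" end

BatchSketch.run(
  %{"type_a" => [1], "type_b" => [2], "type_c" => [3]},
  %{"type_a" => ok, "type_b" => boom, "type_c" => ok}
)
# => [{:ok, 1}, {:failed, 2, "boom"}, {:ok, 3}]
```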

Hi @scottyscripts!

Broadway will only be able to guarantee message ordering after we add producer-based partitioning/hashing. We’re already working on this issue since it’s a requirement for the Kafka connector.


Thanks for that! Totally makes sense. In the meantime, would you recommend something like my pattern above, grouping on message metadata in a single batcher with Broadway (with the obvious pitfall that messages may be marked as failed even though they were processed)? It gets the job done for our use case, but I am unsure whether there are unintended consequences here.

Or do you think I should go with a custom GenServer implementation to poll and process off the queue in order?

Or something else?

Thank you!

The problem is that this pattern cannot guarantee order either, unless you have just one stage in each layer (i.e. 1 producer, 1 processor, and so on). Otherwise, the batcher can still receive chunks of messages that, even after sorting, have gaps in the sequence, because messages that took longer to process will arrive in a later chunk. You will see this more clearly once you start consuming a lot of data. So, in short, if you want to use Broadway and keep the order of messages you have to:

  1. Define one stage throughout the whole pipeline (no parallelism)
  2. If you need parallelism, you’ll have to start multiple pipelines

That will be the case until we have partitioning built in.
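Under that constraint, an order-preserving topology pins every layer to a single stage. A fragment sketched with the Broadway 0.x option names (queue name and batch size are illustrative):

```
  # One stage per layer: no parallelism, so ordering is preserved end-to-end.
  producers: [
    default: [
      module: {BroadwaySQS.Producer, queue_name: "my-queue.fifo"},
      stages: 1
    ]
  ],
  processors: [default: [stages: 1]],
  batchers: [default: [stages: 1, batch_size: 10]]
```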


Kind of an old question at this point, but I noticed that the PR for producer-based partitioning (https://github.com/plataformatec/broadway/issues/62) has been merged to master.

Do you have any idea when there will be a new release, or whether it will be part of 5.0?

Thanks!

Support for partition_by in processors and batchers was added in v0.5.0.
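With partition_by, you give Broadway a function from a message to an integer; messages that hash to the same partition are handled by the same stage, preserving their relative order while still allowing parallelism across partitions. A hypothetical fragment keyed on the SQS "type" attribute (option placement is a sketch; see the Broadway docs for the exact shape):

```
  processors: [
    default: [
      stages: 3,
      partition_by: fn msg ->
        # Same "type" attribute -> same partition -> same processor stage.
        :erlang.phash2(msg.metadata.message_attributes["type"].value)
      end
    ]
  ]
```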
