I’ve had to dig much deeper on this, and I’ve found a couple things… first is that the documentation appears to be wrong (but someone should check me on that before I submit a PR).
One thing to clarify is that the Broadway.test_messages/2
is pretty simplistic: the list of arguments that you provide it as the 2nd argument gets mapped onto %Broadway.Message{}
structs as the data
field. In our case, that was not sufficient to test our pipeline because our message handlers rely on pattern matching of the message metadata
field as well.
A useful modification to my start_link/1
function allows me to use the Broadway.DummyProducer
in my tests:
defmodule Something do
use Broadway
alias Broadway.Message
def start_link(opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {
Keyword.get(opts, :producer, BroadwaySQS.Producer),
queue_url: Application.get_env(:my_app, :sqs_queue_url),
message_attribute_names: ["type", "foo"],
config: Application.fetch_env!(:my_app, :sqs_config)
}
],
# ... batchers, etc...
)
end
The area in particular that does not work as advertised is the assert_receive
. Instead of returning a list of all successful messages in one go, the batch_mode: :flush
option seems to trigger each message to eject as soon as it’s done processing. So instead of a single assert_receive
checking for a list with 3 elements, I had to do 3 assert_receive
statements, each checking for a list with 1 element. Here’s how it looked (with my custom modifications to mimic what was in test_messages/2
:
# I set up a fixture that supplied fully formed `%Broadway.Message{}` structs
test "pipeline is drained of test messages", %{msg1: msg1, msg2: msg2, msg3: msg3} do
{:ok, _pid} = Something.start_link(
producer: Broadway.DummyProducer,
# ...
)
# We cannot make use of `Broadway.test_messages/2` because the list of values you provide it as the 2nd argument
# maps to each message's `data` field, and that's all it does in the way of creating test messages.
# Our messages are more complex: our messages must have metadata, so we come up with our own modification of the
# `Broadway.test_messages/2` function:
batch_mode = :flush
ref = make_ref()
ack = {Broadway.CallerAcknowledger, {self(), ref}, :ok}
msg1 = %Message{msg1 | acknowledger: ack, batch_mode: batch_mode}
msg2 = %Message{msg2 | acknowledger: ack, batch_mode: batch_mode}
msg3 = %Message{msg3 | acknowledger: ack, batch_mode: batch_mode}
messages = [msg1, msg2, msg3]
:ok = Broadway.push_messages(Something, messages)
# Assert that the messages have been consumed
assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
end
Note that a 4th assert_receive
statement would fail because the mailbox would be empty.
I tried doing this using batch_mode: :bulk
but that also seems to have returned 1 message per receive.