Example of testing Broadway

I just started looking over a Broadway repository and I’m wondering how to take advantage of Broadway.DummyProducer and/or test_messages. I can’t seem to make those run…

I tried to adapt the examples from the docs:

defmodule MyAppTest do
  use ExUnit.Case

  test "example test" do
    ref = Broadway.test_messages(MyApp, [1, 2, 3])
    assert_receive {:ack, ^ref, [_, _, _] = _successful, _failed}, 1000
  end
end

but that always generates an error:

     test/my_app_test.exs:5
     ** (exit) exited in: GenServer.call(MyApp, :producer_names, 5000)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
     code: ref = Broadway.test_messages(BroadwaySQS.Producer, [1, 2, 3])

I’ve tried using various other modules as input to the Broadway.test_messages function, but can’t seem to get past the error. I’m sure I’m missing something silly, but what is a valid argument to that function?

Have you started your Broadway pipeline? And if so, how do you start it?

Ah… digging around a bit it’s making a bit more sense.

This is my application.ex:

defmodule MyApp.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = []

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

In this case, the various Broadway processes are NOT started with the rest of the app. Instead, they get started in dedicated .exs scripts using something like

alias MyApp.Something

require Logger

# Parse arguments to get a duration value, but for simplicity:
duration = 30_000
{:ok, pid} = Something.start_link([])
Process.sleep(duration)

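# GenServer.stop/1 returns :ok, which is not a loggable message, so log a string instead
:ok = GenServer.stop(pid)
Logger.info("Something pipeline stopped")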

If I add Something.start_link([]) to my tests, then the tests seem to work… at least I’m getting new error messages…
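For anyone hitting the same `no process` error: the first argument has to be the name of a Broadway pipeline that is already running, so the test has to start it first. Below is a minimal sketch of that idea, not the code from the repo discussed here. It assumes Broadway 1.0 or later (where `Broadway.test_message/3` is available) and Elixir 1.12+ for `Mix.install/1`. `DemoPipeline` is a made-up stand-in module.

```elixir
# Assumption: run as a standalone script, e.g. `elixir demo_test.exs`
Mix.install([{:broadway, "~> 1.0"}])

defmodule DemoPipeline do
  # Hypothetical pipeline used only for illustration
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: []]
    )
  end

  @impl true
  def handle_message(_processor, message, _context), do: message
end

ExUnit.start()

defmodule DemoPipelineTest do
  use ExUnit.Case

  setup do
    # Start the pipeline under the test supervisor so it is stopped after each test
    start_supervised!({DemoPipeline, []})
    :ok
  end

  test "a pushed message is acknowledged" do
    # The first argument is the name the pipeline was started with
    ref = Broadway.test_message(DemoPipeline, 1)
    assert_receive {:ack, ^ref, [%Broadway.Message{data: 1}], []}, 1000
  end
end
```

Using `start_supervised!/1` rather than calling `start_link/1` directly means ExUnit shuts the pipeline down between tests, so the registered name is free again for the next one.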

I’ve had to dig much deeper on this, and I’ve found a couple of things. The first is that the documentation appears to be wrong (but someone should check me on that before I submit a PR).

One thing to clarify is that Broadway.test_messages/2 is pretty simplistic: each item in the list you pass as the 2nd argument becomes the data field of a %Broadway.Message{} struct, and that is all it does to build the test messages. In our case, that was not sufficient to test our pipeline because our message handlers also pattern match on the message metadata field.
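As a side note for readers on newer releases: as far as I can tell, Broadway 1.0 and later offer `Broadway.test_message/3`, which accepts a `:metadata` option, so handlers that match on metadata can be exercised without building structs by hand. The sketch below assumes that version. `MetadataPipeline` and the `%{type: ...}` metadata shape are invented for illustration and are not what BroadwaySQS actually produces.

```elixir
# Assumption: run as a standalone script with Elixir 1.12+
Mix.install([{:broadway, "~> 1.0"}])

defmodule MetadataPipeline do
  # Hypothetical pipeline whose handler depends on message metadata
  use Broadway
  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: []]
    )
  end

  @impl true
  # Messages that carry a :type in their metadata are tagged with it
  def handle_message(_processor, %Message{metadata: %{type: type}} = message, _context) do
    Message.update_data(message, fn data -> {type, data} end)
  end

  # Anything else is marked as failed
  def handle_message(_processor, message, _context) do
    Message.failed(message, :missing_type)
  end
end

ExUnit.start()

defmodule MetadataPipelineTest do
  use ExUnit.Case

  setup do
    start_supervised!({MetadataPipeline, []})
    :ok
  end

  test "metadata is visible to the handler" do
    ref = Broadway.test_message(MetadataPipeline, "payload", metadata: %{type: "foo"})
    assert_receive {:ack, ^ref, [%Broadway.Message{data: {"foo", "payload"}}], []}, 1000
  end

  test "a message without the expected metadata fails" do
    ref = Broadway.test_message(MetadataPipeline, "payload")
    assert_receive {:ack, ^ref, [], [%Broadway.Message{status: {:failed, :missing_type}}]}, 1000
  end
end
```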

A useful modification to my start_link/1 function allows me to use the Broadway.DummyProducer in my tests:

defmodule Something do
  use Broadway
  alias Broadway.Message
  
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          Keyword.get(opts, :producer, BroadwaySQS.Producer),
          queue_url: Application.get_env(:my_app, :sqs_queue_url),
          message_attribute_names: ["type", "foo"],
          config: Application.fetch_env!(:my_app, :sqs_config)
        }
      ],
      # ... batchers, etc...
    )
  end

  # ... handle_message/3 and other callbacks ...
end

The part that does not work as advertised is the assert_receive. Instead of delivering a list of all successful messages in one go, the batch_mode: :flush option seems to cause each message to be acknowledged as soon as it’s done processing. So instead of a single assert_receive checking for a list with 3 elements, I had to write 3 assert_receive statements, each checking for a list with 1 element. Here’s how it looked (with my custom modifications to mimic what test_messages/2 does):

# I set up a fixture that supplied fully formed `%Broadway.Message{}` structs
test "pipeline is drained of test messages", %{msg1: msg1, msg2: msg2, msg3: msg3} do
  {:ok, _pid} = Something.start_link(
    producer: Broadway.DummyProducer,
    # ...
  )

  # We cannot make use of `Broadway.test_messages/2` because the list of values you provide it as the 2nd argument
  # maps to each message's `data` field, and that's all it does in the way of creating test messages.
  # Our messages are more complex: our messages must have metadata, so we come up with our own modification of the
  # `Broadway.test_messages/2` function:
  batch_mode = :flush
  ref = make_ref()
  ack = {Broadway.CallerAcknowledger, {self(), ref}, :ok}

  msg1 = %Message{msg1 | acknowledger: ack, batch_mode: batch_mode}
  msg2 = %Message{msg2 | acknowledger: ack, batch_mode: batch_mode}
  msg3 = %Message{msg3 | acknowledger: ack, batch_mode: batch_mode}

  messages = [msg1, msg2, msg3]

  :ok = Broadway.push_messages(Something, messages)

  # Assert that the messages have been consumed
  assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
  assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
  assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
end

Note that a 4th assert_receive statement would fail because the mailbox would be empty.

I tried doing this using batch_mode: :bulk but that also seems to have returned 1 message per receive.
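Since the grouping of acknowledgements is not something the test should depend on, one option is to collect `{:ack, ref, successful, failed}` tuples until the expected number of messages has arrived, however they were grouped. This is only a sketch in plain Elixir: `AckCollector` is a hypothetical helper, not part of Broadway, and the acks below are simulated with `send/2` so the snippet runs without a pipeline.

```elixir
defmodule AckCollector do
  @moduledoc false

  # Accumulates messages from `{:ack, ref, successful, failed}` tuples until
  # `expected` messages have been seen, or until `timeout` milliseconds pass
  # without a new ack arriving.
  def collect(ref, expected, timeout \\ 100) do
    collect(ref, expected, timeout, [], [])
  end

  defp collect(_ref, expected, _timeout, ok, failed)
       when length(ok) + length(failed) >= expected do
    {:ok, ok, failed}
  end

  defp collect(ref, expected, timeout, ok, failed) do
    receive do
      {:ack, ^ref, successful, failures} ->
        collect(ref, expected, timeout, ok ++ successful, failed ++ failures)
    after
      timeout -> {:timeout, ok, failed}
    end
  end
end

# Simulated acks: one message in the first ack, two in the second
ref = make_ref()
send(self(), {:ack, ref, [:msg1], []})
send(self(), {:ack, ref, [:msg2, :msg3], []})

{:ok, [:msg1, :msg2, :msg3], []} = AckCollector.collect(ref, 3)
```

In a real test the same call would follow `Broadway.push_messages/2`, and the assertion would hold whether the three messages came back in one ack or in three.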

I filed this issue: https://github.com/dashbitco/broadway/issues/165