fireproofsocks

Example of testing Broadway

I just started looking over a Broadway repository and I’m wondering how to take advantage of Broadway.DummyProducer and/or test_messages. I can’t seem to make those run…

I tried to adapt the examples from the docs:

defmodule MyAppTest do
  use ExUnit.Case

  test "example test" do
    ref = Broadway.test_messages(MyApp, [1, 2, 3])
    assert_receive {:ack, ^ref, [_, _, _] = _successful, _failed}, 1000
  end
end

but that always generates an error:

     test/my_app_test.exs:5
     ** (exit) exited in: GenServer.call(MyApp, :producer_names, 5000)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
     code: ref = Broadway.test_messages(BroadwaySQS.Producer, [1, 2, 3])

I’ve tried using various other modules as input to the Broadway.test_messages function, but can’t seem to get past the error. I’m sure I’m missing something silly, but what is a valid argument to that function?

NobbZ

Have you started your Broadway pipeline? And how do you do so?

fireproofsocks

Ah… digging around a bit it’s making a bit more sense.

This is my application.ex:

defmodule MyApp.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = []

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

In this case, the various Broadway processes are NOT started with the rest of the app. Instead, they get started in dedicated .exs scripts using something like this:

alias MyApp.Something

require Logger

# Parse arguments to get a duration value, but for simplicity:
duration = 30_000
{:ok, pid} = Something.start_link([])
Process.sleep(duration)

# GenServer.stop/1 returns :ok, so inspect it before handing it to Logger
GenServer.stop(pid)
|> inspect()
|> Logger.info()

If I add Something.start_link([]) to my tests, then the tests seem to get further… at least I’m getting new error messages…
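To illustrate the point about starting the pipeline inside the test, here is a minimal, self-contained sketch. It makes several assumptions that are not from the thread: `SketchPipeline` is a made-up module, the script uses `Mix.install/1` (Elixir 1.12+) to pull in Broadway `~> 1.0`, and it calls `Broadway.test_message/3`, which in newer Broadway releases plays the role that `Broadway.test_messages/2` plays above.

```elixir
# Hypothetical standalone script; run with `elixir sketch_test.exs`.
Mix.install([{:broadway, "~> 1.0"}])

defmodule SketchPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # DummyProducer emits nothing on its own; tests push messages in.
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: []]
    )
  end

  @impl true
  def handle_message(_processor, message, _context), do: message
end

ExUnit.start()

defmodule SketchPipelineTest do
  use ExUnit.Case

  test "the pipeline has to be running before messages are pushed" do
    # Nothing is registered under the pipeline name yet, which is the
    # situation behind the "no process" exit shown earlier.
    assert Process.whereis(SketchPipeline) == nil

    # `use Broadway` defines child_spec/1, so ExUnit can supervise the
    # pipeline and stop it again when the test finishes.
    start_supervised!(SketchPipeline)

    ref = Broadway.test_message(SketchPipeline, 1)
    assert_receive {:ack, ^ref, [%Broadway.Message{data: 1}], []}, 1000
  end
end
```

Using `start_supervised!/1` instead of a bare `start_link/1` call means each test gets a fresh pipeline and no manual cleanup is needed.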

fireproofsocks

I’ve had to dig much deeper on this, and I’ve found a couple things… first is that the documentation appears to be wrong (but someone should check me on that before I submit a PR).

One thing to clarify is that Broadway.test_messages/2 is pretty simplistic: each item in the list you pass as the 2nd argument becomes the data field of a %Broadway.Message{} struct, and nothing else on the message is populated. In our case, that was not sufficient to test our pipeline because our message handlers also rely on pattern matching against the message's metadata field.
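For readers on a newer Broadway release, the metadata limitation can be sketched differently. As far as I know, `Broadway.test_message/3` (Broadway 1.0 and later, so not the version discussed in this thread) accepts a `:metadata` option. The `MetadataSketch` module, the `type: "foo"` metadata shape, and the `:unknown_type` failure reason below are all invented for illustration.

```elixir
# Hypothetical standalone script; assumes Elixir 1.12+ and Broadway ~> 1.0.
Mix.install([{:broadway, "~> 1.0"}])

defmodule MetadataSketch do
  use Broadway
  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: []]
    )
  end

  # Only messages whose metadata carries type "foo" are processed.
  @impl true
  def handle_message(_processor, %Message{metadata: %{type: "foo"}} = message, _context) do
    Message.update_data(message, &String.upcase/1)
  end

  # Everything else is marked as failed.
  def handle_message(_processor, message, _context) do
    Message.failed(message, :unknown_type)
  end
end

ExUnit.start()

defmodule MetadataSketchTest do
  use ExUnit.Case

  setup do
    start_supervised!(MetadataSketch)
    :ok
  end

  test "a message with matching metadata is acknowledged as successful" do
    ref = Broadway.test_message(MetadataSketch, "hello", metadata: %{type: "foo"})
    assert_receive {:ack, ^ref, [%{data: "HELLO"}], []}, 1000
  end

  test "a message without the expected metadata is acknowledged as failed" do
    ref = Broadway.test_message(MetadataSketch, "hello")
    assert_receive {:ack, ^ref, [], [%{status: {:failed, :unknown_type}}]}, 1000
  end
end
```

On older releases that lack this option, building the `%Broadway.Message{}` structs by hand remains the way to get metadata into test messages.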

A useful modification to my start_link/1 function allows me to use the Broadway.DummyProducer in my tests:

defmodule Something do
  use Broadway
  alias Broadway.Message
  
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          Keyword.get(opts, :producer, BroadwaySQS.Producer),
          queue_url: Application.get_env(:my_app, :sqs_queue_url),
          message_attribute_names: ["type", "foo"],
          config: Application.fetch_env!(:my_app, :sqs_config)
        }
      ],
      # ... batchers, etc...
    )
  end

  # ... handle_message/3 and the other callbacks ...
end

The part that does not work as advertised is the assert_receive. Instead of delivering all successful messages in a single acknowledgement, the batch_mode: :flush option seems to cause each message to be acknowledged as soon as it is done processing. So instead of a single assert_receive checking for a list with 3 elements, I had to write 3 assert_receive statements, each checking for a list with 1 element. Here’s how it looked (with my custom modifications to mimic what test_messages/2 does):

# I set up a fixture that supplied fully formed `%Broadway.Message{}` structs
test "pipeline is drained of test messages", %{msg1: msg1, msg2: msg2, msg3: msg3} do
  {:ok, _pid} = Something.start_link(
    producer: Broadway.DummyProducer,
    # ...
  )

  # We cannot make use of `Broadway.test_messages/2` because the list of values you provide it as the 2nd argument
  # maps to each message's `data` field, and that's all it does in the way of creating test messages.
  # Our messages are more complex: our messages must have metadata, so we come up with our own modification of the
  # `Broadway.test_messages/2` function:
  batch_mode = :flush
  ref = make_ref()
  ack = {Broadway.CallerAcknowledger, {self(), ref}, :ok}

  msg1 = %Message{msg1 | acknowledger: ack, batch_mode: batch_mode}
  msg2 = %Message{msg2 | acknowledger: ack, batch_mode: batch_mode}
  msg3 = %Message{msg3 | acknowledger: ack, batch_mode: batch_mode}

  messages = [msg1, msg2, msg3]

  :ok = Broadway.push_messages(Something, messages)

  # Assert that the messages have been consumed
  assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
  assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
  assert_receive {:ack, ^ref, [_] = _successful, _failed}, 100
end

Note that a 4th assert_receive statement would fail because the mailbox would be empty.

I tried doing this using batch_mode: :bulk, but that also seems to deliver 1 message per receive.
