Mocking with Mox inside a Broadway pipeline

Hi.

After four years of abstinence I’m finally back working with Elixir.

But it looks like my Elixir skills haven’t aged well, so I need to ask for help.

My problem is testing a Broadway pipeline which consumes data from Google Cloud Pub/Sub.

Testing just the plain pipeline is quite easy, but my handle_message function needs to fetch data from Google Cloud Storage, and that call is what I want to mock.

The flow is:

  1. Text file gets created in Google Cloud Storage bucket
  2. Google Cloud Pub/Sub gets message about the newly created file
  3. Broadway listens on the pub/sub subscription
  4. When a message arrives at Broadway, the handle_message function downloads the file content and prepares the data for saving into our data warehouse.

Everything works without problems. The only thing I can’t get working is a test which mocks out the call to Google Cloud Storage.

I’m using Mox and here is some code I currently have:

# config/test.exs
config :my_app, :producer_module, {Broadway.DummyProducer, []}
config :my_app, :storage_client, MyApp.MockStorageClient

# test/test_helper.exs
Mox.defmock(MyApp.MockStorageClient, for: MyApp.StorageClient)

This is the storage client, which basically follows the pattern from the Mox documentation, followed by the Broadway pipeline:

# my_app/storage_client
defmodule MyApp.StorageClient do
  @callback get_file_content(bucket :: String.t(), filename :: String.t()) ::
              {:ok, String.t()} | {:error, any()}

  def get_file_content(bucket, filename), do: impl().get_file_content(bucket, filename)
  defp impl, do: Application.get_env(:my_app, :storage_client, MyApp.Google.StorageClient)
end

defmodule MyApp.Pubsub do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    producer_module = Application.fetch_env!(:my_app, :producer_module)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 2_000
        ]
      ]
    )
  end

  @impl true
  def handle_message(
        _processor,
        %Message{} = message,
        _context
      ) do
    with %{attributes: %{} = meta} <- message.metadata,
         {:ok, data} <- Jason.decode(message.data) do
      case MyApp.StorageClient.get_file_content(meta["bucketId"], meta["objectId"]) do
        {:ok, file_content} ->
          message
          |> Message.put_data(Map.put(data, "file_content", file_content))

        {:error, reason} ->
          Message.failed(message, reason)
      end
    else
      _ ->
        Message.failed(message, "Failed to process PubSub message")
    end
  end
end

And the test looks like this. By the way, I had a hard time getting a proper message ready which looks like the ones I get in the handle_message function coming directly from pub/sub.

defmodule MyApp.PubsubTest do
  use ExUnit.Case, async: true

  import Mox

  setup :verify_on_exit!

  test "message" do
    parent = self()
    ref = make_ref()
    ack = {Broadway.CallerAcknowledger, {parent, ref}, :ok}

    MyApp.MockStorageClient
    |> expect(:get_file_content, fn _bucket, _filename ->
      send(parent, {ref, :temp})
      {:ok, ""}
    end)

    metadata = %{
      attributes: %{
        bucketId: "elixir-broadway-test",
        eventTime: "2022-05-10T15:19:28.464019Z",
        eventType: "OBJECT_FINALIZE",
        notificationConfig: "projects/_/buckets/elixir-broadway-test/notificationConfigs/1",
        objectGeneration: "1652195968450664",
        objectId: "demo.json",
        payloadFormat: "JSON_API_V1"
      }
    }

    data =
      "{\n  \"kind\": \"storage#object\",\n  \"id\": \"elixir-broadway-test/demo.json/1652195968450664\",\n  \"selfLink\": \"https://www.googleapis.com/storage/v1/b/elixir-broadway-test/o/demo.json\",\n  \"name\": \"demo.json\",\n  \"bucket\": \"elixir-broadway-test\",\n  \"generation\": \"1652195968450664\",\n  \"metageneration\": \"1\",\n  \"contentType\": \"application/json\",\n  \"timeCreated\": \"2022-05-10T15:19:28.464Z\",\n  \"updated\": \"2022-05-10T15:19:28.464Z\",\n  \"storageClass\": \"STANDARD\",\n  \"timeStorageClassUpdated\": \"2022-05-10T15:19:28.464Z\",\n  \"size\": \"2786889\",\n  \"md5Hash\": \"dsdjaaskdasd+iqv6ydddvQ==\",\n  \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/...\",\n  \"rer444\": \"999Fsa==\",\n  \"etag\": \"COiAss6d1fcCEAE=\"\n}\n"

    message = %Broadway.Message{
      acknowledger: ack,
      batch_key: :default,
      batch_mode: :flush,
      batcher: :default,
      data: data,
      metadata: metadata
    }

    Broadway.push_messages(MyApp.Pubsub, [message])

    assert_receive {:ack, ^ref,
                    [
                      %Broadway.Message{
                        acknowledger: {Broadway.CallerAcknowledger, {_, _}, :ok},
                        batch_key: :default,
                        batch_mode: :flush,
                        batcher: :default,
                        data: ^data,
                        metadata: ^metadata,
                        status: :ok
                      }
                    ], []}
  end
end

The failure I get is:

15:24:24.467 [error] ** (Mox.UnexpectedCallError) no expectation defined for MyApp.MockStorageClient.get_file_content/2 in process #PID<0.335.0> with args [nil, nil]
    (mox 1.0.1) lib/mox.ex:741: Mox.__dispatch__/4
    (hermes 0.1.0) lib/my_app/pubsub.ex:34: MyApp.Pubsub.handle_message/3
    (broadway 1.0.3) lib/broadway/topology/processor_stage.ex:156: anonymous fn/6 in Broadway.Topology.ProcessorStage.handle_messages/4
    (telemetry 1.1.0) /home/me/projects/my_app/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
    (broadway 1.0.3) lib/broadway/topology/processor_stage.ex:143: Broadway.Topology.ProcessorStage.handle_messages/4
    (broadway 1.0.3) lib/broadway/topology/processor_stage.ex:63: anonymous fn/2 in Broadway.Topology.ProcessorStage.handle_events/3
    (telemetry 1.1.0) /home/me/projects/my_app/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
    (gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
    (gen_stage 1.1.2) lib/gen_stage.ex:2660: GenStage.take_pc_events/3
    (stdlib 3.17.1) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17.1) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

Here I’m stuck and can’t figure out how to solve this.

I read about explicit allow in the Mox documentation but have no idea how to apply it to the processes Broadway starts.

Here are the versions I use:

Erlang/OTP 24 [erts-12.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Elixir 1.13.3 (compiled with Erlang/OTP 22)
broadway, "1.0.3"
mox, "1.0.1"

I hope someone can help.

thx :slight_smile:
Cheers
Frank

@suchasurge I usually like to have 3 files for this type of issue: one for defining the behaviour, one for the implementation and one that does the switch. In your example, the switch seems to happen in the same module that defines the callback.

Using your example and the multiple files approach I describe above, the structure would look like the following:

# define the behaviour
defmodule MyApp.StorageClientBehaviour do
  @callback get_file_content(bucket :: String.t(), filename :: String.t()) ::
              {:ok, String.t()} | {:error, any()}
end

# define the client
defmodule MyApp.GoogleStorageClient do
  @behaviour MyApp.StorageClientBehaviour
  
  def get_file_content(bucket, filename) do
    # your implementation goes here
  end
end

# Do the switch where you call the implementation; in your case, that seems to be the module that uses Broadway.
defmodule MyApp.Pubsub do
  use Broadway

  # In your handle_message/3 function, you can call this:
  #   ... <- file_storage_impl().get_file_content(bucket, filename)

  # Falls back to the real client defined above when no :storage_client is configured.
  defp file_storage_impl, do: Application.get_env(:my_app, :storage_client, MyApp.GoogleStorageClient)
end

Here is a step-by-step guide: the README of dashbitco/mox on GitHub ("Mocks and explicit contracts in Elixir") has an example. You seem to have the config set up correctly, so please try that and report back.

Also, welcome back! Glad you are using Elixir! :rocket:

@suchasurge Broadway comes with a test producer built in; I would use that instead of mocking. See the Broadway v1.0.3 documentation.

Welcome @suchasurge! This is one of my favorite topics :slight_smile:

First, let me attempt to convince you to avoid Mox for testing anything related to Broadway: Each Broadway pipeline is an entire tree of processes responsible for requesting, processing, batching, and acknowledging Messages. In my experience targeting the proper process to allow Mox becomes an exercise in futility.

What if we approach the problem in a different way and make the pipeline more configurable?

For instance, you could pass the configuration to the pipeline when it starts. In your supervision tree, you write something like this:

children = [
  # others
  {MyApp.Pubsub,
     producer_module: Application.fetch_env!(:my_app, :producer_module),
     get_file_content: &MyApp.Google.StorageClient.get_file_content/2}
]

Broadway has a configuration option for :context that can be passed to Broadway.start_link/2 and will be given as an argument to all the callback functions in your Broadway module.

We can use :context to inject our get_file_content/2 function into the handle_message/3 callback:

# MyApp.Pubsub

def start_link(opts) do
  name = Keyword.get(opts, :name, __MODULE__)
  producer_module = Keyword.fetch!(opts, :producer_module)
  get_file_content = Keyword.fetch!(opts, :get_file_content)

  Broadway.start_link(__MODULE__,
    name: name,
    # the context map is handed to handle_message/3 as its third argument
    context: %{
      get_file_content: get_file_content
    },
    producer: [
      module: producer_module
    ],
    # rest...
  )
end

@impl true
def handle_message(
      _processor,
      %Message{} = message,
      %{get_file_content: get_file_content}
    ) do
  with %{attributes: %{} = meta} <- message.metadata,
       {:ok, data} <- Jason.decode(message.data) do
    # invoke the injected function from the context map
    case get_file_content.(meta["bucketId"], meta["objectId"]) do
      {:ok, file_content} ->
        Message.put_data(message, Map.put(data, "file_content", file_content))

      {:error, reason} ->
        Message.failed(message, reason)
    end
  else
    _ ->
      Message.failed(message, "Failed to process PubSub message")
  end
end

Now we have dependency injection for the pipeline, but how do we use it? Let’s update the test:

defmodule MyApp.PubsubTest do
  use ExUnit.Case, async: true

  test "message" do
    parent = self()
    ref = make_ref()
    ack = {Broadway.CallerAcknowledger, {parent, ref}, :ok}

    broadway_name = __MODULE__.MessageTest

    start_supervised!(
      {MyApp.Pubsub,
       name: broadway_name,
       producer_module: {Broadway.DummyProducer, []},
       get_file_content: fn _bucket, _object ->
         send(parent, {ref, :temp})
         {:ok, ""}
       end}
    )

    # setup...

    Broadway.push_messages(broadway_name, [message])
    
    # assertions...
  end
end

You can start an isolated pipeline specifically for this test, with its own version of get_file_content/2. Since the function is defined in the test's scope, you can make assertions inside it, send messages to the test pid, return a custom value, etc.
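To see the injection idea on its own, here is a minimal sketch in plain Elixir with no Broadway involved. MyApp.PipelineSketch and the plain-map message are hypothetical stand-ins I made up for illustration; only the shape of the context map (`%{get_file_content: fun}`) comes from the code above. Note that the attributes use string keys here, because the lookups are `meta["bucketId"]` and `meta["objectId"]`.

```elixir
# Hypothetical stand-in module: a plain map plays the role of %Broadway.Message{}.
defmodule MyApp.PipelineSketch do
  # Same idea as handle_message/3: the storage call is taken from the context map.
  def handle_message(%{metadata: %{attributes: meta}} = message, %{get_file_content: get_file_content}) do
    case get_file_content.(meta["bucketId"], meta["objectId"]) do
      {:ok, content} -> Map.put(message, :file_content, content)
      {:error, reason} -> Map.put(message, :status, {:failed, reason})
    end
  end
end

parent = self()
ref = make_ref()

# The fake closes over the caller's pid, so it can report the arguments it received.
fake = fn bucket, object ->
  send(parent, {ref, bucket, object})
  {:ok, "hello"}
end

# String keys, matching the meta["bucketId"] / meta["objectId"] lookups.
message = %{
  metadata: %{
    attributes: %{"bucketId" => "elixir-broadway-test", "objectId" => "demo.json"}
  }
}

result = MyApp.PipelineSketch.handle_message(message, %{get_file_content: fake})
```

Because the fake is an ordinary closure, it does not matter which process ends up calling it, which is why this approach sidesteps the per-process ownership that Mox expectations rely on.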

That’s a lot of things all at once, but I hope it helps! I’m happy to answer additional questions, too! :slight_smile:

@suchasurge you should definitely go the way @benwilson512 and @mcrumm suggested.

The approach with Mox is valid, but not in the context you are trying to use it in. I looked at the code without reading your question closely or seeing what you were trying to do. I saw Mox and went full Mox :slight_smile: There are better tools to test this, as folks suggested.

Mox is still very valid though. Highly suggest it, but as folks pointed out, not for what you are trying to solve here. Sorry for the misdirection.

Wow. Thx for the warm welcome and great suggestions.

You all are great!

I did it the way @mcrumm suggested and will keep Mox for another day.

Cheers
Frank
