Hi.
After four years of abstinence I’m finally back to working with Elixir.
But it looks like my Elixir skills haven’t aged well, so I need to ask for help.
My problem is testing a Broadway pipeline that consumes data from Google Cloud Pub/Sub.
Testing the plain pipeline is easy enough, but my `handle_message` function needs to fetch data from Google Cloud Storage, and I want to mock that call.
The flow is:
- A text file gets created in a Google Cloud Storage bucket
- Google Cloud Pub/Sub gets a message about the newly created file
- Broadway listens on the Pub/Sub subscription
- When a message arrives at Broadway, the `handle_message` function downloads the file content and prepares it for saving into our data warehouse.
Everything works without problems; I just can’t get a test working that mocks out the call to Google Cloud Storage.
I’m using Mox, and here is the code I currently have:

```elixir
# config/test.exs
config :my_app, :producer_module, {Broadway.DummyProducer, []}
config :my_app, :storage_client, MyApp.MockStorageClient
```

```elixir
# test/test_helper.exs
Mox.defmock(MyApp.MockStorageClient, for: MyApp.StorageClient)
```
This is the storage client, which follows the pattern from the Mox documentation:

```elixir
# my_app/storage_client
defmodule MyApp.StorageClient do
  @callback get_file_content(bucket :: String.t(), filename :: String.t()) ::
              {:ok, String.t()} | {:error, any()}

  def get_file_content(bucket, filename), do: impl().get_file_content(bucket, filename)

  defp impl, do: Application.get_env(:my_app, :storage_client, MyApp.Google.StorageClient)
end
```
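`MyApp.StorageClient` resolves its implementation with `Application.get_env/3` on every call, in whatever process makes the call. A minimal stdlib-only sketch of the same pattern, using hypothetical `Sketch.*` modules and a hand-written fake in place of Mox, shows how swapping the configured module changes what the facade returns:

```elixir
defmodule Sketch.StorageClient do
  # Same shape as MyApp.StorageClient: a behaviour plus a facade that
  # looks up the implementation at call time.
  @callback get_file_content(bucket :: String.t(), filename :: String.t()) ::
              {:ok, String.t()} | {:error, any()}

  def get_file_content(bucket, filename), do: impl().get_file_content(bucket, filename)

  defp impl, do: Application.get_env(:sketch_app, :storage_client, Sketch.RealStorage)
end

defmodule Sketch.RealStorage do
  # Hypothetical stand-in for the Google client; a real one would call the GCS API.
  @behaviour Sketch.StorageClient

  @impl true
  def get_file_content(_bucket, _filename), do: {:error, :not_available_offline}
end

defmodule Sketch.FakeStorage do
  # Hypothetical hand-written fake: canned content for one known object.
  @behaviour Sketch.StorageClient

  @impl true
  def get_file_content("elixir-broadway-test", "demo.json"), do: {:ok, "demo content"}
  def get_file_content(_bucket, _filename), do: {:error, :not_found}
end
```

After `Application.put_env(:sketch_app, :storage_client, Sketch.FakeStorage)`, calls to `Sketch.StorageClient.get_file_content/2` return the fake’s data. Because application env is global rather than per-process, a fake configured this way is also visible to other processes (such as Broadway processors); the trade-off is that tests changing it should not run with `async: true`.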
```elixir
defmodule MyApp.Pubsub do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    producer_module = Application.fetch_env!(:my_app, :producer_module)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 2_000
        ]
      ]
    )
  end

  @impl true
  def handle_message(
        _processor,
        %Message{} = message,
        _context
      ) do
    with %{attributes: %{} = meta} <- message.metadata,
         {:ok, data} <- Jason.decode(message.data) do
      case MyApp.StorageClient.get_file_content(meta["bucketId"], meta["objectId"]) do
        {:ok, file_content} ->
          message
          |> Message.put_data(Map.put(data, "file_content", file_content))

        {:error, reason} ->
          Message.failed(message, reason)
      end
    else
      _ ->
        Message.failed(message, "Failed to process PubSub message")
    end
  end
end
```
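The `with` in `handle_message/3` only checks that `attributes` is a map; if `"bucketId"` or `"objectId"` is missing, `meta["bucketId"]` evaluates to `nil` and the storage client is called with `nil`. A minimal sketch, assuming a hypothetical `FileLocation` helper that is not part of the original code, of pulling both names out with a pattern match so missing attributes fail early:

```elixir
defmodule FileLocation do
  # Hypothetical helper: extracts the bucket and object name from a Cloud
  # Storage notification's Pub/Sub attributes. The keys are strings,
  # mirroring meta["bucketId"] / meta["objectId"] in handle_message/3.
  def from_metadata(%{attributes: %{"bucketId" => bucket, "objectId" => object}})
      when is_binary(bucket) and is_binary(object) do
    {:ok, {bucket, object}}
  end

  # Anything else (missing attributes, atom keys, non-string values) is
  # rejected instead of being passed on as nil.
  def from_metadata(_metadata), do: {:error, :missing_bucket_or_object}
end
```

With atom-keyed attributes such as `%{attributes: %{bucketId: "elixir-broadway-test"}}` this returns `{:error, :missing_bucket_or_object}`, which `handle_message/3` could turn into `Message.failed/2` instead of calling the storage client.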
And the test looks like this. By the way, I had a hard time building a message that looks like the ones `handle_message` receives directly from Pub/Sub.
```elixir
defmodule MyApp.PubsubTest do
  use ExUnit.Case, async: true

  import Mox

  setup :verify_on_exit!

  test "message" do
    parent = self()
    ref = make_ref()
    ack = {Broadway.CallerAcknowledger, {parent, ref}, :ok}

    MyApp.MockStorageClient
    |> expect(:get_file_content, fn _bucket, _filename ->
      send(parent, {ref, :temp})
      {:ok, ""}
    end)

    metadata = %{
      attributes: %{
        bucketId: "elixir-broadway-test",
        eventTime: "2022-05-10T15:19:28.464019Z",
        eventType: "OBJECT_FINALIZE",
        notificationConfig: "projects/_/buckets/elixir-broadway-test/notificationConfigs/1",
        objectGeneration: "1652195968450664",
        objectId: "demo.json",
        payloadFormat: "JSON_API_V1"
      }
    }

    data =
      "{\n \"kind\": \"storage#object\",\n \"id\": \"elixir-broadway-test/demo.json/1652195968450664\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/elixir-broadway-test/o/demo.json\",\n \"name\": \"demo.json\",\n \"bucket\": \"elixir-broadway-test\",\n \"generation\": \"1652195968450664\",\n \"metageneration\": \"1\",\n \"contentType\": \"application/json\",\n \"timeCreated\": \"2022-05-10T15:19:28.464Z\",\n \"updated\": \"2022-05-10T15:19:28.464Z\",\n \"storageClass\": \"STANDARD\",\n \"timeStorageClassUpdated\": \"2022-05-10T15:19:28.464Z\",\n \"size\": \"2786889\",\n \"md5Hash\": \"dsdjaaskdasd+iqv6ydddvQ==\",\n \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/...\",\n \"rer444\": \"999Fsa==\",\n \"etag\": \"COiAss6d1fcCEAE=\"\n}\n"

    message = %Broadway.Message{
      acknowledger: ack,
      batch_key: :default,
      batch_mode: :flush,
      batcher: :default,
      data: data,
      metadata: metadata
    }

    Broadway.push_messages(MyApp.Pubsub, [message])

    assert_receive {:ack, ^ref,
                    [
                      %Broadway.Message{
                        acknowledger: {Broadway.CallerAcknowledger, {_, _}, :ok},
                        batch_key: :default,
                        batch_mode: :flush,
                        batcher: :default,
                        data: ^data,
                        metadata: ^metadata,
                        status: :ok
                      }
                    ], []}
  end
end
```
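One detail worth noting about this fixture: `handle_message/3` reads the attributes with string keys (`meta["bucketId"]`), while the test builds them with atom keys (`bucketId:`), and looking up `"bucketId"` in an atom-keyed map returns `nil`, which matches the `[nil, nil]` arguments in the error. A minimal sketch of a hypothetical `PubsubFixture` helper (not part of Broadway or the original code) that builds metadata with string attribute keys:

```elixir
defmodule PubsubFixture do
  # Hypothetical test helper: builds message metadata shaped like the one
  # handle_message/3 reads, i.e. with string attribute keys. Accepts a
  # keyword list or map and converts every key with to_string/1.
  def metadata(attributes) when is_map(attributes) or is_list(attributes) do
    %{attributes: Map.new(attributes, fn {key, value} -> {to_string(key), value} end)}
  end
end
```

For example, `PubsubFixture.metadata(bucketId: "elixir-broadway-test", objectId: "demo.json")` returns `%{attributes: %{"bucketId" => "elixir-broadway-test", "objectId" => "demo.json"}}`. This only addresses the `nil` arguments; the expectation is still owned by the test process, not by the Broadway processor that makes the call.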
The failure I get is:
```
15:24:24.467 [error] ** (Mox.UnexpectedCallError) no expectation defined for MyApp.MockStorageClient.get_file_content/2 in process #PID<0.335.0> with args [nil, nil]
    (mox 1.0.1) lib/mox.ex:741: Mox.__dispatch__/4
    (hermes 0.1.0) lib/my_app/pubsub.ex:34: MyApp.Pubsub.handle_message/3
    (broadway 1.0.3) lib/broadway/topology/processor_stage.ex:156: anonymous fn/6 in Broadway.Topology.ProcessorStage.handle_messages/4
    (telemetry 1.1.0) /home/me/projects/my_app/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
    (broadway 1.0.3) lib/broadway/topology/processor_stage.ex:143: Broadway.Topology.ProcessorStage.handle_messages/4
    (broadway 1.0.3) lib/broadway/topology/processor_stage.ex:63: anonymous fn/2 in Broadway.Topology.ProcessorStage.handle_events/3
    (telemetry 1.1.0) /home/me/projects/my_app/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
    (gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
    (gen_stage 1.1.2) lib/gen_stage.ex:2660: GenStage.take_pc_events/3
    (stdlib 3.17.1) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17.1) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
```
This is where I’m stuck, and I can’t figure out how to solve it.
I read about explicit allowances (`Mox.allow/3`) in the Mox documentation, but I have no idea how to apply them to the Broadway processes.
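Mox expectations are owned by the process that defines them (here the test process), while `handle_message/3` runs in a Broadway processor process, which is why the error names a different PID. The registry below is a swapped-in toy, not Mox’s implementation: a hypothetical `ToyOwnership` module built on `Agent` that shows how a stub owned by one process stays invisible to another until that process is explicitly allowed, the idea behind `Mox.allow/3`.

```elixir
defmodule ToyOwnership do
  # Toy model of per-process stubs with explicit allowances.
  # NOT how Mox is implemented; it only illustrates ownership.

  def start_link do
    Agent.start_link(fn -> %{stubs: %{}, allowed: %{}} end, name: __MODULE__)
  end

  # Register a stub owned by the calling process. self() is bound here,
  # outside the Agent callback, so the owner is the caller, not the Agent.
  def stub(fun) do
    owner = self()
    Agent.update(__MODULE__, &put_in(&1, [:stubs, owner], fun))
  end

  # Let `pid` use the stubs owned by `owner`.
  def allow(owner, pid) do
    Agent.update(__MODULE__, &put_in(&1, [:allowed, pid], owner))
  end

  # Dispatch on behalf of whichever process runs this function.
  def call(args) do
    caller = self()
    state = Agent.get(__MODULE__, & &1)
    owner = Map.get(state.allowed, caller, caller)

    case Map.fetch(state.stubs, owner) do
      {:ok, fun} -> {:ok, apply(fun, args)}
      :error -> {:error, {:no_expectation, caller}}
    end
  end

  # Demo helper: run call/1 in a freshly spawned process, optionally
  # allowing that process first, and return what the call produced.
  def call_from_new_process(args, allow?) do
    parent = self()

    pid =
      spawn(fn ->
        receive do
          :go -> send(parent, {:result, self(), call(args)})
        end
      end)

    if allow?, do: allow(parent, pid)
    send(pid, :go)

    receive do
      {:result, ^pid, result} -> result
    after
      1_000 -> {:error, :timeout}
    end
  end
end
```

After `ToyOwnership.start_link()` and a `stub/1` from the current process, `call/1` succeeds locally, `call_from_new_process(args, false)` returns `{:error, {:no_expectation, pid}}`, and `call_from_new_process(args, true)` succeeds. In Mox itself, the corresponding options are allowing the processor PIDs with `Mox.allow/3` or switching to global mode (`setup :set_mox_global`), which cannot be combined with `async: true`.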
Here are the versions I’m using:
Erlang/OTP 24 [erts-12.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Elixir 1.13.3 (compiled with Erlang/OTP 22)
broadway, "1.0.3"
mox, "1.0.1"
I hope someone can help.
thx
Cheers
Frank