Hello. I am implementing a parser for S3/SQS messages. Right now it's a bunch of code that runs in a preconfigured Broadway-using module, but I don't find it very clean, so I started developing a transformer that takes a %Message{data :: binary} and turns it into a %Message{data :: %MyStruct{raw: original, some: :interesting_data}}. With that, I intend to keep the actual Broadway "productions" much closer to Broadway's documentation, just adding the BroadwaySQS producer and my transformer.
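To make the idea concrete, here is a minimal sketch of the kind of transformer I mean. It is not my real code: Message is a stand-in struct so the snippet runs without Broadway, and MyTransformer, transform/2 and the hard-coded :interesting_data are hypothetical names for illustration only.

```elixir
defmodule Message do
  # Stand-in for %Broadway.Message{} (assumption: only the :data field matters here).
  defstruct [:data]
end

defmodule MyStruct do
  # Parsed form of the payload: keeps the original binary next to the extracted data.
  defstruct [:raw, :some]
end

defmodule MyTransformer do
  # Broadway's producer option `transformer: {mod, fun, opts}` expects a
  # function taking (event, opts) and returning a message, so this would be
  # wired up as `transformer: {MyTransformer, :transform, []}`.
  def transform(%Message{data: raw} = message, _opts) when is_binary(raw) do
    # Real parsing of the S3/SQS payload would go here; this just wraps it.
    %Message{message | data: %MyStruct{raw: raw, some: :interesting_data}}
  end
end

# Calling the function directly, without going through a Broadway pipeline.
message = MyTransformer.transform(%Message{data: "raw message"}, [])
IO.inspect(message.data.raw)
```

Since the transformer is a plain function, it can at least be exercised on its own like this, independently of how Broadway invokes it.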
The issue is that in testing, I call Broadway.test_messages(name, ["raw message"]) and it does not seem to call the transformer at all.
If I read the code correctly, that’s exactly how it’s implemented:
@impl true
def handle_call({:push_messages, messages}, _from, state) do
  {:reply, :ok, messages, state}
end
So the pushed messages are emitted as-is; it calls the transformer only in handle_demand and handle_events.
Am I wrong in assuming that the transformer should be called in Broadway.test_messages at least?
What alternative do I have to ensure that the transformer behaves correctly?