I have been using `broadway` lately (which, by the way, is a pleasure to work with) for some data pipelines, and I am wondering whether the approaches I have taken are optimal.
Context
The basic workflow is:

- Read messages from a set of `kafka` topics.
- Process the raw messages and extract a set of valid JSON documents. Some incoming messages may be invalid, so we need to ignore them.
- Store the processed data.
How to ignore/skip raw messages
Incoming data are processed in a `process_data` function which returns the status of the processing and the transformed data. The returned statuses can be `:ok`, `:skip` or `:error`. Currently I am handling messages as follows:
```elixir
def handle_message(_, %Message{data: data} = message, _) do
  case process_data(data) do
    {:ok, data} ->
      message
      |> Message.update_data(fn _ -> data end)
      |> Message.put_batch_key(data.type)
      |> Message.put_batcher(:default)

    {:skip, reason} ->
      Message.failed(message, reason)

    {:error, reason} ->
      Message.failed(message, reason)
  end
end
```
Question 1
Should I mark the messages as failed directly, or should I add them to a batcher and ignore them in the `handle_batch` function?
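For reference, the second option would look roughly like this (a sketch; the `:skipped` batcher name is hypothetical and would need to be declared in the pipeline's `:batchers` configuration):

```elixir
# Variant of handle_message/3 where skipped messages are routed to a
# dedicated batcher instead of being failed.
def handle_message(_, %Message{data: data} = message, _) do
  case process_data(data) do
    {:ok, data} ->
      message
      |> Message.update_data(fn _ -> data end)
      |> Message.put_batch_key(data.type)
      |> Message.put_batcher(:default)

    {:skip, _reason} ->
      Message.put_batcher(message, :skipped)

    {:error, reason} ->
      Message.failed(message, reason)
  end
end

# Returning the messages from handle_batch acks them as successful,
# so the skipped batch is effectively a no-op.
def handle_batch(:skipped, messages, _batch_info, _context) do
  messages
end
```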
Multiple batchers
Assume that I want to do more than one operation on the successfully processed messages, e.g. store them in a database and also store them to `s3`. `broadway` does not support propagating the same message to different batchers, for good reasons. I see two options here:

- Use a single batcher and do all the batch processing there (e.g. store messages both to the database and to `s3`), as sketched below. The drawback of this approach is that I cannot have different `batch_size` and `batch_timeout` options per operation.
- Create a pipeline for each batcher. The drawback of this approach is that the same raw messages will be processed multiple times.
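A minimal sketch of the single-batcher option, assuming hypothetical `store_in_db/1` and `store_in_s3/1` helpers:

```elixir
# One :default batcher whose handle_batch/4 performs both writes,
# sharing a single batch_size/batch_timeout configuration.
def handle_batch(:default, messages, _batch_info, _context) do
  payloads = Enum.map(messages, & &1.data)

  store_in_db(payloads)
  store_in_s3(payloads)

  messages
end
```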
Question 2
Is there any alternative method? What do you think is the best approach for that use case?
Multiple extracted messages for each incoming message
Each incoming `kafka` message may contain multiple sub-messages. For example, a raw message can be:

`[A1, A2, B1, C1, B2, A3]`

where `A`, `B`, `C` are the different message types. I would like to do something like:
```elixir
extracted_messages
|> Enum.map(fn x -> Message.put_batch_key(x, x.type) end)
|> Enum.map(&Message.put_batcher(&1, :default))
```
The problem here is that `handle_message` must return exactly one message, so I cannot split a single incoming message into multiple processed messages, each of which should have a different batch key or potentially a different batcher.
Question 3
The approach I am currently following is to send each of these sub-messages to another `kafka` topic and handle them with a secondary pipeline. Is there any better solution? Could this be achieved in a single pipeline?
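For reference, the current two-pipeline approach looks roughly like this (`extract_submessages/1` and `publish/2` are hypothetical helpers; the second pipeline consumes the intermediate topic and batches by type as shown earlier):

```elixir
# First pipeline: extract sub-messages and republish them to an
# intermediate topic. publish/2 would wrap a Kafka producer call.
def handle_message(_, %Message{data: data} = message, _) do
  data
  |> extract_submessages()
  |> Enum.each(fn sub -> publish("submessages", sub) end)

  # The original message is acked once republishing is done; the
  # secondary pipeline consumes "submessages" and batches by type.
  message
end
```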