Broadway pipelines questions and best practices

I am using broadway lately (which by the way is a pleasure to work with) for some data pipelines and I am wondering if the approaches I have taken are the optimal ones.

Context

The basic workflow is:

  • Read messages from a set of kafka topics
  • Process the raw messages and extract a set of valid json documents. Some incoming may be invalid so we need to ignore them.
  • Store the processed data.

How to ignore/skip raw messages.

Incoming data are processed in a process_data function which returns the status of the processing and the transformed data. The returned statuses can be :ok, :skip or :error. Currently I am handling messages as following:

def handle_message(_, %Message{data: data} = message, _) do
    case process_data(data) do
      {:ok, data} -> 
            Message.update_data(message, fn _ -> data end)
            |> Message.put_batch_key(data.type)
            |> Message.put_batcher(:default)

      {:skip, reason} ->
          Message.failed(message, reason)

      {:error, reason} ->
          Message.failed(message, reason) 
    end
end

Question 1

Should I mark the messages as failed directly or should I add them to the batcher and ignore them in the handle_batch function?

Multiple batchers

Assume that I want to do more than one operation on the successfully processed messages, e.g. store them in a database and store them to s3. broadway does not support propagating the same message to different batchers for good reasons. I see two options here:

  • Use a single batcher and do all the batch processing (e.g. store mesages both to the database and s3). The drawback of this approach is that I cannot have different batch_size and batch_timeout options.
  • Create a pipeline for each batcher. The drawback of this approach is that the same raw messages will be processed multiple times.

Question 2

Is there any alternative method? What do you think is the best approach for that use case?

Multiple extracted messages for each incoming message

Each incoming kafka message may contain multiple sub-messages. For example a raw message can be:

[A1, A2, B1, C1, B2, A3]

where A,B, C are the different message types. I would like to do something like:

extracted_messages
|> Enum.map(fn x -> Message.put_batch_key(x, x.type) end)
|> Enum.map(&Message.put_batcher(&1, :default))

The problem here is that handle_message must return exactly one message, so I cannot split a single incoming messages to multiple processed messages, each of which should have a different batch key or potentially a different batcher.

Question 3

The approach I am currently following is to send each of these sub-messages to another kakfa topic and handle them with a secodary pipeline. Is there any better solution? Could this be achieved in a single pipeline?