Anybody using Broadway with a database (or a file stream) as the producer?

As I deepen my understanding of gen_stage and need to field more diverse use-cases, I’m gaining more appreciation for Broadway. I think it has fielded many of the same problems I’m encountering, but one thing has always made it seem like an awkward fit for me: its reliance on external queues. Is anyone using Broadway with a database or a file stream as the producer?

To clarify a bit about our setup: we have various processes getting data from different sources and persisting it to storage (e.g. to a database). Then we have other processes parsing that data and storing it in new locations. Reading a file from a stream (a huge file, let’s say) seems like a perfect use-case for Broadway: each consumer can ask for new chunks once it’s ready. Are people doing that sort of thing? A similar use-case would be streaming data from a database: we just need to get it processed without flooding our system resources. Anyone using a database as a Broadway producer?

The only thing that we’re doing that requires a little bit of gen_stage mojo is that we need to rate limit our requests to remote APIs, and sometimes that bottleneck needs to apply (by design) to any pipeline that utilizes that particular API.
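
One way to get an API-wide bottleneck like that is to keep the budget in a single named process that every pipeline checks before calling the API. As far as I know, Broadway's own `rate_limiting` producer option caps one pipeline at a time, so it does not cover the shared case by itself. Here is a minimal sketch using only the standard library. `ApiLimiter`, its options (`:limit`, `:interval_ms`) and the fixed-window strategy are assumptions for illustration, not part of the setup described here.

```elixir
defmodule ApiLimiter do
  # Hypothetical shared limiter: one named process per remote API, so every
  # pipeline that calls that API draws from the same request budget.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
  end

  # Returns :ok if a request may go out now, or {:wait, ms} with the time
  # left until the current window resets.
  def checkout(name), do: GenServer.call(name, :checkout)

  @impl true
  def init(opts) do
    state = %{
      limit: Keyword.fetch!(opts, :limit),
      interval: Keyword.fetch!(opts, :interval_ms),
      used: 0,
      window_start: now()
    }

    {:ok, state}
  end

  @impl true
  def handle_call(:checkout, _from, state) do
    state = maybe_reset(state)

    if state.used < state.limit do
      {:reply, :ok, %{state | used: state.used + 1}}
    else
      wait = state.window_start + state.interval - now()
      {:reply, {:wait, max(wait, 0)}, state}
    end
  end

  # Start a fresh window once the current one has expired.
  defp maybe_reset(state) do
    if now() - state.window_start >= state.interval do
      %{state | used: 0, window_start: now()}
    else
      state
    end
  end

  defp now, do: System.monotonic_time(:millisecond)
end
```

A processor would call `ApiLimiter.checkout(:some_api)` before each request and sleep or reschedule when it gets `{:wait, ms}` back.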

I’ve only ever used it with custom producers. The docs did give me the impression that external queues were its raison d’être, but it works perfectly fine with whatever you throw at it.

One way I use it is to spread the work of a very large job across the available connections to an application via its C API. The producer is pretty much a queue that builds itself. It is good for limiting the number of processes that can access a resource.

To bind a database (producer) with a Broadway (consumer) you can use a CDC pattern.

For example:
PostgreSQL replication → Debezium CDC → Kafka stream → Broadway.

BTW, together with my colleague, I’ll be giving a talk this year exactly on that: ElixirConf EU

This solution requires significant infrastructure setup (a Kafka-compatible message broker, etc.), but has multiple benefits: reliability, scalability, loose coupling…

Using WalEx (cpursley/walex on GitHub, “Listen to Postgres change events”) might also be a possibility to produce work… but I’m yet to use it.

On a tangent I would also consider just using Oban for the background processing… but it’s all dependent on the task at hand, scale etc.

Thanks for the links!

Yeah, we’ve looked into Oban, but decided against it for a couple of reasons, the primary one being that it adds another layer of complexity on top of our existing queues. Things got way simpler when we ditched SQS in favor of native messaging, for example. Even though a local Postgres instance is much simpler to wrangle during development than an external AWS service, it still requires additional overhead and represents more possible points of failure. It also has a whiff of redundancy about it: when the job is to “drain” the database by processing multiple db records, it seems strange to then turn around and track that work with yet more database records. I think it’s a fabulous tool and I’m eager to put it to use; I just didn’t think it was the best fit for this particular use case.

I’ve got a package to do this. Not published on hex though but if there’s interest I’ll put it up.

The producer just polls a table. Adding support for Postgres notifications is something I want to do but don’t have a pressing need for.
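
The fiddly part of a polling producer is usually demand bookkeeping: GenStage does not re-send demand that went unfilled, so the producer has to remember how much is still owed and try again on the next poll. Below is a rough sketch of that accounting as plain functions. `PollingState` and the `fetch` function are hypothetical stand-ins (not taken from the package mentioned above); `fetch` represents whatever query claims up to `limit` unprocessed rows.

```elixir
defmodule PollingState do
  # Hypothetical bookkeeping for a table-polling producer.
  # `fetch` is a 1-arity function: (limit) -> list of at most `limit` rows.
  def new(fetch), do: %{pending: 0, fetch: fetch}

  # Meant to be called from handle_demand/2: record the new demand,
  # then try to fill it straight away.
  def add_demand(state, demand) do
    fill(%{state | pending: state.pending + demand})
  end

  # Meant to be called on every poll tick as well. With nothing owed,
  # skip the query entirely.
  def fill(%{pending: 0} = state), do: {[], state}

  def fill(%{pending: pending, fetch: fetch} = state) do
    rows = fetch.(pending)
    {rows, %{state | pending: pending - length(rows)}}
  end
end
```

Inside a producer, `handle_demand/2` would return `{:noreply, rows, state}` from `add_demand/2`, and a timer message handled in `handle_info/2` would do the same with `fill/1`.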

Thanks, that looks interesting!

I tried putting together a custom Broadway Producer that operated on a simple stream. I used a file for testing, but in theory this could work on other streams too. I discovered the stream_split package which made this possible.

Here’s the code for my custom (file) stream producer:

defmodule StreamProducer do
  # See https://hexdocs.pm/broadway/custom-producers.html#example
  # `use GenStage` supplies default callbacks (handle_info/2 etc.) for the producer.
  use GenStage

  alias Broadway.Message

  # Broadway will not call the child_spec/1 or start_link/1 function of the producer.
  # That's because Broadway wraps the producer to augment it with extra features.
  def start_link(filepath) do
    GenStage.start_link(__MODULE__, filepath)
  end

  # When Broadway starts, the GenStage.init/1 callback will be invoked w the given opts.
  def init(filepath) do
    {:producer, File.stream!(filepath)}
  end

  def handle_demand(demand, stream) when demand > 0 do
    {head, tail} = StreamSplit.take_and_drop(stream, demand)
    {:noreply, head, tail}
  end

  # Not part of the behavior, but Broadway req's that we translate the genstage events
  # into Broadway msgs
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  # Broadway.Acknowledger callback; it is expected to return :ok.
  def ack(:ack_id, successful, failed) do
    IO.puts("ACKING successful: #{length(successful)} failed: #{length(failed)}")
    # Write ack code here
    :ok
  end
end

Then I set up my Broadway implementation:

defmodule MyApp do
  use Broadway

  alias Broadway.Message

  def start_link(file) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {StreamProducer, file},
        transformer: {StreamProducer, :transform, []}
      ],
      processors: [
        default: [concurrency: 100]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 100, batch_timeout: 2000]
      ]
    )
  end

  # Do the work
  @impl true
  def handle_message(_, %Message{data: data} = message, _) do
    IO.inspect(data, label: "HANDLING MESSAGE")

    # Simulate load
    Process.sleep(1000)

    message
  end

  @impl true
  def handle_batch(_, messages, _, _) do
    IO.puts("HANDLING BATCH OF >>>> #{length(messages)} <<<<")

    messages
  end
end

And I ran it doing something like:

iex> MyApp.start_link("/path/to/huge/file.txt")

The whole thing worked exactly as advertised… concurrency configured to the limits of my system etc. and my huge file was processed.
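
For the file case specifically, a similar pull-on-demand read seems possible with only the standard library, by keeping an open IO device as the producer state instead of a stream. This is a sketch of a different technique from the stream_split approach above, not a drop-in replacement; `LineReader` is a hypothetical helper name.

```elixir
defmodule LineReader do
  # Reads up to `count` lines from an already-open IO device.
  # Returns {lines, :more} when `count` lines were read, or {lines, :eof}
  # when the device ran out first.
  def take(device, count) when count > 0, do: take(device, count, [])

  defp take(_device, 0, acc), do: {Enum.reverse(acc), :more}

  defp take(device, remaining, acc) do
    case IO.read(device, :line) do
      :eof -> {Enum.reverse(acc), :eof}
      {:error, reason} -> raise "read failed: #{inspect(reason)}"
      line -> take(device, remaining - 1, [line | acc])
    end
  end
end
```

In the producer, `init/1` would return `{:producer, File.open!(filepath)}` and `handle_demand/2` would emit the lines from `LineReader.take(device, demand)`. Lines keep their trailing newline, same as with `File.stream!/1`.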

Does anyone know if this will work to process an Ecto stream (Repo.stream/2, see the Ecto.Repo docs for Ecto v3.9.4)?
I’m gonna try that, although the bit about it only working within a transaction may be a deal breaker. I’m not sure it would work across multiple transactions; you’d have to start a new transaction inside handle_demand.
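
One possible way around the transaction requirement is to skip the Ecto stream and have each `handle_demand` run a short, independent query that picks up after the last id seen (keyset pagination). The sketch below shows only the cursor logic, with the query passed in as a function so it runs without a database. `KeysetCursor`, `fetch_after`, and the `Repo`/`Record` names in the comment are hypothetical, and it assumes an integer `id` column that only grows.

```elixir
defmodule KeysetCursor do
  # `fetch_after` is a hypothetical 2-arity function: (last_id, limit) -> rows,
  # where rows are maps with an :id key, ordered by id ascending.
  # With Ecto it might look roughly like:
  #   fn last_id, limit ->
  #     Repo.all(from r in Record, where: r.id > ^last_id, order_by: r.id, limit: ^limit)
  #   end
  def new(fetch_after), do: %{last_id: 0, fetch_after: fetch_after}

  # Fetches the next page and moves the cursor past the last row returned.
  # An empty page leaves the cursor where it was.
  def next(%{last_id: last_id, fetch_after: fetch_after} = cursor, limit) do
    case fetch_after.(last_id, limit) do
      [] -> {[], cursor}
      rows -> {rows, %{cursor | last_id: List.last(rows).id}}
    end
  end
end
```

The cursor would live in the producer state, with `handle_demand/2` calling `KeysetCursor.next(cursor, demand)`. No transaction stays open between calls, at the cost of not seeing a single consistent snapshot of the table.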
