Broadway producer to adapt a bad old push source

Broadway’s straightforward backpressure mechanism of producing new messages only as there is enough demand to consume them works very well for external event queues like RabbitMQ, Kafka, or databases, but I’ve run into an interesting limitation, trying to write a producer module for a push-only data source.

To make my particular case concrete, this is a server-sent event endpoint (ironically wrapping a private Kafka stream, that I can’t access). This source is a fire hose of events which goes on until the client disconnects. The interface includes a crude cursor which allows for reconnection starting at a given event ID and so supports at-least-once consumption even in the case of disconnect or other failure, although without any particularly nice guarantees.

In theory it should be possible to write a Broadway producer for this stream, which adapts from the push interface to a demand-driven one. If backpressure is too high, messages can be queued, and as a last resort the module will intentionally disconnect. The connection can be retried after demand recovers, or to restart after other failures. My rough code without reconnect logic isn’t much to write home about so far. The previous incarnation is a GenStage pipeline, but lacks backpressure.

A simpler approach would be to read continuously from the event source into a buffer and then connect a broadway producer to the buffer, but it would be a pity to lose the backpressure. Disconnecting the event source is nice because it causes a push source to convincingly emulate some of the important characteristics of a modern source, like allowing the integrator to choose the upper bound on storage and processor use.

What I’ve found is that Broadway and GenStage seem very much not built for this use case of throttling a push source, or more likely I must be making some basic mistake in attempting to wire a bad old push source to fancy new machinery? So far, what I’m imagining is that disconnection should be triggered by a combination of demand underrun and the internal buffer reaching a desired maximum. Maybe this logic could plug in like a rate limiter, or GenStage’s internal buffer would include a warning threshold before hitting buffer_size, and send a signal rather than just logging errors, or the built-in ProducerStage would signal the custom producer module on demand underrun…

1 Like

I am using an ets :queue to store requests, and a GenServer Producer reads that queue periodically.

This Producer is the source for the Broadway pipeline, and it works great :slight_smile:

Does the queue have enough information on its size that the SSE client could know to quit if it gets too big?

1 Like

No, it’s just a simple queue…

The producer looks like this.

defmodule AI.ImagePipeline.Producer do
  @moduledoc false

  use GenStage
  require Logger

  @queue_polling 5_000
  @key :image_queue

  alias AI.Queue

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    Logger.info("#{inspect(self())}: #{__MODULE__} started.")
    {:producer, %{queue: Queue, pending: 0}}
  end

  def handle_info(:try_again, %{queue: queue, pending: demand} = state) do
    send_events_from_queue(queue, demand, state)
  end

  def handle_demand(demand, %{queue: queue, pending: pending} = state) when demand > 0 do
    total_demand = demand + pending
    send_events_from_queue(queue, total_demand, state)
  end

  defp send_events_from_queue(queue, how_many, state) do
    tasks = queue.dequeue(@key, how_many)

    if length(tasks) < how_many do
      Process.send_after(self(), :try_again, @queue_polling)
    end

    {:noreply, tasks, %{state | pending: how_many - length(tasks)}}
  end
end

And the transformer like this…

defmodule AI.ImagePipeline.Transformer do
  use Broadway
  require Logger

  alias Broadway.Message
  alias AI.Core
  alias AI.FFmpegHelper

  @producer AI.ImagePipeline.Producer

  def start_link(_opts) do
    options = [
      name: __MODULE__,
      producer: [
        module: {@producer, 1},
        transformer: {__MODULE__, :transform, []}
      ],
      processors: [
        default: [concurrency: 10]
      ]
    ]

    Broadway.start_link(__MODULE__, options)
  end

  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  # This hook can be used to prepare data outside of the handle_message callback.
  def prepare_messages(messages, _context) do
    Enum.map(messages, fn message ->
      case extract_metadata(message.data.path) do
        {:ok, metadata} ->
          Broadway.Message.update_data(message, fn data ->
            Map.put(data, :metadata, metadata)
          end)
        {:error, reason} ->
          Logger.error("Could not extract metadata: #{inspect reason}")
          message
      end
    end)
  end

  def handle_message(_processor, %{data: %{id: id, path: path} = data} = message, _context) do
    # Add your business logic here...

    start = data[:metadata]["start_time"] || 0.0
    screenshots_output = Core.get_screenshots_path(id)

    FFmpegHelper.extract_screenshots(path, screenshots_output, start: start)
    Core.recon_face(screenshots_output)

    IO.inspect(message, label: "Message")
  end

  def ack(:ack_id, successful, failed) do
    # Write ack code here
    Logger.info "ACK #{inspect successful} #{inspect failed}"
  end

  # Wrapper around FFprobe.format/1
  defp extract_metadata(path), do: FFprobe.format(path)
end
1 Like

What I’m looking for is something along the lines of receiving a call handle_demand(0, state)—some way that the producer can detect demand underflow. This could be sent immediately after dispatching a batch of events, if no more demand has arrived by that time. The producer would have the option of ignoring the message, or in the case discussed here, setting a state bit to pause once the event buffer reaches some threshold.

As an aside, it’s interesting that gen_stage gives an example of guarding handle_demand with when demand > 0 and many producers such as kokolegorille’s helpful example above repeat this clause. Evidently, demand = 0 would crash modules which include this guard! Other than this source of incompatibility with existing code, can anyone here see a reason that we wouldn’t want gen_stage to send my proposed demand underflow messages?

I‘m not sure how a handle_demand(0, state) would be any different to the producer internally tracking demand and noticing when demand was fully supplied. Demand can come in at any time in a concurrent system and nobody can predict when there would be a time to expect new demand less.

1 Like