Broadway Custom Producer throughput tuning

I’m creating a Custom Broadway Producer that polls messages from a plain old SOAP WebService which is only able to deliver one message per request. This custom producer replies and immediately requests more messages (should I have more), or sleeps up to 1 minute before requesting for more.

The issue that I’m seeing is that the throughput, even with not that many messages (the service usually gets busy only during working hours, receiving around 18k messages over 8 hours), the consumers take a lot of time to receive those messages. I’m wondering if there’s any tuning or configuration I might have missed the point.

I’m now running multiple instances of the same app horizontally and getting an acceptable performance. However, it happened for messages to take hours to flow from the producer to consumers when I was running a single instance of the service.

The code of my producer is as follows:

defmodule MyProducer do
  use GenStage

  alias Broadway.Producer

  @behaviour Producer

  @default_receive_interval 60_000

  @impl true
  def init(opts) do
    receive_interval = opts[:receive_interval] || @default_receive_interval

    {:producer, %{receive_timer: nil, receive_interval: receive_interval}}
  end

  @impl true
  def handle_demand(_incoming_demand, state) do
    handle_receive_message(state)
  end

  @impl true
  def handle_info(:receive_message, %{receive_timer: nil} = state) do
    {:noreply, [], state}
  end

  @impl true
  def handle_info(:receive_message, state) do
    handle_receive_message(%{state | receive_timer: nil})
  end

  @impl true
  def handle_info(_, state) do
    {:noreply, [], state}
  end

  defp handle_receive_message(state) do
    {has_next, contents} = MyClient.receive_message(state)

    receive_timer =
      case has_next do
        false -> schedule_receive_message(state.receive_interval)
        true -> schedule_receive_message(0)
      end

    {:noreply, contents, %{state | receive_timer: receive_timer}}
  end

  defp schedule_receive_message(interval) do
    Process.send_after(self(), :receive_message, interval)
  end
end

And the current configuration:

[
    broadway: [
      index: 0,
      hibernate_after: 15000,
      context: :context_not_set,
      resubscribe_interval: 100,
      max_seconds: 5,
      max_restarts: 3,
      shutdown: 30000,
      name: MyConsumer,
      producer: [
        hibernate_after: 15000,
        concurrency: 1,
        module: {MyProducer,
         [receive_interval: 60000]}
      ],
      processors: [
        default: [hibernate_after: 15000, max_demand: 10, concurrency: 16]
      ],
      batchers: [
        default: [
          hibernate_after: 15000,
          batch_timeout: 1000,
          concurrency: 1,
          batch_size: 1
        ]
      ]
    ]
  ]

Solved by tuning the min_demand and max_demand of my processors. This discussion helped a lot.