I’m creating a Custom Broadway Producer that polls messages from a plain old SOAP WebService which is only able to deliver one message per request. This custom producer replies and immediately requests more messages (should I have more), or sleeps up to 1 minute before requesting for more.
The issue that I’m seeing is that the throughput, even with not that many messages (the service usually gets busy only during working hours, receiving around 18k messages over 8 hours), the consumers take a lot of time to receive those messages. I’m wondering if there’s any tuning or configuration I might have missed the point.
I’m now running multiple instances of the same app horizontally and getting an acceptable performance. However, it happened for messages to take hours to flow from the producer to consumers when I was running a single instance of the service.
The code of my producer is as follows:
defmodule MyProducer do
use GenStage
alias Broadway.Producer
@behaviour Producer
@default_receive_interval 60_000
@impl true
def init(opts) do
receive_interval = opts[:receive_interval] || @default_receive_interval
{:producer, %{receive_timer: nil, receive_interval: receive_interval}}
end
@impl true
def handle_demand(_incoming_demand, state) do
handle_receive_message(state)
end
@impl true
def handle_info(:receive_message, %{receive_timer: nil} = state) do
{:noreply, [], state}
end
@impl true
def handle_info(:receive_message, state) do
handle_receive_message(%{state | receive_timer: nil})
end
@impl true
def handle_info(_, state) do
{:noreply, [], state}
end
defp handle_receive_message(state) do
{has_next, contents} = MyClient.receive_message(state)
receive_timer =
case has_next do
false -> schedule_receive_message(state.receive_interval)
true -> schedule_receive_message(0)
end
{:noreply, contents, %{state | receive_timer: receive_timer}}
end
defp schedule_receive_message(interval) do
Process.send_after(self(), :receive_message, interval)
end
end
And the current configuration:
[
broadway: [
index: 0,
hibernate_after: 15000,
context: :context_not_set,
resubscribe_interval: 100,
max_seconds: 5,
max_restarts: 3,
shutdown: 30000,
name: MyConsumer,
producer: [
hibernate_after: 15000,
concurrency: 1,
module: {MyProducer,
[receive_interval: 60000]}
],
processors: [
default: [hibernate_after: 15000, max_demand: 10, concurrency: 16]
],
batchers: [
default: [
hibernate_after: 15000,
batch_timeout: 1000,
concurrency: 1,
batch_size: 1
]
]
]
]