Broadway Message Distribution Strategy

Here is my system loaded up with work:

It seems Broadway is favoring several workers and giving them lots of messages to process (when I have max_demand: 1). Others are getting no messages, so turning up the number of workers has no effect on throughput. CPU & Network never hit maximum, the system just processes slowly.

Does Broadway use a round-robin message distribution strategy? Can I change it? How would I ensure each worker only ever has 1 message to work on at a time?

It uses the default demand dispatcher from GenStage. Can you please post your topology definition? A high number of workers and low max_demand should be enough to make it process one by one. Also, you may be running into the issue that the producer cannot produce fast enough, so there is not enough work for all cores. More information would be appreciated.

2 Likes

Here is the broadway setup I tried in the above screenshot. Previously I tried 200 workers with the same effect. I can try 20k but I definitely don’t want to be processing that many files at a time. The files size range unfortunately from a few kb to 200-300 MB so their weights are probably not being respected.

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module:
            {BroadwaySQS.Producer,
             queue_name: "my-pipeline-prototype",
             max_number_of_messages: 10,
             wait_time_seconds: 20,
             visibility_timeout: 300,
             receive_interval: 10},
          stages: 50
        ]
      ],
      processors: [
        # downloaders: [
        #   max_demand: 1,
        #   stages: System.schedulers_online()
        # ],
        default: [
          max_demand: 1,
          # + div(System.schedulers_online(), 2)
          stages: 2000
        ]
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 10_000,
          stages: 50
        ]
      ]
    )

There is more information in my last post:

1 Like

Yeah, I think the issue is that we can’t get that much data that fast. Try starting two instances of the same broadway (with different names) and see if you can max it out this way.

1 Like

Actually, ignore me, Broadway.SQS uses one poller per producer stage.

1 Like

Starting multiple instances doesn’t help anything. It just ends up wasting a lot of CPU/Network resources. I’ve already tried running with up to 3 of these instances.

Why would a processor ask for more work when it has 40+ messages in its message queue already? Are messages getting distributed to the worker when they did not ask for it?

A processor should not receive more than max_demand, if it is doing so, it is a bug. Add an IO.inspect(length(events)) here and if you receive more than 1 for max_demand == 1, then it is a bug. Maybe we are not passing the max_demand forward?

EDIT: added link.

1 Like

Do you mean messages?

1 Like

Yes, apparently I can’t internet today.

2 Likes

The value is always 1 but the PID is almost always the same few PIDs. Remember, there are 2000 processors here waiting around for work, but look at which PIDs are actually working:

2019-07-24T02:56:51.756000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:56:51.756000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:56:52.259000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:56:52.876000 [warn] Broadway Producer (#PID<0.547.0>) received 1 messages

2019-07-24T02:56:56.760000 [warn] Broadway Producer (#PID<0.547.0>) received 1 messages

2019-07-24T02:56:59.487000 [warn] Broadway Producer (#PID<0.547.0>) received 1 messages

2019-07-24T02:57:10.134000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:10.374000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:10.515000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:10.745000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:10.898000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:12.190000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:12.141000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:14.315000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:14.934000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:15.429000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:15.685000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:16.833000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:19.277000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:25.162000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:27.335000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:29.114000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:30.339000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:30.792000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:30.953000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T02:57:35.670000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:37.250000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:37.514000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:38.204000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:39.228000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:39.706000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:39.892000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:40.836000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:41.252000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:41.940000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:44.214000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:45.612000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:57:55.786000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:57.739000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:57:58.921000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:00.790000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:00.938000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:58:01.430000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:58:01.178000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:02.589000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:58:03.206000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:04.298000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:58:05.262000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:07.560000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:10.996000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T02:58:16.141000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T02:58:31.597000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

Heres a screenshot from observer while this is ongoing (sorted by MsgQ desc). There are only a few processors which get lots of messages:

When running again and sorting by Reds to see which processes did the most work, it looks a bit lopsided as well.

Any ideas?

So I tried with only big files (300-400 files queued up >= 100mb, running from my laptop w/ wifi) to see if smaller files was causing an issue and it seems not to be the case.

2019-07-24T03:22:58.852000 [warn] Broadway Producer (#PID<0.545.0>) received 1 messages

2019-07-24T03:22:58.852000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T03:22:59.520000 [warn] Broadway Producer (#PID<0.546.0>) received 1 messages

2019-07-24T03:23:00.480000 [info] (#PID<0.545.0>) Begin downloading via curl, 302490264 bytes

2019-07-24T03:23:00.480000 [info] (#PID<0.544.0>) Begin downloading via curl, 173432832 bytes

2019-07-24T03:23:00.480000 [info] (#PID<0.546.0>) Begin downloading via curl, 267965903 bytes

2019-07-24T03:23:02.594000 [warn] Broadway Producer (#PID<0.547.0>) received 1 messages

2019-07-24T03:23:02.594000 [info] (#PID<0.547.0>) Begin downloading via curl, 175853568 bytes

2019-07-24T03:23:04.415000 [warn] Broadway Producer (#PID<0.548.0>) received 1 messages

2019-07-24T03:23:04.415000 [info] (#PID<0.548.0>) Begin downloading via curl, 273748300 bytes

2019-07-24T03:23:06.648000 [warn] Broadway Producer (#PID<0.549.0>) received 1 messages

2019-07-24T03:23:06.648000 [info] (#PID<0.549.0>) Begin downloading via curl, 179757056 bytes

2019-07-24T03:23:10.412000 [warn] Broadway Producer (#PID<0.551.0>) received 1 messages

2019-07-24T03:23:10.412000 [info] (#PID<0.551.0>) Begin downloading via curl, 198276268 bytes

2019-07-24T03:23:11.582000 [warn] Broadway Producer (#PID<0.550.0>) received 1 messages

2019-07-24T03:23:11.582000 [info] (#PID<0.550.0>) Begin downloading via curl, 278604105 bytes

2019-07-24T03:23:11.582000 [warn] Broadway Producer (#PID<0.552.0>) received 1 messages

2019-07-24T03:23:11.582000 [info] (#PID<0.552.0>) Begin downloading via curl, 287281012 bytes

2019-07-24T03:23:12.409000 [warn] Broadway Producer (#PID<0.553.0>) received 1 messages

2019-07-24T03:23:12.409000 [info] (#PID<0.553.0>) Begin downloading via curl, 200230071 bytes

2019-07-24T03:23:13.285000 [warn] Broadway Producer (#PID<0.554.0>) received 1 messages

2019-07-24T03:23:13.285000 [info] (#PID<0.554.0>) Begin downloading via curl, 204331743 bytes

2019-07-24T03:23:14.558000 [warn] Broadway Producer (#PID<0.555.0>) received 1 messages

2019-07-24T03:23:14.558000 [info] (#PID<0.555.0>) Begin downloading via curl, 112947200 bytes

2019-07-24T03:23:14.942000 [warn] Broadway Producer (#PID<0.556.0>) received 1 messages

2019-07-24T03:23:14.942000 [info] (#PID<0.556.0>) Begin downloading via curl, 289457789 bytes

2019-07-24T03:23:25.440000 [info] {"atom_memory":1057985,"binary_memory":43145840,"code_memory":17526429,"ets_memory":7952160,"metrics":"health","process_memory":683837488,"system_memory":84343400,"total_memory":768180888}

2019-07-24T03:24:06.857000 [info] (#PID<0.555.0>) Finish downloading in 52.294123 seconds (112947200 bytes)

2019-07-24T03:24:06.857000 [info] (#PID<0.555.0>) Begin processing 112947200 bytes

2019-07-24T03:24:11.355000 [info] (#PID<0.555.0>) Finish processing in 4.497762 seconds (112947200 bytes)

2019-07-24T03:24:11.360000 [warn] Broadway Producer (#PID<0.555.0>) received 1 messages

2019-07-24T03:24:11.360000 [info] (#PID<0.555.0>) Begin downloading via curl, 211469787 bytes

2019-07-24T03:24:21.363000 [info] Delete 1 messages from SQS queue

2019-07-24T03:24:25.440000 [info] {"atom_memory":1057985,"binary_memory":44927608,"code_memory":17560940,"ets_memory":7961552,"metrics":"health","process_memory":684351832,"system_memory":86185056,"total_memory":770536888}

2019-07-24T03:24:30.100000 [info] (#PID<0.544.0>) Finish downloading in 90.051605 seconds (173432832 bytes)

2019-07-24T03:24:30.100000 [info] (#PID<0.544.0>) Begin processing 173432832 bytes

2019-07-24T03:24:36.463000 [info] (#PID<0.544.0>) Finish processing in 6.362783 seconds (173432832 bytes)

2019-07-24T03:24:36.463000 [warn] Broadway Producer (#PID<0.544.0>) received 1 messages

2019-07-24T03:24:36.463000 [info] (#PID<0.544.0>) Begin downloading via curl, 303573205 bytes

2019-07-24T03:24:46.464000 [info] Delete 1 messages from SQS queue

The odd thing is the logs there look all fine since no one worker is actually working more than one file at a time, but when looking in observer, a bunch of messages are in the mailbox for a few processes still. When looking at one of the workers with a lot of the messages, several of the messages (maybe half) look like this:

{'$gen_consumer',
    {<0.516.0>,#Ref<0.352140224.506462210.91230>},
    [#{'__struct__' => 'Elixir.Broadway.Message',
       acknowledger =>
           {'Elixir.BroadwaySQS.ExAwsClient',
               #Ref<0.352140224.506462209.83744>,
               #{receipt =>
                     #{id => <<"...">>,
                       receipt_handle =>
                           <<"...">>}}},
       batch_key => default,batcher => default,
       data =>
           <<"{\"receivedTime\":\"2019-07-24T03:29:18.668259Z\",\"presignedUrl\":\"https://...\",\"fileSize\":202984451}">>,
       metadata =>
           #{attributes => [],
             md5_of_body => <<"...">>,
             message_attributes => [],
             message_id => <<"...">>,
             receipt_handle =>
                 <<"...">>},
       status => ok}]}

One message like this: (could this one be a problem? why is it asking for 5? maybe this is the batcher?)

{'$gen_producer',{<0.2648.0>,#Ref<0.352140224.506462209.111402>},{ask,5}}

And a few like this: (none of these look to be that interesting…)

{'EXIT',#Port<0.69>,normal}
{system,{<0.8279.0>,#Ref<0.352140224.506462210.165688>},get_status}

Good news, I found a workaround. It appears reducing to a single producer makes the issue go away. I’m still having trouble balancing CPU and networking but I no longer have single workers that are hoarding messages (and therefore causing them to go to the DLQ).

Is it possible that when the first worker asks the producer for demand that multiple producers respond with a single message which creates the bottleneck on a single worker if they cannot get through the message fast enough?

Has there been any traction on this? Is this enough info to submit an issue or are there other things I can try in order to run multiple producers?

1 Like

Apologies, I only check the forum a couple times per week. Let’s move this to a Broadway issue and discuss there. But one issue is for sure: if you have multiple producers, the first processor would subscribe to all of them and be the first in the list to receive messages from all of them. The minimum we can do here is to shuffle the list of subscription. I have some ideas to try out, please open up an issue on Broadway and we can continue the convo there!

2 Likes