I use a little trick I learned from the book Mastering Elixir. In my producer, I have something like this…
defp send_events_from_queue(queue, how_many, state) do
  tasks = queue.dequeue(how_many)
  received = length(tasks)

  # Fewer events than demanded: schedule a retry for the unmet demand.
  if received < how_many do
    Process.send_after(self(), :try_again, @queue_polling)
  end

  {:noreply, tasks, %{state | pending: how_many - received}}
end

# And then
def handle_info(:try_again, %{queue: queue, pending: demand} = state) do
  send_events_from_queue(queue, demand, state)
end
If the queue returns fewer events than were demanded (for example because it is empty), the producer polls it again after @queue_polling milliseconds and tries to serve the remaining demand.
I have used this with an ETS queue and a RabbitMQ queue…
Anyway, the situation you mention seems strange, because you would get deadlocks as soon as the number of events is not a multiple of your max demand, which I do not encounter in my pipeline.
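To illustrate why a count that is not a multiple of the demand need not stall, here is a minimal sketch of the bookkeeping on its own, without GenStage. The module name PendingDemand and the use of Erlang's :queue are assumptions for illustration only; my real queue module is not shown here.

```elixir
defmodule PendingDemand do
  # Hypothetical helper, not part of the pipeline above.
  # Takes up to `demand` items from an Erlang :queue and returns
  # {events, remaining_queue, pending_demand}.
  def take(queue, demand) when is_integer(demand) and demand >= 0 do
    do_take(queue, demand, [])
  end

  # Demand fully served: nothing is pending.
  defp do_take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp do_take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> do_take(rest, demand - 1, [item | acc])
      # Queue ran dry: whatever demand is left stays pending.
      {:empty, rest} -> {Enum.reverse(acc), rest, demand}
    end
  end
end

# 7 events against a demand of 10: all 7 are emitted, 3 stay pending.
queue = :queue.from_list(Enum.to_list(1..7))
{events, queue, pending} = PendingDemand.take(queue, 10)

# Later 5 more events arrive; the retry only asks for the pending 3.
queue = Enum.reduce(8..12, queue, &:queue.in/2)
{more, _queue, still_pending} = PendingDemand.take(queue, pending)
```

The pending value plays the same role as the pending key in the producer state: as long as it is kept and retried, a partial batch is just delayed demand rather than a lost one.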
$ iex -S mix
Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]
03:45:24.293 [debug] #PID<0.172.0>: Queue worker started.
03:45:24.300 [debug] #PID<0.174.0>: Starter started.
03:45:24.305 [debug] #PID<0.175.0>: Worker started.
03:45:24.305 [debug] #PID<0.176.0>: Worker started.
03:45:24.305 [debug] #PID<0.177.0>: Worker started.
03:45:24.305 [debug] #PID<0.178.0>: Worker started.
03:45:24.305 [debug] #PID<0.179.0>: Worker started.
03:45:24.305 [debug] #PID<0.180.0>: Worker started.
03:45:24.305 [debug] #PID<0.181.0>: Worker started.
03:45:24.305 [debug] #PID<0.182.0>: Worker started.
03:45:24.305 [debug] #PID<0.183.0>: Worker started.
03:45:24.305 [debug] #PID<0.184.0>: Worker started.
03:45:24.305 [debug] #PID<0.185.0>: Worker started.
03:45:24.305 [debug] #PID<0.186.0>: Worker started.
03:45:24.305 [debug] #PID<0.187.0>: Worker started.
03:45:24.305 [debug] #PID<0.188.0>: Worker started.
03:45:24.305 [debug] #PID<0.189.0>: Worker started.
03:45:24.305 [debug] #PID<0.190.0>: Worker started.
03:45:24.305 [debug] #PID<0.191.0>: Notifier started.
iex(1)> GsPipe.multi_enqueue 777, "koko"
# lots of lines
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 623}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 773}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 624}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 774}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 625}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 775}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 626}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 776}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 627}
03:46:09.357 [debug] #PID<0.175.0>: Consume Task {"koko", 777}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 628}
# More lines
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 767}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 768}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 769}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 770}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 771}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 772}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 773}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 774}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 775}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 776}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 777}
In this example I put 15 consumers for 1 notifier, all consumers with
subscribe_to: [{Producer, min_demand: 1, max_demand: 10}]
Since there are 15 consumers for 1 notifier, the notifier was a bit overloaded… Then I tried with 5_000_009 and it worked the same.
What does your producer's handle_demand look like?