Configure GenStages to consume dynamic demands

I use a little trick I learned from the book Mastering Elixir. In my producer, I have something like this…

  defp send_events_from_queue(queue, how_many, state) do
    tasks = queue.dequeue(how_many)

    # If the queue could not satisfy the whole demand, poll again later.
    if length(tasks) < how_many do
      Process.send_after(self(), :try_again, @queue_polling)
    end

    # Emit what we have and remember the unsatisfied demand.
    {:noreply, tasks, %{state | pending: how_many - length(tasks)}}
  end

# And then

  def handle_info(:try_again, %{queue: queue, pending: demand} = state) do
    send_events_from_queue(queue, demand, state)
  end

If the queue is empty, or cannot satisfy the full demand, the producer schedules a retry and polls again after @queue_polling milliseconds, carrying the unsatisfied demand in pending.
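For completeness, the matching handle_demand side of this pattern might look like the sketch below. Combining fresh demand with any leftover pending demand is my assumption about how the rest of the producer fits together, not code from the original:

```elixir
# Sketch only: assumes the producer state holds a :queue and a :pending count,
# as in send_events_from_queue/3 above.
def handle_demand(demand, %{queue: queue, pending: pending} = state) do
  # Serve the new demand plus whatever was left unsatisfied last time.
  send_events_from_queue(queue, demand + pending, state)
end
```

This way no demand is ever lost: it is either satisfied immediately or parked in pending until :try_again fires.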

I have used this with both an ETS-backed queue and a RabbitMQ queue…

Anyway, the situation you mention seems strange, because you would hit deadlocks as soon as the number of events is not a multiple of your max demand, and I do not encounter that in my pipeline.
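To make that concrete: if a producer only ever dispatched full max_demand batches, any remainder would sit in the queue forever. The numbers here just mirror the run below and are for illustration only:

```elixir
total_events = 777
max_demand = 10

div(total_events, max_demand)  # 77 full batches of 10 events
rem(total_events, max_demand)  # 7 leftover events that never fill a batch
```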

$ iex -S mix
Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

03:45:24.293 [debug] #PID<0.172.0>: Queue worker started.
03:45:24.300 [debug] #PID<0.174.0>: Starter started.
03:45:24.305 [debug] #PID<0.175.0>: Worker started.
03:45:24.305 [debug] #PID<0.176.0>: Worker started.
03:45:24.305 [debug] #PID<0.177.0>: Worker started.
03:45:24.305 [debug] #PID<0.178.0>: Worker started.
03:45:24.305 [debug] #PID<0.179.0>: Worker started.
03:45:24.305 [debug] #PID<0.180.0>: Worker started.
03:45:24.305 [debug] #PID<0.181.0>: Worker started.
03:45:24.305 [debug] #PID<0.182.0>: Worker started.
03:45:24.305 [debug] #PID<0.183.0>: Worker started.
03:45:24.305 [debug] #PID<0.184.0>: Worker started.
03:45:24.305 [debug] #PID<0.185.0>: Worker started.
03:45:24.305 [debug] #PID<0.186.0>: Worker started.
03:45:24.305 [debug] #PID<0.187.0>: Worker started.
03:45:24.305 [debug] #PID<0.188.0>: Worker started.
03:45:24.305 [debug] #PID<0.189.0>: Worker started.
03:45:24.305 [debug] #PID<0.190.0>: Worker started.
03:45:24.305 [debug] #PID<0.191.0>: Notifier started.
iex(1)> GsPipe.multi_enqueue 777, "koko"
# lots of lines
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 623}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 773}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 624}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 774}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 625}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 775}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 626}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 776}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 627}
03:46:09.357 [debug] #PID<0.175.0>: Consume Task {"koko", 777}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 628}
# more lines
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 767}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 768}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 769}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 770}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 771}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 772}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 773}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 774}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 775}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 776}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 777}

In this example I ran 15 consumers for 1 notifier, all consumers subscribed with

subscribe_to: [{Producer, min_demand: 1, max_demand: 10}]

Since there were 15 consumers for 1 notifier, the notifier was a bit overloaded… then I tried with 5_000_009 and it worked the same.
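For reference, in a GenStage consumer those subscription options go in init/1; the Producer module name here stands in for whatever your producer is registered as (an assumption on my part):

```elixir
def init(_opts) do
  # Subscribe to the producer with a small demand window,
  # so events are spread across the 15 consumers.
  {:consumer, :ok, subscribe_to: [{Producer, min_demand: 1, max_demand: 10}]}
end
```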

What does your producer's handle_demand look like?
