Configuring GenStage to consume dynamic demand

I have a GenStage Producer ← Consumer pipeline that reads and processes messages from my Amazon SQS queue: the consumer asks for demand whenever it has nothing to do, and the producer simply performs a get and tries to fetch events from Amazon. It works fine for my current needs, since each consumer handles at most 1 event. But thinking about scalability, I would like to be able to set a higher max_demand for each stage.

My first approach was to increase max_demand to 10. But then came the caveat, according to the documentation:

When implementing consumers, we often set the :max_demand and
:min_demand on subscription. The :max_demand specifies the maximum
amount of events that must be in flow while the :min_demand specifies
the minimum threshold to trigger for more demand. For example, if
:max_demand is 1000 and :min_demand is 750, the consumer will ask for
1000 events initially and ask for more only after it receives at least
250.
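(These are the options given at subscription time; to make the quote concrete, here they are with the docs' numbers — consumer_pid and producer_pid are placeholders:)

# Demand bounds from the docs' example, set when subscribing.
# consumer_pid and producer_pid are placeholders.
GenStage.sync_subscribe(consumer_pid, to: producer_pid, min_demand: 750, max_demand: 1000)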

That means if there is only 1 event in the queue, and no others appear (or they take a long time to arrive), that 1 event will have to wait until 10 have accumulated before it is processed. That is very bad for our business needs and, in my opinion, kind of weird.

Therefore, my question is:

How can I work around this and set a max_demand higher than 1, while still processing whatever number of events arrives?

(bonus question): Why was GenStage designed this way? What’s the benefit?

Here is the abstraction I made for consuming the producer's events… I think the producer's code is simpler, as it just provides data when asked, but if anyone thinks it's necessary I can add it here:

defmodule MobileApi.GenstageWorkers.AwsSqsConsumer do
  use GenStage
  require Logger
  alias ExAws.SQS

  @ex_aws_sqs Application.get_env(
                :mobile_api,
                :ex_aws_sqs,
                ExAws
              )
  def start_link(init_args, opts \\ []) do
    GenStage.start_link(__MODULE__, init_args, opts)
  end

  def init(%{
        producer_id: producer,
        queue_name: queue_name,
        processor: processor_function,
        min_demand: min_demand,
        max_demand: max_demand
      }) do
    state = %{
      producer: producer,
      subscription: nil,
      queue: queue_name,
      processor: processor_function,
      # kept in state so the :init_ask handler below can read it
      max_demand: max_demand
    }

    GenStage.async_subscribe(
      self(),
      to: state.producer,
      min_demand: min_demand,
      max_demand: max_demand
    )

    {:consumer, state}
  end

  def handle_subscribe(:producer, _opts, from, state),
    do: {:automatic, Map.put(state, :subscription, from)}

  def handle_info(:init_ask, %{subscription: subscription} = state) do
    GenStage.ask(subscription, state.max_demand)

    {:noreply, [], state}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def handle_events(messages, _from, state) when is_nil(messages), do: {:noreply, [], state}

  def handle_events(messages, _from, state) do
    handle_messages(messages, state)
    {:noreply, [], state}
  end

  defp handle_messages(messages, state) do
    messages
    |> Enum.reduce([], &parse_message/2)
    |> process_message_batch(state.queue, state.processor)
  end

  defp parse_message(%{body: body, message_id: message_id, receipt_handle: receipt_handle}, acc) do
    case Poison.decode(body) do
      {:ok, decoded_body} ->
        [{decoded_body, %{id: message_id, receipt_handle: receipt_handle}} | acc]

      {:error, error_message} ->
        Logger.error(
          "An error has occurred reading from queue, error message: #{inspect(error_message)} message body: #{inspect(body)}"
        )

        acc
    end
  end

  defp process_message_batch([], _, _), do: nil

  defp process_message_batch(messages_batch, queue_name, processor) do
    {bodies, metadatas} = Enum.unzip(messages_batch)
    Enum.each(bodies, fn body -> Task.start(fn -> processor.(body) end) end)

    SQS.delete_message_batch(queue_name, metadatas)
    |> @ex_aws_sqs.request
  end
end
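For reference, starting it looks roughly like this (a sketch: :sqs_producer stands in for the producer's registered name or pid, and the processor is a placeholder):

# Illustrative only: wiring the consumer to a named producer.
{:ok, _consumer} =
  MobileApi.GenstageWorkers.AwsSqsConsumer.start_link(%{
    producer_id: :sqs_producer,
    queue_name: "my-queue",
    processor: &IO.inspect/1,
    min_demand: 0,
    max_demand: 1
  })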

You could also increase the number of producers and consumers. I use this as a replacement for spawning multiple tasks, or using something like poolboy.
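To illustrate (a sketch only: the supervisor wiring, queue name, 0-1 demand, and &IO.inspect/1 processor are placeholder choices, reusing the modules posted above):

# Illustrative: one named producer, ten consumers, each with 0-1 demand.
producer = %{
  id: :sqs_producer,
  start: {MobileApi.GenstageWorkers.AwsSqsProducer, :start_link, ["my-queue", [name: :sqs_producer]]}
}

consumers =
  for i <- 1..10 do
    %{
      id: :"sqs_consumer_#{i}",
      start:
        {MobileApi.GenstageWorkers.AwsSqsConsumer, :start_link,
         [%{producer_id: :sqs_producer, queue_name: "my-queue", processor: &IO.inspect/1, min_demand: 0, max_demand: 1}]}
    }
  end

Supervisor.start_link([producer | consumers], strategy: :one_for_one)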

That means if there is only 1 event in the queue, and no others appear (or they take a long time to arrive), that 1 event will have to wait until 10 have accumulated before it is processed.

Where did you get that? If I put in 1 event, it gets processed; my setup is max_demand: 10. The mechanism is not meant to create deadlocks, but to support back-pressure. I have a config with 1 producer, 15 producer-consumers, and 2 consumer-notifiers.

Also, the other number (min_demand) is there to trigger a request for more events roughly in the middle of your work: not too early, not too late. This avoids the consumer running an empty cycle…

Well, maybe this can help (as it did for me).

The benefit I get from GenStage is that no matter how big the load is, the pipeline adapts its work from the end of the chain, because it is a request/push system (consumers request, producers push).


Hmmmm… I have re-watched the video and tried a few more approaches with the same code, changing the min and max demand to 0-10, 1-10, 9-10… and none works as expected. In every case, when I send a single message it gets stuck in the producer and never reaches processing in the consumer (unless 10 messages accumulate)… Only if I leave max_demand = 1 can I expect it to be processed ASAP.

Of course, as you mentioned, I could use a 0-1 demand configuration and scale by spinning up more consumers, and that's why this code is going to production anyway: it can be scaled that way. But I still find it strange that we have a max_demand inside the process and can't use it to process smaller quantities…


I use a little trick I learned from the book Mastering Elixir. In my producer, I have something like this…

  defp send_events_from_queue(queue, how_many, state) do
    tasks = queue.dequeue(how_many)

    # Not enough events to cover the demand: schedule a retry.
    if length(tasks) < how_many do
      Process.send_after(self(), :try_again, @queue_polling)
    end

    {:noreply, tasks, %{state | pending: how_many - length(tasks)}}
  end

# And then

  def handle_info(:try_again, %{queue: queue, pending: demand} = state) do
    send_events_from_queue(queue, demand, state)
  end

If the queue comes up empty (or short), it polls for data again after @queue_polling milliseconds.
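For the full picture, here is a hedged sketch of a complete producer built around this trick; the module name, the queue module with its dequeue/1 contract, the demand accumulation in handle_demand, and the 5-second interval are illustrative guesses, not the book's exact code:

defmodule QueueProducer do
  use GenStage

  # Illustrative polling interval in milliseconds.
  @queue_polling 5_000

  def start_link(queue_mod), do: GenStage.start_link(__MODULE__, queue_mod)

  def init(queue_mod), do: {:producer, %{queue: queue_mod, pending: 0}}

  # New demand is added to whatever is still pending, then served.
  def handle_demand(demand, %{queue: queue, pending: pending} = state) do
    send_events_from_queue(queue, demand + pending, state)
  end

  # Retry path: the queue was empty (or short) last time around.
  def handle_info(:try_again, %{queue: queue, pending: demand} = state) do
    send_events_from_queue(queue, demand, state)
  end

  defp send_events_from_queue(queue, how_many, state) do
    tasks = queue.dequeue(how_many)

    # Could not fully satisfy the demand: poll the queue again later.
    if length(tasks) < how_many do
      Process.send_after(self(), :try_again, @queue_polling)
    end

    # Emitting fewer events than demanded is fine; the shortfall is
    # remembered in :pending and served on the next :try_again.
    {:noreply, tasks, %{state | pending: how_many - length(tasks)}}
  end
end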

I have used this with an ETS-backed queue and with a RabbitMQ queue…

Anyway, the situation you mention seems strange, because you would get deadlocks as soon as the number of events is not a multiple of your max_demand, which I do not encounter in my pipeline.

$ iex -S mix
Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

03:45:24.293 [debug] #PID<0.172.0>: Queue worker started.
03:45:24.300 [debug] #PID<0.174.0>: Starter started.
03:45:24.305 [debug] #PID<0.175.0>: Worker started.
03:45:24.305 [debug] #PID<0.176.0>: Worker started.
03:45:24.305 [debug] #PID<0.177.0>: Worker started.
03:45:24.305 [debug] #PID<0.178.0>: Worker started.
03:45:24.305 [debug] #PID<0.179.0>: Worker started.
03:45:24.305 [debug] #PID<0.180.0>: Worker started.
03:45:24.305 [debug] #PID<0.181.0>: Worker started.
03:45:24.305 [debug] #PID<0.182.0>: Worker started.
03:45:24.305 [debug] #PID<0.183.0>: Worker started.
03:45:24.305 [debug] #PID<0.184.0>: Worker started.
03:45:24.305 [debug] #PID<0.185.0>: Worker started.
03:45:24.305 [debug] #PID<0.186.0>: Worker started.
03:45:24.305 [debug] #PID<0.187.0>: Worker started.
03:45:24.305 [debug] #PID<0.188.0>: Worker started.
03:45:24.305 [debug] #PID<0.189.0>: Worker started.
03:45:24.305 [debug] #PID<0.190.0>: Worker started.
03:45:24.305 [debug] #PID<0.191.0>: Notifier started.
iex(1)> GsPipe.multi_enqueue 777, "koko"
# lots of lines
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 623}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 773}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 624}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 774}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 625}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 775}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 626}
03:46:09.357 [debug] #PID<0.184.0>: Consume Task {"koko", 776}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 627}
03:46:09.357 [debug] #PID<0.175.0>: Consume Task {"koko", 777}
03:46:09.357 [debug] #PID<0.191.0>: Notification for {"koko", 628}
# More lines
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 767}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 768}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 769}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 770}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 771}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 772}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 773}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 774}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 775}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 776}
03:46:09.360 [debug] #PID<0.191.0>: Notification for {"koko", 777}

I put in this example 15 consumers for 1 notifier, all consumers with

subscribe_to: [{Producer, min_demand: 1, max_demand: 10}]

As I put 15 consumers in front of 1 notifier, the latter was a bit overloaded… Then I tried with 5_000_009 and it worked the same.

What does your producer's handle_demand look like?


Yes, maybe I did something wrong in the producer:

defmodule MobileApi.GenstageWorkers.AwsSqsProducer do
  use GenStage
  require Logger

  alias ExAws.SQS

  @ex_aws_sqs Application.get_env(
                :mobile_api,
                :ex_aws_sqs,
                ExAws
              )

  def start_link(queue_name, opts \\ []) do
    GenStage.start_link(__MODULE__, queue_name, opts)
  end

  def init(queue_name) do
    state = %{
      events: [],
      demand: 0,
      queue: queue_name
    }
    {:producer, state, []}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  # receives a demand, adds it to the state
  # and invokes events dispatching function
  def handle_demand(incoming_demand, %{demand: demand} = state) do
    new_state = Map.put(state, :demand, demand + incoming_demand)

    dispatch_events(new_state)
  end

  # if the stored events are enough to cover the demand:
  # 1. get the needed amount of events
  # 2. reset the demand
  # 3. save zero-demand and remaining events in the state
  defp dispatch_events(%{events: events, demand: demand} = state)
       when length(events) >= demand
  do
    {events_to_dispatch, remaining_events} = Enum.split(events, demand)

    new_state =
      state
      |> Map.put(:demand, 0)
      |> Map.put(:events, remaining_events)

    {:noreply, events_to_dispatch, new_state}
  end

  # if the stored events don't cover the demand:
  # 1. fetch more events (as many as the demand)
  # 2. save demand and events in the state
  # 3. invoke dispatch_events/1 again (now the demand should be covered)
  defp dispatch_events(%{events: events, demand: demand} = state)
       when length(events) < demand
  do
    events = events ++ fetch_events(demand, state.queue)

    state
    |> Map.put(:demand, demand)
    |> Map.put(:events, events)
    |> dispatch_events()
  end

  defp fetch_events(demand, queue_name) do
    aws_resp =
      queue_name
      |> SQS.receive_message(max_number_of_messages: min(1, demand))
      |> @ex_aws_sqs.request

    case aws_resp do
      {:ok, resp} ->
        resp.body.messages
      {:error, reason} ->
        Logger.error("An error has occurred reading from queue, error message: #{inspect(reason)}")
        []
    end
  end
end

I followed the instructions in this article: https://medium.com/@andreichernykh/elixir-a-few-things-about-genstage-id-wish-to-knew-some-time-ago-b826ca7d48ba under the "Demand Handling" topic, and it looks fine to me, because it really does try to fetch events every time the consumer asks when the demand configuration is 0-1.

About your solution… I see the trick with send_after to call back and try to send more events… But isn't this an anti-pattern, if we consider that all demand requesting should be done by consumers for back-pressure? First, because you rely on scheduling that lives outside of GenStage, which may cause overhead depending on how many producers you create(?); and second, because if no consumer is available you will keep trying to send uselessly. That's why I would prefer the producer to only generate data as consumers request it; that is one of the objectives of my implementation.

If the queue is empty, you will need to check for incoming events somehow anyway: either on the consumer side, as you suggest, or, as in my case, on the producer side. Or use 0-1 demand and scale consumers.

Choose the one that fits your needs 🙂

This callback happens only when the queue is empty, and it's OK for me to have it called periodically in case of inactivity. Also, the producer is the closest to the queue…

When the pipeline is full, the back pressure is working as expected.
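If you go the producer route, one way is to dispatch whatever is available instead of waiting until the stored events cover the whole demand; a minimal sketch (not your exact code, and the :try_again interval is illustrative):

# Sketch: emit what we have. GenStage allows dispatching fewer events
# than were demanded; the consumer keeps waiting for the remainder.
defp dispatch_events(%{events: events, demand: demand} = state) do
  {to_dispatch, remaining} = Enum.split(events, demand)
  new_demand = demand - length(to_dispatch)

  # If demand is still outstanding, poll for more events later.
  if new_demand > 0, do: Process.send_after(self(), :try_again, 5_000)

  {:noreply, to_dispatch, %{state | events: remaining, demand: new_demand}}
end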

I see. Well, it's an option: I could use 0-10 demand on the consumer and have a send_after trigger the producer every minute or so if anything is waiting there to be sent… Or scale up consumers as much as I need, the way it is now… Anyway, I wanted to discuss solutions, since I struggled to get to this point and GenStage has a lot of caveats, so I thought this could be another one… Thank you for the explanations!

I did not get what dequeue does.

It retrieves elements from the queue…
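Roughly something like this, though the real code is in the repo's Queue module; this Agent-backed version is just an illustration of the dequeue/1 contract:

defmodule DemoQueue do
  use Agent

  def start_link(_), do: Agent.start_link(fn -> :queue.new() end, name: __MODULE__)

  def enqueue(item), do: Agent.update(__MODULE__, &:queue.in(item, &1))

  # Take up to `how_many` items; returns a (possibly shorter) list.
  def dequeue(how_many) do
    Agent.get_and_update(__MODULE__, fn q -> take(q, how_many, []) end)
  end

  defp take(q, 0, acc), do: {Enum.reverse(acc), q}

  defp take(q, n, acc) do
    case :queue.out(q) do
      {{:value, item}, rest} -> take(rest, n - 1, [item | acc])
      {:empty, _} -> {Enum.reverse(acc), q}
    end
  end
end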


Can I have the code, sir?
I used GenStage as a producer; buffer_size is 10_000 by default. My problem is that when I added 50_000 events, the producer discarded some of them. Is this queue the solution for my situation?
If I implement the queue, add 50_000 to it, and then retrieve 5000 from it at a time, is that possible with this solution?

I made a sample repo… but the code is old, as old as this post.

You will find dequeue in the Queue module.
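About the discarded events: a producer buffers events that outpace demand in an internal buffer capped at 10_000 by default (as you noted), and you can raise that cap in init/1. A minimal illustration:

# Raising the producer's internal buffer cap (default is 10_000;
# :infinity is also accepted).
def init(state) do
  {:producer, state, buffer_size: 50_000}
end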


Appreciate your help, thank you!