I have a GenStage Producer ← Consumer pipeline that reads messages from an Amazon SQS queue and processes them: the consumer asks for demand whenever it has nothing to do, and the producer simply performs a get and tries to fetch events from Amazon. This works fine for my needs right now, since each consumer handles at most one event at a time. But with scalability in mind, I would like to be able to set a higher max_demand for each stage.
My first approach was to increase max_demand to 10. But then came the caveat, according to the documentation:
When implementing consumers, we often set the :max_demand and
:min_demand on subscription. The :max_demand specifies the maximum
amount of events that must be in flow while the :min_demand specifies
the minimum threshold to trigger for more demand. For example, if
:max_demand is 1000 and :min_demand is 750, the consumer will ask for
1000 events initially and ask for more only after it receives at least
250.
That means that if there is only one event in the queue, and no others arrive (or they take a long time to), I will have to wait until 10 events have accumulated before that one event is processed. That is very bad for our business needs and, in my opinion, rather odd.
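In numbers, the re-ask rule from the quoted documentation works out like this (a plain-Elixir sketch of the bookkeeping as I understand it, not GenStage internals):

```elixir
# Sketch of the demand bookkeeping described in the docs (illustration only).
max_demand = 1000
min_demand = 750

# The consumer asks for max_demand events up front...
initial_ask = max_demand

# ...and asks again only once outstanding demand has dropped to min_demand,
# i.e. after at least max_demand - min_demand events have been received.
reask_after = max_demand - min_demand

IO.inspect({initial_ask, reask_after})
# => {1000, 250}
```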
Therefore, my question is:
How can I get around this and set a maximum demand higher than 1, while still processing however many events actually arrive?
(bonus question): Why was GenStage designed this way? What’s the benefit?
Here is the abstraction I made for consuming the producer's events. I think the producer's code is simpler, as it just provides data when asked, but if anyone thinks it's necessary I can add it here:
defmodule MobileApi.GenstageWorkers.AwsSqsConsumer do
  use GenStage
  require Logger
  alias ExAws.SQS

  @ex_aws_sqs Application.get_env(:mobile_api, :ex_aws_sqs, ExAws)

  def start_link(init_args, opts \\ []) do
    GenStage.start_link(__MODULE__, init_args, opts)
  end

  def init(%{
        producer_id: producer,
        queue_name: queue_name,
        processor: processor_function,
        min_demand: min_demand,
        max_demand: max_demand
      }) do
    state = %{
      producer: producer,
      subscription: nil,
      queue: queue_name,
      processor: processor_function,
      # kept in state so the :init_ask handler below can reference it
      max_demand: max_demand
    }

    GenStage.async_subscribe(
      self(),
      to: state.producer,
      min_demand: min_demand,
      max_demand: max_demand
    )

    {:consumer, state}
  end

  def handle_subscribe(:producer, _opts, from, state),
    do: {:automatic, Map.put(state, :subscription, from)}

  def handle_info(:init_ask, %{subscription: subscription} = state) do
    GenStage.ask(subscription, state.max_demand)
    {:noreply, [], state}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def handle_events(messages, _from, state) do
    handle_messages(messages, state)
    {:noreply, [], state}
  end

  defp handle_messages(messages, state) do
    messages
    |> Enum.reduce([], &parse_message/2)
    |> process_message_batch(state.queue, state.processor)
  end

  defp parse_message(%{body: body, message_id: message_id, receipt_handle: receipt_handle}, acc) do
    case Poison.decode(body) do
      {:ok, decoded_body} ->
        [{decoded_body, %{id: message_id, receipt_handle: receipt_handle}} | acc]

      {:error, error_message} ->
        Logger.error(
          "An error has occurred reading from the queue, " <>
            "error message: #{inspect(error_message)} message body: #{inspect(body)}"
        )

        acc
    end
  end

  defp process_message_batch([], _, _), do: nil

  defp process_message_batch(messages_batch, queue_name, processor) do
    {bodies, metadatas} = Enum.unzip(messages_batch)

    # Fire-and-forget: the messages are deleted from SQS right away,
    # before the processor tasks are guaranteed to have finished.
    Enum.each(bodies, fn body -> Task.start(fn -> processor.(body) end) end)

    SQS.delete_message_batch(queue_name, metadatas)
    |> @ex_aws_sqs.request()
  end
end
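As an aside, the accumulate-then-unzip shape used in handle_messages/2 and process_message_batch/3 can be shown with plain data (a toy sketch with made-up messages, no SQS or JSON involved). Note that prepending inside Enum.reduce/3 reverses the batch order:

```elixir
# Toy illustration of the reduce/unzip pattern from the consumer above;
# the message contents here are made up.
messages = [
  %{body: "first", message_id: "1", receipt_handle: "rh-1"},
  %{body: "second", message_id: "2", receipt_handle: "rh-2"}
]

batch =
  Enum.reduce(messages, [], fn %{body: body, message_id: id, receipt_handle: rh}, acc ->
    [{body, %{id: id, receipt_handle: rh}} | acc]
  end)

{bodies, metadatas} = Enum.unzip(batch)

IO.inspect(bodies)
# => ["second", "first"]  (prepending in reduce reverses the order)
IO.inspect(Enum.map(metadatas, & &1.id))
# => ["2", "1"]
```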