I have an endpoint in my app to download reports for orders. A report can contain thousands of orders, so I don't want the number of concurrent exports to grow too much if I receive a burst of export requests.
I am not sure what to use to limit the number of concurrent workers.
The first solution I see is to use poolboy to queue the exports in front of the workers.
Next I studied GenStage, but since the events arrive at a later time, the consumer does not ask for new events after the initial pool is empty. GenStage does not seem to fit well here, since I do not ask for events; they arrive outside my control at a later time.
I also tried Broadway, but I'm afraid it overcomplicates the initial requirement.
What is the best option here?
It’s absolutely possible to use GenStage even if events are pushed into the system, take a look at this article that goes through handling both “push” and “pull” scenarios.
I think maybe you should think about what you want to happen if you get too many requests at the same time. Should additional requests fail or get queued? Even if you queue them up, you might want to consider what happens if the queue grows too long. And if it's important that no event is lost, you should probably look at an external queue or a library that uses some form of persistence, such as rihanna (samsondav/rihanna on GitHub), a Postgres-backed job queue for Elixir.
Task.async_stream may be helpful if you want a pure stdlib solution, the max_concurrency option in particular.
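As a minimal sketch of that idea (the `ExportRunner` module and `export_fun` are made-up names for illustration, not part of anyone's app), `max_concurrency` caps how many exports run at once within a single stream:

```elixir
defmodule ExportRunner do
  # Runs `export_fun` once per request, with at most `max` running at a time.
  # `export_fun` is a hypothetical stand-in for the real export logic.
  def run_all(requests, export_fun, max \\ 2) do
    requests
    |> Task.async_stream(export_fun,
      max_concurrency: max,
      timeout: :infinity,
      ordered: false
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Six requests, never more than two in flight.
ExportRunner.run_all(1..6, fn id -> id * 10 end)
```

Note that the limit only applies to items flowing through one call. Separate web requests that each start their own stream would not share it, so on its own this probably does not cap exports per node.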
The first solution I would try would be to rate limit the endpoint itself. It might be naive and kind of annoying to the consumers of the API, but it might be a simple and “good enough” approach. Shameless plug - I wrote a package some time ago that helps adding simple rate limiting to plug-based applications: plug_attack.
If you don’t want to drop requests and instead queue them, you’d need some sort of queue - there are a lot of various queuing solutions for the BEAM, just to name a few: rihanna, que, exq and possibly more
I realized that I have not described the requirement in enough detail, so I'll try to explain it better here.
I have customers who want to export the orders for their restaurants as a CSV. This has to happen; I cannot drop the request. Also, since generating the export can take significant time, the result does not need to arrive instantly: a link to the S3 download is sent later via email.
Each export request is stored in the DB with all its params, so it's not a problem if the server is restarted or an export crashes; I can resume it later.
For a single takeout (that is, one export) I need to run a paginated search for orders in ElasticSearch, then load all the orders of a page via GraphQL concurrently using Task.async_stream, process the data and write it to a file.
So assuming I have 5 concurrent exports * 30 orders per page, I can reach 150 GraphQL requests at the same time. Since I have a cluster of two nodes, this means 300. So I want to cap the number of concurrent exports per node.
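Roughly, one takeout looks like the sketch below. This is a simplified illustration, not the real code: `search_page`, `load_order` and `to_row` are hypothetical functions standing in for the ElasticSearch query, the GraphQL call and the CSV formatting.

```elixir
defmodule TakeoutExport do
  # search_page.(page, page_size) returns a list of order ids ([] when done).
  # load_order.(id) returns one order; to_row.(order) returns one CSV line.
  def run(search_page, load_order, to_row, path, page_size \\ 30) do
    # Lazily walk the result pages until an empty page comes back.
    pages =
      Stream.unfold(1, fn page ->
        case search_page.(page, page_size) do
          [] -> nil
          ids -> {ids, page + 1}
        end
      end)

    File.open!(path, [:write, :utf8], fn file ->
      Enum.each(pages, fn ids ->
        # Load every order of the current page concurrently.
        ids
        |> Task.async_stream(load_order, max_concurrency: page_size, timeout: 30_000)
        |> Enum.each(fn {:ok, order} -> IO.write(file, to_row.(order) <> "\n") end)
      end)
    end)
  end
end
```

Each running takeout can therefore have up to `page_size` order requests in flight at once.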
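To make the arithmetic explicit, here is a tiny sketch. The `ExportLoad` module and the budget of 120 requests are invented for illustration only.

```elixir
defmodule ExportLoad do
  # Worst-case number of simultaneous GraphQL requests across the cluster.
  def peak_requests(exports_per_node, page_size, nodes) do
    exports_per_node * page_size * nodes
  end

  # Largest per-node export limit that keeps the cluster under `budget` requests.
  def max_exports_per_node(budget, page_size, nodes) do
    div(budget, page_size * nodes)
  end
end

ExportLoad.peak_requests(5, 30, 1)          # => 150 (one node)
ExportLoad.peak_requests(5, 30, 2)          # => 300 (both nodes)
ExportLoad.max_exports_per_node(120, 30, 2) # => 2
```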
Having this requirement I see 3 approaches I can take:
- a pool of workers using poolboy
- a queue like @michalmuskala suggested
- a GenStage pipeline to take new exports from the producer
@michalmuskala since the API is not public, I do not receive that many requests, and I am not allowed to drop requests either, the rate limiting idea does not apply here, although I will use it in another place.
I played with the GenStage idea since I wanted to become familiar with GenStage. Thanks to @jola's suggestion I managed to use GenStage even though the events are pushed into the system.
- First solution looks like this:
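defmodule MyApplication do
  use Application

  def start(_type, _args) do
    # Plain child-spec maps are used here instead of the deprecated
    # Supervisor.Spec.worker/3 helper.
    children = [
      %{id: TakeoutProducer, start: {TakeoutProducer, :start_link, []}},
      # Four consumers with max_demand: 1 each, so at most four takeouts run at once.
      %{id: 1, start: {TakeoutConsumer, :start_link, []}},
      %{id: 2, start: {TakeoutConsumer, :start_link, []}},
      %{id: 3, start: {TakeoutConsumer, :start_link, []}},
      %{id: 4, start: {TakeoutConsumer, :start_link, []}}
    ]

    opts = [strategy: :one_for_one, name: __MODULE__]
    Supervisor.start_link(children, opts)
  end
end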
defmodule TakeoutProducer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_), do: {:producer, []}

  # Pushes a new takeout into the pipeline.
  def add_takeout(takeout) do
    GenStage.cast(__MODULE__, {:add_takeout, takeout})
  end

  @impl true
  def handle_demand(1, []), do: {:noreply, [], []}
  def handle_demand(1, [h | t]), do: {:noreply, [h], t}

  @impl true
  def handle_cast({:add_takeout, takeout}, state) do
    # Emit the takeout right away; GenStage buffers it until a consumer asks.
    {:noreply, [takeout], state}
  end
end
defmodule TakeoutConsumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, nil)
  end

  @impl true
  def init(state) do
    {:consumer, state, subscribe_to: [{TakeoutProducer, max_demand: 1}]}
  end

  @impl true
  def handle_info(_, state), do: {:noreply, [], state}

  @impl true
  def handle_events([takeout], _from, state) do
    # process_takeout(takeout)
    {:noreply, [], state}
  end

  # Simulates a slow export that fails.
  defp process_takeout(_) do
    :timer.sleep(5000)
    raise "Error"
  end
end
This first solution works fine, but if some consumers raise errors the entire app crashes (presumably because the restarts exceed the supervisor's restart limit), so I would also need to put them under a custom supervision tree and maybe change the strategy to rest_for_one. It takes a bit of work to reach a final working solution.
Then I found out about ConsumerSupervisor.
- The second solution and the one I prefer now:
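defmodule MyApplication do
  use Application

  alias Orders.Service.TakeoutConsumerSupervisor

  def start(_type, _args) do
    # Plain child-spec maps are used here instead of the deprecated
    # Supervisor.Spec.worker/3 helper.
    children = [
      %{id: TakeoutProducer, start: {TakeoutProducer, :start_link, []}},
      %{
        id: TakeoutConsumerSupervisor,
        start: {TakeoutConsumerSupervisor, :start_link, []},
        type: :supervisor
      }
    ]

    opts = [strategy: :one_for_one, name: Orders.Service.Supervisor]
    Supervisor.start_link(children, opts)
  end
end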
defmodule TakeoutProducer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_), do: {:producer, []}

  def add_takeout(takeout) do
    GenStage.cast(__MODULE__, {:add_takeout, takeout})
  end

  @impl true
  def handle_cast({:add_takeout, takeout}, state) do
    {:noreply, [takeout], state}
  end

  @impl true
  def handle_demand(_demand, []), do: {:noreply, [], []}
  def handle_demand(1, [h | t]), do: {:noreply, [h], t}

  def handle_demand(demand, state) do
    {events, state} = Enum.split(state, demand)
    {:noreply, events, state}
  end
end
defmodule Orders.Service.TakeoutConsumerSupervisor do
  use ConsumerSupervisor

  def start_link do
    ConsumerSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    children = [
      %{
        id: TakeoutConsumer,
        start: {TakeoutConsumer, :start_link, []},
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [{TakeoutProducer, min_demand: 1, max_demand: 5}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
defmodule TakeoutConsumer do
  def start_link(event) do
    Task.start_link(fn ->
      # TODO execute the processing of the takeout
    end)
  end
end
This works fine even if all the consumer tasks fail. Also, the consumers are started only when there is demand.
If you’re not allowed to drop the requests, you need to persist them in some sort of durable storage - if anything somebody can trip over the power cord in the data centre, so storing it only in memory is not enough. For me that requirement eliminates GenStage (at least a naive implementation using GenStage) and means there has to be a durable queue in the middle. An implementation that builds the consumer side of a queue from GenStage would be fine.
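A rough sketch of what that consumer side could look like, assuming the export requests already live in a database: a GenStage producer that pulls pending exports from the durable store only when there is demand. `DurableTakeoutProducer`, `fetch_fun` and the one-second poll interval are all hypothetical, and `fetch_fun` is assumed to claim the rows it returns so that nothing is handed out twice.

```elixir
defmodule DurableTakeoutProducer do
  use GenStage

  @poll_interval 1_000

  # `fetch_fun` is a hypothetical 1-arity function: given a count, it claims and
  # returns up to that many pending exports from the durable store.
  def start_link(fetch_fun) when is_function(fetch_fun, 1) do
    GenStage.start_link(__MODULE__, fetch_fun, name: __MODULE__)
  end

  @impl true
  def init(fetch_fun) do
    {:producer, %{fetch: fetch_fun, demand: 0, timer: nil}}
  end

  @impl true
  def handle_demand(incoming, state) do
    dispatch(%{state | demand: state.demand + incoming})
  end

  @impl true
  def handle_info(:poll, state) do
    dispatch(%{state | timer: nil})
  end

  # Nothing to do while no consumer is asking for work.
  defp dispatch(%{demand: 0} = state), do: {:noreply, [], state}

  defp dispatch(state) do
    events = state.fetch.(state.demand)
    state = %{state | demand: state.demand - length(events)}
    {:noreply, events, schedule_poll(state)}
  end

  # Poll again later only if demand is still unmet and no poll is scheduled.
  defp schedule_poll(%{demand: demand, timer: nil} = state) when demand > 0 do
    %{state | timer: Process.send_after(self(), :poll, @poll_interval)}
  end

  defp schedule_poll(state), do: state
end
```

With something like this, a restart loses nothing that is still in the store. Exports that were claimed but not finished would still need to be released or retried, which this sketch does not cover.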