How to limit the number of concurrently running exports?

I have an endpoint in my app to download reports for orders. A report can contain thousands of orders, so I don't want the number of concurrent exports to grow too much if I receive a burst of export requests.
I am not sure what to use to limit the number of concurrent workers.
The first solution I see is to use poolboy to queue the exports in front of the workers.
Next I studied GenStage, but since the events arrive at a later time, the consumer does not ask for new events after the initial pool is empty. GenStage does not seem to fit well here, since I do not ask for events; they arrive outside my control at a later time.
I also tried Broadway, but I'm afraid it overcomplicates the initial requirement.
What is the best option here?

It’s absolutely possible to use GenStage even if events are pushed into the system. Take a look at this article, which goes through handling both “push” and “pull” scenarios.

https://medium.com/@andreichernykh/elixir-a-few-things-about-genstage-id-wish-to-knew-some-time-ago-b826ca7d48ba

I think you should decide what you want to happen if you get too many requests at the same time. Should additional requests fail or get queued? Even if you queue them, you might want to consider what happens if the queue grows too long. And if it’s important that no event is lost, you should probably look at an external queue or a library that uses some form of persistence, such as rihanna (samsondav/rihanna on GitHub), a high performance Postgres-backed job queue for Elixir.

Task.async_stream may be helpful if you want a pure stdlib solution, the max_concurrency option in particular.

https://hexdocs.pm/elixir/Task.html#async_stream/3
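
As a rough illustration of the max_concurrency option, here is a minimal sketch. The module name ExportDemo and the run_export/1 function are made up for the example and only stand in for real report generation.

```elixir
defmodule ExportDemo do
  # Hypothetical stand-in for generating one report.
  def run_export(id) do
    Process.sleep(50)
    {:exported, id}
  end

  # Runs every export, but never more than `limit` at the same time.
  def run_all(ids, limit) do
    ids
    |> Task.async_stream(&run_export/1, max_concurrency: limit, timeout: :infinity)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Six exports, at most two running concurrently.
ExportDemo.run_all(1..6, 2)
```

Note that the limit applies to a single Task.async_stream call. Exports requested by separate HTTP requests would each start their own stream, so this alone does not cap concurrency across requests.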

The first solution I would try is to rate limit the endpoint itself. It might be naive and somewhat annoying to the consumers of the API, but it might be a simple and “good enough” approach. Shameless plug: I wrote a package some time ago that helps add simple rate limiting to plug-based applications: plug_attack.

If you don’t want to drop requests and instead queue them, you’d need some sort of queue - there are a lot of various queuing solutions for the BEAM, just to name a few: rihanna, que, exq and possibly more

I realized that I did not describe the requirement in enough detail, so I’ll try to explain it better here.
I have customers who want to export the orders for their restaurants as a CSV. This has to happen; the request must not be dropped. Since generating the export can take significant time, the result does not have to arrive instantly: a link to an S3 download can be sent by email later.
Each export request is stored in the DB with all its params, so it’s not a problem if the server is restarted or one export crashes; I can resume it later.
For a single takeout (export) I need to run a paginated search for orders in ElasticSearch, then load all the orders of a page via GraphQL simultaneously using Task.async_stream, process the data and write it to a file.
So, assuming I have 5 concurrent exports * 30 orders per page, I can reach 150 GraphQL requests at the same time. Since I have a cluster of two nodes, this means 300. So I want to cap the number of exports per node at a certain limit.
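
To make the numbers above concrete, here is a rough sketch of one export and of the worst-case request count. The module TakeoutSketch and the functions search_page/2 and load_order/1 are hypothetical stand-ins for the ElasticSearch search and the GraphQL call; only the page size of 30 comes from the description.

```elixir
defmodule TakeoutSketch do
  @page_size 30

  # Hypothetical stand-in for the paginated ElasticSearch search.
  def search_page(order_ids, page), do: Enum.slice(order_ids, page * @page_size, @page_size)

  # Hypothetical stand-in for loading one order via GraphQL.
  def load_order(id), do: %{id: id, total: id * 10}

  # Walks the pages one by one; within a page, all orders are loaded
  # concurrently, so one export has up to @page_size requests in flight.
  def export(order_ids) do
    Stream.iterate(0, &(&1 + 1))
    |> Stream.map(&search_page(order_ids, &1))
    |> Stream.take_while(&(&1 != []))
    |> Stream.flat_map(fn page ->
      page
      |> Task.async_stream(&load_order/1, max_concurrency: @page_size)
      |> Enum.map(fn {:ok, order} -> order end)
    end)
    |> Enum.map(fn order -> "#{order.id},#{order.total}" end)
  end

  # Worst-case number of simultaneous GraphQL requests in the cluster.
  def peak_requests(exports_per_node, nodes), do: exports_per_node * @page_size * nodes
end

TakeoutSketch.peak_requests(5, 2)
# 5 exports * 30 per page * 2 nodes = 300
```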

Having this requirement I see 3 approaches I can take:

  • a pool of workers using poolboy
  • a queue like @michalmuskala suggested
  • a GenStage pipeline to take new exports from the producer

@michalmuskala the API is not public and I do not receive that many requests, and I am not allowed to drop requests either, so the rate limiting idea does not apply here, although I will use it in another place.

I played with the GenStage idea since I wanted to become familiar with GenStage. Thanks to @jola’s suggestion I managed to use GenStage even though the events are pushed into the system.

  1. First solution looks like this:
defmodule MyApplication do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(TakeoutProducer, []),
      worker(TakeoutConsumer, [], id: 1),
      worker(TakeoutConsumer, [], id: 2),
      worker(TakeoutConsumer, [], id: 3),
      worker(TakeoutConsumer, [], id: 4)
    ]
    opts = [strategy: :one_for_one, name: __MODULE__]
    Supervisor.start_link(children, opts)
  end
end
defmodule TakeoutProducer do
  use GenStage
  def start_link() do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_), do: {:producer, []}

  def add_takeout(takeout) do
    GenStage.cast(__MODULE__, {:add_takeout, takeout})
  end

  @impl true
  def handle_demand(1, []), do: {:noreply, [], []}
  def handle_demand(1, [h | t]), do: {:noreply, [h], t}

  @impl true
  def handle_cast({:add_takeout, takeout}, state) do
    # Emit the pushed takeout as an event; GenStage buffers it until a consumer asks.
    {:noreply, [takeout], state}
  end
end
defmodule TakeoutConsumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, nil)
  end

  @impl true
  def init(state) do
    {:consumer, state, subscribe_to: [{TakeoutProducer, max_demand: 1}]}
  end

  @impl true
  def handle_info(_, state), do: {:noreply, [], state}

  @impl true
  def handle_events([takeout], _from, state) do
     # process_takeout(takeout)
    {:noreply, [], state}
  end

  defp process_takeout(_) do
    :timer.sleep(5000)
    raise "Error"
  end
end

This first solution works fine, but if consumers keep raising errors, the entire app ends up crashing. So I would also need to put them under a custom supervision tree and maybe change the strategy to rest_for_one. It takes a bit more work to reach a final working solution.

Then I found out about ConsumerSupervisor.

  2. The second solution, and the one I prefer now:
defmodule MyApplication do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(TakeoutProducer, []),
      supervisor(Orders.Service.TakeoutConsumerSupervisor, [])
    ]
    opts = [strategy: :one_for_one, name: Orders.Service.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
defmodule TakeoutProducer do
  use GenStage
  def start_link do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_), do: {:producer, []}

  def add_takeout(takeout) do
    GenStage.cast(__MODULE__, {:add_takeout, takeout})
  end

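  @impl true
  def handle_cast({:add_takeout, takeout}, state) do
    # Emit the pushed takeout as an event. If there is no pending demand,
    # GenStage keeps it in its internal buffer until a consumer asks.
    {:noreply, [takeout], state}
  end

  @impl true
  # Nothing stored in the state: emit no events for now.
  def handle_demand(_demand, []), do: {:noreply, [], []}
  def handle_demand(1, [h | t]), do: {:noreply, [h], t}

  # Hand out at most `demand` stored takeouts and keep the rest.
  def handle_demand(demand, state) do
    {events, state} = Enum.split(state, demand)
    {:noreply, events, state}
  end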
end
defmodule Orders.Service.TakeoutConsumerSupervisor do
  use ConsumerSupervisor
  def start_link do
    ConsumerSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end
  def init(_arg) do
    children = [%{id: TakeoutConsumer, start: {TakeoutConsumer, :start_link, []}, restart: :transient}]
    opts = [
      strategy: :one_for_one,
      subscribe_to: [{TakeoutProducer, min_demand: 1, max_demand: 5}]
    ]
    ConsumerSupervisor.init(children, opts)
  end
end
defmodule TakeoutConsumer do
  def start_link(event) do
    Task.start_link(fn ->
      # TODO execute the processing of the takeout
    end)
  end
end

This works fine even if all the consumer tasks fail. Also, consumers are started only when there is demand.

If you’re not allowed to drop the requests, you need to persist them in some sort of durable storage. If nothing else, somebody can trip over the power cord in the data centre, so storing them only in memory is not enough. For me that requirement eliminates GenStage (at least a naive implementation using GenStage) and means there has to be a durable queue in the middle. An implementation that builds the consumer side of a queue from GenStage would be fine.
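
A minimal sketch of that "durable queue plus limited consumer side" shape, using a plain GenServer from the standard library rather than GenStage to keep it dependency-free. Everything here is an assumption for illustration: the module name ExportLimiter, the :fetch function (which would claim up to N pending exports from the database so they are not handed out twice) and the :run function (which would generate one export). Retrying or marking failed exports is left out.

```elixir
defmodule ExportLimiter do
  use GenServer

  # opts: :max (slots per node), :fetch (fn n -> up to n pending exports),
  # :run (fn export -> any). :fetch and :run are hypothetical callbacks.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  # Call after persisting a new export so free slots are filled right away.
  def notify(server \\ __MODULE__), do: GenServer.cast(server, :fill)

  @impl true
  def init(opts) do
    state = %{
      max: Keyword.fetch!(opts, :max),
      fetch: Keyword.fetch!(opts, :fetch),
      run: Keyword.fetch!(opts, :run),
      running: MapSet.new()
    }

    # Pick up exports left over from before a restart.
    {:ok, state, {:continue, :fill}}
  end

  @impl true
  def handle_continue(:fill, state), do: {:noreply, fill(state)}

  @impl true
  def handle_cast(:fill, state), do: {:noreply, fill(state)}

  @impl true
  # An export finished or crashed: free its slot and fetch more work.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, fill(%{state | running: MapSet.delete(state.running, ref)})}
  end

  defp fill(state) do
    free = state.max - MapSet.size(state.running)
    exports = if free > 0, do: state.fetch.(free), else: []

    refs =
      for export <- exports do
        # Monitored, not linked: a crashing export does not take the limiter down.
        {_pid, ref} = spawn_monitor(fn -> state.run.(export) end)
        ref
      end

    %{state | running: MapSet.union(state.running, MapSet.new(refs))}
  end
end
```

Because the pending exports live in the database, nothing is lost if the node restarts; the limiter simply fetches them again on startup, and each node runs at most :max exports.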
