Leveraging HTTP/2 with a GenStage consumer for optimal stream usage?

I’m trying to properly leverage GenStage to optimize my HTTP/2 connection. HTTP/2 allows up to N concurrent streams (i.e., in-flight requests), where N is negotiated between the client and server. If a server allows 100 streams, then the client may have up to 100 requests in flight at once. A stream is then freed when one of those requests finishes (i.e., its HTTP response has been fully consumed by the client).
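
For concreteness, with an HTTP/2 client such as Mint (just an assumption on my part; I’m not tied to any particular client), the negotiated limit can be read roughly like this:

# Sketch only; assumes Mint as the HTTP/2 client.
{:ok, conn} = Mint.HTTP2.connect(:https, "example.com", 443)

# The server advertises SETTINGS_MAX_CONCURRENT_STREAMS (in practice its
# SETTINGS frame may only arrive after the first Mint.HTTP2.stream/2 calls).
max_streams = Mint.HTTP2.get_server_setting(conn, :max_concurrent_streams)

# Remaining capacity = negotiated limit minus requests currently in flight.
in_flight = 20
capacity = max_streams - in_flight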

My goal is to have a GenStage producer act as a queue of HTTP requests. My HTTP/2 connections would then be GenStage consumers. These consumers would have dynamic demand based on the number of in-flight requests. For example, if the max streams for the connection is 100 and there are 20 in-flight requests, then the consumer may receive AT MOST 80 more requests from the producer. Then, as requests finish, the consumer would receive further requests based on its remaining capacity (MAX_STREAMS - IN_FLIGHT).
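
For reference, the producer side I have in mind is basically the demand-buffering queue from the GenStage docs. A simplified sketch (RequestProducer and enqueue/2 are just my placeholder names):

defmodule RequestProducer do
  use GenStage

  # Queues requests and tracks pending demand, emitting requests only
  # when consumers have asked for them.

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  def enqueue(producer, request) do
    GenStage.cast(producer, {:enqueue, request})
  end

  def init(:ok) do
    {:producer, {:queue.new(), 0}}
  end

  def handle_cast({:enqueue, request}, {queue, pending_demand}) do
    dispatch_events(:queue.in(request, queue), pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, request}, queue} ->
        dispatch_events(queue, demand - 1, [request | events])

      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end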

I’m having a HARD time getting a GenStage consumer to work like this. Either the docs aren’t thorough enough or I’m misunderstanding something :slight_smile: So far, this is the closest consumer I could get to the behaviour I want:

defmodule Consumer do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts)
  end

  def init(opts) do
    state = %{
      queue: Queue,
      from: nil,
      max_in_flight: opts[:max_in_flight] || 10,
      in_flight: 0
    }
    {:consumer, state}
  end

  def handle_subscribe(:producer, _opts, from, state) do
    # Take over demand management manually so demand can track capacity.
    state = put_in(state.from, from)
    capacity = calculate_capacity(state)
    IO.inspect(capacity, label: "Capacity")
    GenStage.ask(from, capacity)

    {:manual, state}
  end

  def handle_events(requests, _from, state) do
    request_count = length(requests)
    state = put_in(state.in_flight, state.in_flight + request_count)

    # Simulate the requests finishing after a random delay.
    Enum.each(1..request_count, fn id ->
      Process.send_after(self(), {:finished, id}, Enum.random(100..5000))
    end)

    {:noreply, [], state}
  end

  def handle_info({:finished, id}, state) do
    IO.inspect(id, label: "Finished")
    IO.inspect(state, label: "State")
    state = put_in(state.in_flight, state.in_flight - 1)
    capacity = calculate_capacity(state)

    # A stream was freed, so ask the producer for more requests.
    if capacity > 0 do
      IO.inspect(capacity, label: "Capacity")
      GenStage.ask(state.from, capacity)
    end

    {:noreply, [], state}
  end

  def handle_info(message, state) do
    IO.inspect(message, label: "Message")

    {:noreply, [], state}
  end

  defp calculate_capacity(state) do
    state.max_in_flight - state.in_flight
  end
end

However, this fails: in_flight can easily end up greater than max_in_flight. I’ve looked at several GenStage examples and I’m still failing to understand how this could be achieved. I feel like GenStage is a perfect fit for this :confused:
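
To make the failure concrete, here is my understanding of a run with max_in_flight: 10 (numbers invented, and I may well be misreading how GenStage.ask/2 accumulates demand):

handle_subscribe          -> ask(10)                           outstanding demand: 10
handle_events (3 events)  -> in_flight: 3                      outstanding demand: 7
{:finished, 1}            -> in_flight: 2, capacity: 8, ask(8)
                             outstanding demand: 7 + 8 = 15, so up to 2 + 15 = 17
                             requests could eventually be in flight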

Any help would be appreciated!


I’m running into exactly the same problem. In my case, the server starts out with a very low max concurrent stream count (i.e. 1) and bumps it up once successful requests are made. Does anyone have any tips?

To follow up with more info: it seems that DemandDispatcher sets the max demand based on the initial ask it receives. In this case, that initial ask is 1. However, once the server increases the max concurrent stream limit and we consequently ask for a higher amount of demand, a warning is logged about potentially overloading greedy consumers.
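
To illustrate (minimal sketch; the module name and the :limit_raised message are just my stand-ins for however the HTTP/2 client reports the new SETTINGS_MAX_CONCURRENT_STREAMS value):

defmodule GrowingDemandConsumer do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, opts)
  end

  def init(_opts) do
    {:consumer, %{from: nil, max_in_flight: 1, in_flight: 0}}
  end

  def handle_subscribe(:producer, _opts, from, state) do
    # The server starts with max_concurrent_streams = 1, so the very first
    # ask is 1, and DemandDispatcher records 1 as the expected max demand.
    GenStage.ask(from, 1)
    {:manual, %{state | from: from}}
  end

  def handle_events(requests, _from, state) do
    {:noreply, [], %{state | in_flight: state.in_flight + length(requests)}}
  end

  def handle_info({:limit_raised, new_max}, state) do
    # The server later raises the limit (say to 100); asking for the extra
    # capacity in one go is what triggers the warning about overloading
    # greedy consumers. (Demand already outstanding is ignored for brevity.)
    GenStage.ask(state.from, new_max - state.in_flight)
    {:noreply, [], %{state | max_in_flight: new_max}}
  end
end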

Is this a supported use case? Is there a better way to approach this? I don’t want to set the demand higher on the consumer until we get the update from the server; I also don’t want to queue requests on the consumer’s side, as that would run the risk of them being lost if the consumer crashes.
