I’m trying to properly leverage GenStage
to optimize my HTTP2 connection. HTTP2 allows for N concurrent streams (i.e., in-flight requests) negotiated between the client and server. If a server allows for 100 streams then the client may issue 100 requests. A stream will then be freed when one of the requests finishes (i.e., the HTTP response is fully consumed by the client).
My goal is to have a GenStage
producer act as a queue of HTTP requests. Then, my HTTP connections would be GenStage
consumers. These consumers would have dynamic demand based on the number of in-flight requests. For example, if the max streams for the connection is 100 and there are 20 in-flight requests then the consumer may receive AT MOST 70 requests from the producer. Then, as requests finish the consumer would receive further requests based on its capacity (MAX_STREAMS - IN_FLIGHT
).
I’m having a HARD time creating a GenStage
consumer to work like this. Either the docs aren’t thorough enough or I’m misunderstanding something So far this is the closest consumer I could get to the behaviour I want:
defmodule Consumer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
def init(opts) do
state = %{
queue: Queue,
from: nil,
max_in_flight: opts[:max_in_flight] || 10,
in_flight: 0
}
{:consumer, state}
end
def handle_subscribe(:producer, _opts, from, state) do
state = put_in(state.from, from)
capacity = calculate_capacity(state)
IO.inspect(capacity, label: "Capacity")
GenStage.ask(from, capacity)
{:manual, state}
end
def handle_events(requests, _from, state) do
request_count = length(requests)
state = put_in(state.in_flight, state.in_flight + request_count)
Enum.map(1..request_count, fn(id) ->
Process.send_after(self(), {:finished, id}, Enum.random(100..5000))
end)
{:noreply, [], state}
end
def handle_info({:finished, id}, state) do
IO.inspect(id, label: "Finished")
IO.inspect(state, label: "State")
state = put_in(state.in_flight, state.in_flight - 1)
capacity = calculate_capacity(state)
if capacity > 0 do
IO.inspect(capacity, label: "Capacity")
GenStage.ask(state.from, capacity)
end
{:noreply, [], state}
end
def handle_info(message, state) do
IO.inspect(message, label: "Message")
{:noreply, [], state}
end
defp calculate_capacity(state) do
state.max_in_flight - state.in_flight
end
end
However, this fails as it can easily go to in_flight > max_in_flight
. I’ve looked at several GenStage
examples and I’m failing at understanding how this could be achieved? I feel like GenStage
is a perfect fit for this
Any help would be appreciated!