Currently, I’m implementing a producer Client
for receiving events which would call remote API.
I wrote a consumer Limiter. It’s supposed to take maximum limited batch of events allowed at certain period of time.
But sometimes, I received all events from producer
to Limiter.handle_events
without any limits.
That’s because there is no restriction of max events DemandDispatcher.
I wrote my own dispatcher for that case, but I had the feeling that I was using wrong GenStage
for that case.
Could somebody help me to do it better?
defmodule Client do
def write(message) do
GenStage.cast(__MODULE__, {:write, message})
end
def init(:ok) do
{:producer, :ok}
end
def handle_cast({:write, message}, state) do
{:noreply, [{:write, message}], state}
end
end
defmodule Limiter do
defstruct [:max_demand, :max_interval, :producer_from, :scheduled_at, :get_events]
def init(options) do
state = %__MODULE__{
max_demand: options[:max_demand] || 5,
max_interval: options[:max_interval] || 5_000,
get_events: 0,
scheduled_at: nil
}
{:consumer, state, subscribe_to: [{GSS.Client, subscribe_options(state)}]}
end
def handle_events(events, from, state) do
state = state
|> Map.update!(:get_events, &(&1 + length(events)))
|> ask_and_schedule()
# process events
{:noreply, [], state}
end
def handle_info({:ask, from}, state) do
{:noreply, [], ask_and_schedule(state)}
end
defp subscribe_options(state) do
[max_demand: state.max_demand, min_demand: 1]
end
defp ask_and_schedule(state) do
now = :erlang.timestamp()
interval_expired? = state.scheduled_at != nil && :timer.now_diff(now, state.scheduled_at) <= state.max_interval * 1000
limited_events? = state.get_events >= state.max_demand
cond do
interval_expired? && !limited_events? ->
GenStage.ask(state.producer_from, state.max_demand)
%{state | get_events: 0, scheduled_at: nil}
!interval_expired? && !limited_events? ->
GenStage.ask(state.producer_from, state.max_demand)
if state.scheduled_at == nil,
do: Map.put(state, :scheduled_at, now),
else: state
interval_expired? && limited_events? ->
Process.send_after(self(), {:ask, state.producer_from}, state.max_interval)
%{state | get_events: 0, scheduled_at: nil}
!interval_expired? && limited_events? ->
Process.send_after(self(), {:ask, state.producer_from}, state.max_interval)
%{state | get_events: 0, scheduled_at: nil}
end
end
end