Problem:
As a first go at using GenStage, I’m building a simple task queue. Initially it used consumers that I would add explicitly in my Application supervisor. I’ve since updated it to use a ConsumerSupervisor to add/remove Consumers as required.
The issue I’m having is that every task added to the producer gets picked up by the ProducerConsumer and passed on, but about half of them seem to disappear into thin air (ie I only see the output of IO.inspect("#{job_id} is starting")
for about half of them)!
Does anyone know why this is happening or how I can debug this further?
Thanks
Code
producer.ex
defmodule Q.Producer do
use GenStage
require Logger
def start_link(_init_args) do
GenStage.start_link(__MODULE__, {:queue.new(), 0}, name: __MODULE__)
end
@impl true
def init(initial) do
{:producer, initial}
end
@impl true
def handle_demand(demand, {backlog, existing_demand}) do
# IO.inspect("Received demand: #{demand}, existing_demand: #{existing_demand}")
case :queue.len(backlog) do
0 ->
{:noreply, [], {backlog, existing_demand + demand}}
n ->
n = min(demand, n)
{items, backlog} = :queue.split(n, backlog)
:queue.len(backlog) |> Q.Stats.set_waiting()
{:noreply, :queue.to_list(items), {backlog, existing_demand + demand - n}}
end
end
@impl true
def handle_cast({:enqueue, item}, {backlog, 0}) do
{:noreply, [], {:queue.in(item, backlog), 0}}
end
@impl true
def handle_cast({:enqueue, item}, {backlog, existing_demand}) do
IO.inspect("Received item: existing_demand: #{existing_demand}")
backlog = :queue.in(item, backlog)
{{:value, item}, backlog} = :queue.out(backlog)
{:noreply, [item], {backlog, existing_demand - 1}}
end
def enqueue(item), do: GenStage.cast(__MODULE__, {:enqueue, item})
end
producer_consumer.ex
defmodule Q.ProducerConsumer do
use GenStage
def start_link(_init_args) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(initial) do
{:producer_consumer, initial, subscribe_to: [{Q.Producer, max_demand: 1}]}
end
def handle_events(events, _from, state) do
# producer "middleware" - do things like filter before passing on to consumer
IO.inspect("passing events to consumer: #{inspect(events)}")
{:noreply, events, state}
end
end
consumer_supervisor.ex
defmodule Q.ConsumerSupervisor do
use ConsumerSupervisor
def start_link(_args) do
{:ok, pid} = ConsumerSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
{:ok, pid}
end
def init(:ok) do
children = [
%{
id: Q.Consumer,
start: {Q.Consumer, :start_link, []},
restart: :transient
}
]
ConsumerSupervisor.init(children,
strategy: :one_for_one,
subscribe_to: [
{Q.ProducerConsumer, max_demand: 5}
]
)
end
end
consumer.ex
defmodule Q.Consumer do
alias Q.JobRecord
use GenStage
import Q.Constants
@max_job_duration max_job_duration()
def start_link(_init_args) do
GenStage.start_link(__MODULE__, :ok)
end
def init(initial) do
Process.flag(:trap_exit, true)
Q.Stats.increment_consumer_count()
{:consumer, initial, subscribe_to: [{Q.ProducerConsumer, max_demand: 1}]}
end
def handle_events(events, _from, state) do
Enum.each(events, fn job_id ->
task =
Task.async(fn ->
IO.inspect("#{job_id} is starting")
JobRecord.set_started(job_id)
run_job()
end)
case Task.yield(task, @max_job_duration) || Task.shutdown(task) do
{:ok, _result} ->
JobRecord.set_completed(job_id)
nil ->
JobRecord.retry_job(job_id)
end
end)
# As a consumer we never emit events
{:noreply, [], state}
end
def handle_info({:EXIT, _pid, reason}, state) do
Q.Stats.decrement_consumer_count()
{:stop, reason, state}
end
defp run_job do
Process.sleep(100)
end
end