GenStage Consumer appears to be dropping events

Problem:

As a first go at using GenStage, I’m building a simple task queue. Initially it used consumers that I would add explicitly in my Application supervisor. I’ve since updated it to use a ConsumerSupervisor to add/remove Consumers as required.

The issue I’m having is that every task added to the producer gets picked up by the ProducerConsumer and passed on, but about half of them seem to disappear into thin air (i.e. I only see the output of IO.inspect("#{job_id} is starting") for about half of them)!

Does anyone know why this is happening or how I can debug this further?

Thanks

Code

producer.ex

defmodule Q.Producer do
  @moduledoc """
  GenStage producer backing the task queue.

  The state is a `{backlog, pending_demand}` tuple: `backlog` is an Erlang
  `:queue` of items that have been enqueued but not yet emitted, and
  `pending_demand` counts demand received while the backlog was empty.
  """
  use GenStage
  require Logger

  # Client API

  # Starts the producer registered under the module name with an empty backlog
  # and no pending demand.
  def start_link(_init_args) do
    GenStage.start_link(__MODULE__, {:queue.new(), 0}, name: __MODULE__)
  end

  # Asynchronously hands `item` to the producer.
  def enqueue(item), do: GenStage.cast(__MODULE__, {:enqueue, item})

  # Server callbacks

  @impl true
  def init(initial), do: {:producer, initial}

  # Emits up to `demand` items from the backlog. When the backlog is empty the
  # demand is remembered so later enqueues can satisfy it.
  @impl true
  def handle_demand(demand, {backlog, existing_demand}) do
    if :queue.is_empty(backlog) do
      {:noreply, [], {backlog, existing_demand + demand}}
    else
      take = min(demand, :queue.len(backlog))
      {ready, remaining} = :queue.split(take, backlog)
      Q.Stats.set_waiting(:queue.len(remaining))
      {:noreply, :queue.to_list(ready), {remaining, existing_demand + demand - take}}
    end
  end

  # No outstanding demand: park the item in the backlog.
  @impl true
  def handle_cast({:enqueue, item}, {backlog, 0}) do
    {:noreply, [], {:queue.in(item, backlog), 0}}
  end

  # Outstanding demand: push the item through the backlog (preserving FIFO
  # order) and emit the head immediately, consuming one unit of demand.
  def handle_cast({:enqueue, item}, {backlog, existing_demand}) do
    IO.inspect("Received item: existing_demand: #{existing_demand}")

    {{:value, next}, remaining} = :queue.out(:queue.in(item, backlog))
    {:noreply, [next], {remaining, existing_demand - 1}}
  end
end

producer_consumer.ex

defmodule Q.ProducerConsumer do
  @moduledoc """
  Pass-through stage sitting between `Q.Producer` and the consumers.

  It subscribes to `Q.Producer` with a `max_demand` of 1 and forwards every
  batch of events unchanged after printing it.
  """
  use GenStage

  # Starts the stage registered under the module name.
  def start_link(_init_args), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(state) do
    upstream = {Q.Producer, max_demand: 1}
    {:producer_consumer, state, subscribe_to: [upstream]}
  end

  # Producer "middleware": the place to filter or transform events before they
  # are passed on to the consumer. Currently forwards them untouched.
  @impl true
  def handle_events(events, _from, state) do
    message = "passing events to consumer: #{inspect(events)}"
    IO.inspect(message)

    {:noreply, events, state}
  end
end

consumer_supervisor.ex

defmodule Q.ConsumerSupervisor do
  @moduledoc """
  ConsumerSupervisor that subscribes to `Q.ProducerConsumer` and supervises
  `Q.Consumer` children.

  Per the ConsumerSupervisor documentation, one child is started for each
  received event, with the event appended to the child spec's start arguments
  (so `Q.Consumer.start_link/1` is called with the event).
  """
  use ConsumerSupervisor

  # Starts the supervisor registered under the module name.
  #
  # The result of ConsumerSupervisor.start_link/3 is returned as-is, so a
  # failed start (`{:error, reason}`) or `:ignore` reaches the parent
  # supervisor instead of raising a MatchError in this function.
  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    children = [
      %{
        id: Q.Consumer,
        # The argument list is empty here; the event is supplied at start time.
        start: {Q.Consumer, :start_link, []},
        # :transient children are restarted only after an abnormal exit.
        restart: :transient
      }
    ]

    ConsumerSupervisor.init(children,
      strategy: :one_for_one,
      subscribe_to: [
        # max_demand bounds how many events are requested from upstream at once.
        {Q.ProducerConsumer, max_demand: 5}
      ]
    )
  end
end

consumer.ex

# GenStage consumer that runs each received job id inside a Task bounded by
# @max_job_duration.
#
# NOTE(review): this module subscribes to Q.ProducerConsumer on its own (see
# init/1) while Q.ConsumerSupervisor, which starts it, is subscribed to the same
# stage. start_link/1 also discards its argument. Together these look like the
# reason some events never reach handle_events/3: events delivered to the
# supervisor are only used to start a child and are then dropped.
defmodule Q.Consumer do
  alias Q.JobRecord
  use GenStage
  import Q.Constants

  # Evaluated at compile time from Q.Constants.max_job_duration/0.
  @max_job_duration max_job_duration()

  # Starts an unnamed consumer. The argument is ignored.
  def start_link(_init_args) do
    GenStage.start_link(__MODULE__, :ok)
  end

  # Traps exits, bumps the consumer counter and subscribes to
  # Q.ProducerConsumer asking for one event at a time.
  def init(initial) do
    Process.flag(:trap_exit, true)
    Q.Stats.increment_consumer_count()
    {:consumer, initial, subscribe_to: [{Q.ProducerConsumer, max_demand: 1}]}
  end

  # Runs every job id in `events` sequentially: each job runs in its own Task
  # and is given @max_job_duration to finish before it is shut down.
  def handle_events(events, _from, state) do
    Enum.each(events, fn job_id ->
      task =
        Task.async(fn ->
          IO.inspect("#{job_id} is starting")
          JobRecord.set_started(job_id)
          run_job()
        end)

      # Task.yield/2 returns nil on timeout, in which case the task is shut
      # down. NOTE(review): an `{:exit, reason}` result would not match either
      # clause below — confirm whether that case can occur here.
      case Task.yield(task, @max_job_duration) || Task.shutdown(task) do
        {:ok, _result} ->
          JobRecord.set_completed(job_id)

        nil ->
          JobRecord.retry_job(job_id)
      end
    end)

    # As a consumer we never emit events
    {:noreply, [], state}
  end

  # Any trapped exit signal decrements the consumer counter and stops this
  # consumer with the same reason.
  # NOTE(review): Task.async links the task to this process and exits are
  # trapped, so a task finishing presumably also arrives here and stops the
  # consumer — TODO confirm.
  def handle_info({:EXIT, _pid, reason}, state) do
    Q.Stats.decrement_consumer_count()
    {:stop, reason, state}
  end

  # Placeholder job body: sleeps for 100 ms.
  defp run_job do
    Process.sleep(100)
  end
end
1 Like

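My problem was that I was still using GenStage for the “consumer” when in fact I just needed to start a task, as the ConsumerSupervisor was now picking up the events itself and starting one child per event. The old Q.Consumer ignored the event passed to start_link and instead subscribed to Q.ProducerConsumer as an additional consumer, so the events delivered to the supervisor were never processed.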

# Child process started by a ConsumerSupervisor. Per the ConsumerSupervisor
# documentation, the received event is appended to the child spec's start
# arguments, so `job_id` below is the event itself.
defmodule Q.Consumer do
  # Starts a linked Task for a single job. Unlike a GenStage consumer, the
  # task holds no subscription of its own.
  def start_link(job_id) do
    Task.start_link(fn ->
      # do stuff
    end)
  end
end
1 Like