GenStage: How to self-cancel a Consumer

Found: https://semaphoreci.com/community/tutorials/introduction-to-testing-elixir-applications-with-exunit which gave me the answer.

  test "producer" do
    {:ok, oregon} = GenStage.from_enumerable(Oregon.get_licensees())
    {:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, {2, self})
    GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)

    assert_receive({:complete}, 5_000)
    refute_received({:errors})
  end

And my TestConsumer:

defmodule TestLicenseeConsumer do
  alias Experimental.GenStage
  alias NHVerify.Producers.Oregon
  alias NHVerify.Licensee

  use GenStage
  
  def start_link({left_to_go, caller}), do: GenStage.start_link(__MODULE__, {left_to_go, caller})
  
  def init({left_to_go, caller}), do: {:consumer, {left_to_go, caller}}

  def handle_events(licensees, _from, {left_to_go, caller}) do
    {valid, invalid} = licensees |> Enum.partition(&Licensee.changeset(&1).valid?)

    Enum.each(invalid, fn(_) -> send caller, {:error} end)

    left_to_go = left_to_go - Enum.count(valid)
    case left_to_go do
      0 ->
        send caller, {:complete}
        {:stop, :normal, {left_to_go, caller}}
      left_to_go -> {:noreply, [], {left_to_go, caller}}
    end
  end
end
2 Likes