GenStage: How to self-cancel a Consumer

I’m trying to write some tests that exercise my GenStage workflow. The idea is just to ensure that the events from my Producer are what my Consumer expects. I’ve got things working, but I don’t want to run until my producer stream ends; I’d like to cut it off after 3 or 4 events have been sent and then assert that they were processed correctly.

Any suggestions on how to:

  1. Set up a consumer to shut itself down after a few events are received? Seems like I should be able to call GenStage.cancel, but I need the subscription id. Seems like I should have that too, but I can’t find it.

  2. Is there a better way to test these workflows? What I have is:

    test "producer" do
      {:ok, oregon} = GenStage.from_enumerable(Oregon.get_licensees())
      {:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, 2)
      {:ok, _subscription} = GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)

      # Now I want to:
      #   a) Wait until my consumer closes itself
      #   b) Be able to assert what happened and see the data that was received.

      # Placeholder: sleeps far too long just to keep the test process alive.
      Process.sleep(10_000_000)
    end


Well, I guess taking a few hours off can help:

Returning

{:stop, :normal, 0}

from the consumer's handle_events callback seems to have done it (the third element is the new state). Now I need to figure out when this happened, so that I can halt my test.
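
For anyone landing here, a minimal self-contained sketch of that pattern: a consumer that returns `{:stop, :normal, state}` from `handle_events/3` once it has seen enough events. `CountdownConsumer`, the `1..100` producer, and the 200 ms sleep are illustrative assumptions rather than anything from my project, and `Mix.install` assumes a recent Elixir with the `gen_stage` package reachable.

```elixir
# Assumption: fetch gen_stage on the fly so the script runs standalone.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule CountdownConsumer do
  # Hypothetical consumer: the state is the number of events still wanted.
  use GenStage

  def init(left_to_go), do: {:consumer, left_to_go}

  def handle_events(events, _from, left_to_go) do
    case left_to_go - length(events) do
      # Enough events seen: stop the consumer with a normal exit reason.
      remaining when remaining <= 0 -> {:stop, :normal, remaining}
      # Otherwise keep consuming; consumers never emit events.
      remaining -> {:noreply, [], remaining}
    end
  end
end

{:ok, producer} = GenStage.from_enumerable(1..100)
{:ok, consumer} = GenStage.start_link(CountdownConsumer, 3)
{:ok, _tag} = GenStage.sync_subscribe(consumer, to: producer, min_demand: 0, max_demand: 1)

# Crude, timing-based check that the consumer has shut itself down.
Process.sleep(200)
alive = Process.alive?(consumer)
IO.inspect(alive, label: "consumer alive?")
```

The sleep is only a stand-in for a proper notification; it shows that the consumer exits by itself, not when it does.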


Found: https://semaphoreci.com/community/tutorials/introduction-to-testing-elixir-applications-with-exunit which gave me the answer.

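  test "producer" do
    {:ok, oregon} = GenStage.from_enumerable(Oregon.get_licensees())
    # Pass the test pid so the consumer can report back; bare `self` must be `self()`.
    {:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, {2, self()})
    GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)

    assert_receive({:complete}, 5_000)
    # Must match the message the consumer actually sends, which is {:error}.
    refute_received({:error})
  end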

And my TestConsumer:

defmodule TestLicenseeConsumer do
  # Needed only on older gen_stage releases that namespaced the module; drop it on current ones.
  alias Experimental.GenStage
  alias NHVerify.Licensee

  use GenStage

  def start_link({left_to_go, caller}), do: GenStage.start_link(__MODULE__, {left_to_go, caller})

  def init({left_to_go, caller}), do: {:consumer, {left_to_go, caller}}

  def handle_events(licensees, _from, {left_to_go, caller}) do
    # Enum.split_with/2 replaces the deprecated Enum.partition/2.
    {valid, invalid} = Enum.split_with(licensees, &Licensee.changeset(&1).valid?)

    # Report each invalid licensee to the test process.
    Enum.each(invalid, fn _ -> send(caller, {:error}) end)

    case left_to_go - length(valid) do
      # `<= 0` also stops if a batch overshoots the target.
      remaining when remaining <= 0 ->
        send(caller, {:complete})
        {:stop, :normal, {remaining, caller}}

      remaining ->
        {:noreply, [], {remaining, caller}}
    end
  end
end

You don’t need to store the caller information. You can monitor the consumer from the test instead, and you will receive a :DOWN message when it exits:

{:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, 2)
ref = Process.monitor(consumer)
GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)
assert_receive {:DOWN, ^ref, _, _, :normal}

Monitoring works for any process.
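
To illustrate that last point without GenStage, here is a small sketch using only the standard library. The spawned process and the `:stop` message are made up for the example; the `{:DOWN, ref, :process, pid, reason}` shape is what `Process.monitor/1` delivers.

```elixir
# A plain process that waits for :stop and then returns, exiting normally.
pid =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Monitor it, then ask it to finish.
ref = Process.monitor(pid)
send(pid, :stop)

# The monitoring process gets a :DOWN message carrying the exit reason.
reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason, label: "exit reason")
```

In a test, `assert_receive {:DOWN, ^ref, _, _, :normal}` does the same wait with a built-in timeout.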
