I’m trying to write some specs that run my GenServer workflow. The idea is just to ensure that the events from my Producer are what my Consumer expects. I’ve got things working, but I don’t want to run until my producer stream ends. I’d like to cut it off after 3 or 4 events are sent. Further, I’d like to be able to then assert that things were processed correctly.
Any suggestions on how to:
-
Set up a consumer to shut itself down after a few events are received? Seems like I should be able to call GenStage.cancel, but I need the subscription id. Seems like I should have that too, but I can’t find it.
-
Is there a better way to test these workflows? What I have is:
test “producer” do
{:ok, oregon} = GenStage.from_enumerable(Oregon.get_licensees())
{:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, 2)
{:ok, subscription} = GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)
Now I want to:
a) Wait til my consumer closes itself
b) Be able to assert what happened and see the data that was received.
Process.sleep(10_000_000)
end
1 Like
Well, I guess taking a few hours off can help:
Returning:
{:stop, :normal, 0}
seems to have done it. Now I need to figure out when this happened, so that I can halt my test.
2 Likes
Found: https://semaphoreci.com/community/tutorials/introduction-to-testing-elixir-applications-with-exunit which gave me the answer.
test "producer" do
{:ok, oregon} = GenStage.from_enumerable(Oregon.get_licensees())
{:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, {2, self})
GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)
assert_receive({:complete}, 5_000)
refute_received({:errors})
end
And my TestConsumer:
defmodule TestLicenseeConsumer do
alias Experimental.GenStage
alias NHVerify.Producers.Oregon
alias NHVerify.Licensee
use GenStage
def start_link({left_to_go, caller}), do: GenStage.start_link(__MODULE__, {left_to_go, caller})
def init({left_to_go, caller}), do: {:consumer, {left_to_go, caller}}
def handle_events(licensees, _from, {left_to_go, caller}) do
{valid, invalid} = licensees |> Enum.partition(&Licensee.changeset(&1).valid?)
Enum.each(invalid, fn(_) -> send caller, {:error} end)
left_to_go = left_to_go - Enum.count(valid)
case left_to_go do
0 ->
send caller, {:complete}
{:stop, :normal, {left_to_go, caller}}
left_to_go -> {:noreply, [], {left_to_go, caller}}
end
end
end
2 Likes
You don’t need to store the caller information. You can monitor it and you will receive a message when it is down:
{:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, 2)
ref = Process.monitor(consumer)
GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)
assert_receive {:DOWN, ^ref, _, _, :normal}
Monitoring works for any process.
3 Likes