I found the answer in this tutorial: https://semaphoreci.com/community/tutorials/introduction-to-testing-elixir-applications-with-exunit
# Streams the Oregon licensees through TestLicenseeConsumer and waits for it to
# report back to the test process: `{:complete}` once it has counted 2 valid
# licensees, and one `{:error}` message per invalid licensee it sees.
test "producer" do
  {:ok, oregon} = GenStage.from_enumerable(Oregon.get_licensees())

  # `self()` must be called with parentheses; a bare `self` is treated as a
  # variable lookup. The pid lets the consumer message this test process.
  {:ok, consumer} = GenStage.start_link(TestLicenseeConsumer, {2, self()})

  # max_demand: 1 makes the consumer ask for at most one event at a time.
  GenStage.sync_subscribe(consumer, to: oregon, min_demand: 0, max_demand: 1)

  assert_receive({:complete}, 5_000)

  # The consumer sends `{:error}` (singular); refuting any other shape would
  # pass vacuously and hide invalid licensees.
  refute_received({:error})
end
And my `TestLicenseeConsumer`:
defmodule TestLicenseeConsumer do
  @moduledoc """
  GenStage consumer used in tests to count down valid licensees.

  The state is `{left_to_go, caller}`: the number of valid licensees still
  expected and the pid to notify. For every invalid licensee the consumer
  sends `{:error}` to `caller`; once the countdown is exhausted it sends
  `{:complete}` and stops normally.
  """

  alias Experimental.GenStage
  alias NHVerify.Licensee

  use GenStage

  @doc """
  Starts the consumer with the number of valid licensees to wait for and the
  pid that should receive the `{:error}` / `{:complete}` notifications.
  """
  def start_link({left_to_go, caller}), do: GenStage.start_link(__MODULE__, {left_to_go, caller})

  # Registers the process as a consumer and keeps the countdown and caller as state.
  def init({left_to_go, caller}), do: {:consumer, {left_to_go, caller}}

  # Validates each incoming batch, reports invalid licensees to the caller and
  # decrements the countdown by the number of valid ones.
  def handle_events(licensees, _from, {left_to_go, caller}) do
    # NOTE(review): Enum.partition/2 is deprecated on newer Elixir in favour of
    # Enum.split_with/2; kept because this code targets the Experimental.GenStage
    # era — confirm the minimum Elixir version before switching.
    # Presumably Licensee.changeset/1 returns a changeset exposing `valid?` — verify.
    {valid, invalid} = Enum.partition(licensees, &Licensee.changeset(&1).valid?)

    Enum.each(invalid, fn _ -> send(caller, {:error}) end)

    case left_to_go - length(valid) do
      # `<= 0` rather than `== 0`: a batch holding more valid licensees than
      # remain would otherwise skip past zero and never signal completion.
      remaining when remaining <= 0 ->
        send(caller, {:complete})
        {:stop, :normal, {remaining, caller}}

      remaining ->
        # Consumers emit no events, hence the empty list.
        {:noreply, [], {remaining, caller}}
    end
  end
end