I’m working on my first GenStage producer, and I want to unit test it as I write the handle demand method. I’m having trouble simplifying the use of GenStage enough to write a simple test. Here’s a very simple producer that when asked for demand, outputs a list of three atoms.
defmodule OutputNonsense do
use GenStage
def init(arg) do
{:producer, :some_kind_of_state}
end
def handle_demand(demand, state) when demand > 0 do
{:noreply, things = [:whatever, :you, :want], state}
end
end
When it comes to writing a test for this, this is the best I’ve been able to manage. It outputs the three atoms to the console, but never finishes execution.
defmodule OutputNonsenseTest do
use ExUnit.Case
test "check the results" do
{:ok, stage} = GenStage.start_link(OutputNonsense, arg = :unused)
GenStage.stream([stage])
|> Stream.each(&IO.inspect/1)
|> Stream.run()
end
end
I realise that I’m streaming the results into IO.inspect, rather than collecting them to check in an assert. I’m really not sure how to do that. Would love any pointers.
I have a producer which queries tasks from a database table. I test each callback directly.
For example, my handle_demand/2 callback has the following code:
def handle_demand(demand, state) when demand > 0 do
limit = demand + state
get_events(limit)
end
## Helper functions.
defp get_events(limit) do
{:ok, {count, events}} = Task.get_waiting_tasks(limit, :requesting)
{:noreply, events, limit - count}
end
This is the test I wrote:
test "handle_demand updates state" do
insert!(:task)
{:noreply, _tasks, state} = Feeder.handle_demand(50, 100)
assert state == 149
end
insert!/1 is a factory which inserts a task into the table, so there’s only one event. The callback asks for 50 in addition to the existing 100, but only gets one. So the resulting state is 149.
Usually a create a simple consumer that just forwards every event it receives to the process that started it:
defmodule TestConsumer do
def start_link(producer) do
GenStage.start_link(__MODULE__, {producer, self()})
end
def init({producer, owner}) do
{:consumer, owner, subscribe_to: [producer]}
end
def handle_events(events, _from, owner) do
send(owner, {:received, events})
{:noreply, [], owner}
end
end
Then I can use it in my tests to assert about the produced events:
defmodule OutputNonsenseTest do
use ExUnit.Case
test "check the results" do
{:ok, stage} = GenStage.start_link(OutputNonsense, arg = :unused)
{:ok, _cons} = TestConsumer.start_link(stage)
assert_receive {:received, events}
# assertions about the events
# The test consumer will also stop, since it is subscribed to the stage
GenStage.stop(stage)
end
end