Unit Testing a GenStage Producer

I’m working on my first GenStage producer, and I want to unit test it as I write the handle_demand/2 callback. I’m having trouble simplifying the use of GenStage enough to write a simple test. Here’s a very simple producer that, when asked for demand, outputs a list of three atoms.

defmodule OutputNonsense do
  use GenStage

  def init(_arg) do
    {:producer, :some_kind_of_state}
  end

  def handle_demand(demand, state) when demand > 0 do
    {:noreply, [:whatever, :you, :want], state}
  end
end

When it comes to writing a test for this, this is the best I’ve been able to manage. It outputs the three atoms to the console, but never finishes execution.

defmodule OutputNonsenseTest do
  use ExUnit.Case

  test "check the results" do
    {:ok, stage} = GenStage.start_link(OutputNonsense, :unused)

    GenStage.stream([stage])
    |> Stream.each(&IO.inspect/1)
    |> Stream.run()
  end
end

I realise that I’m streaming the results into IO.inspect, rather than collecting them to check in an assert. I’m really not sure how to do that. Would love any pointers.

I have a producer which queries tasks from a database table. I test each callback directly.

For example, my handle_demand/2 callback has the following code:

  def handle_demand(demand, state) when demand > 0 do
    limit = demand + state
    get_events(limit)
  end

  ## Helper functions.
  defp get_events(limit) do
    {:ok, {count, events}} = Task.get_waiting_tasks(limit, :requesting)
    {:noreply, events, limit - count}
  end

This is the test I wrote:

test "handle_demand updates state" do
  insert!(:task)
  {:noreply, _tasks, state} = Feeder.handle_demand(50, 100)

  assert state == 149
end

insert!/1 is a factory which inserts a task into the table, so there’s only one event. The callback asks for 50 in addition to the existing 100, but only gets one. So the resulting state is 149.
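
The same idea applied to the producer from the question might look like the sketch below. It is written as a standalone script and assumes gen_stage can be fetched from Hex through Mix.install/2; the version requirement and the test module name OutputNonsenseCallbackTest are my own choices, not something from the original code. The producer is repeated only so the script runs on its own.

```elixir
# Standalone script, run with `elixir <file>.exs`.
# Assumption: gen_stage is available from Hex; the version requirement is illustrative.
Mix.install([{:gen_stage, "~> 1.0"}])
ExUnit.start()

# The producer from the question, repeated so the script stands alone.
defmodule OutputNonsense do
  use GenStage

  def init(_arg), do: {:producer, :some_kind_of_state}

  def handle_demand(demand, state) when demand > 0 do
    {:noreply, [:whatever, :you, :want], state}
  end
end

# Hypothetical test module name.
defmodule OutputNonsenseCallbackTest do
  use ExUnit.Case, async: true

  test "handle_demand/2 emits the three atoms and keeps the state" do
    # No process is started here; the callback is called like any other function.
    assert {:noreply, events, state} =
             OutputNonsense.handle_demand(5, :some_kind_of_state)

    assert events == [:whatever, :you, :want]
    assert state == :some_kind_of_state
  end
end
```

Calling the callback directly checks only the return value; it says nothing about how GenStage dispatches the events afterwards.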

I dunno whether this is helpful.

Usually I create a simple consumer that just forwards every event it receives to the process that started it:

defmodule TestConsumer do
  use GenStage

  def start_link(producer) do
    GenStage.start_link(__MODULE__, {producer, self()})
  end

  def init({producer, owner}) do
    {:consumer, owner, subscribe_to: [producer]}
  end

  def handle_events(events, _from, owner) do
    send(owner, {:received, events})
    {:noreply, [], owner}
  end
end

Then I can use it in my tests to assert about the produced events:

defmodule OutputNonsenseTest do
  use ExUnit.Case

  test "check the results" do
    {:ok, stage} = GenStage.start_link(OutputNonsense, :unused)
    {:ok, _cons} = TestConsumer.start_link(stage)

    assert_receive {:received, events}
    # assertions about the events

    # The test consumer will also stop, since it is subscribed to the stage
    GenStage.stop(stage)
  end
end

I hope this helps you somehow.
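
For the OutputNonsense producer from the question, the placeholder assertion could be filled in roughly as follows. This is a sketch under a few assumptions: it is a standalone script that pulls gen_stage from Hex with Mix.install/2 (the version requirement is illustrative), the test module name OutputNonsenseConsumerTest is made up, and the consumer keeps the default demand settings, so the three atoms should arrive as one batch.

```elixir
# Standalone script, run with `elixir <file>.exs`.
# Assumption: gen_stage is available from Hex; the version requirement is illustrative.
Mix.install([{:gen_stage, "~> 1.0"}])
ExUnit.start()

# The producer from the question, repeated so the script stands alone.
defmodule OutputNonsense do
  use GenStage

  def init(_arg), do: {:producer, :some_kind_of_state}

  def handle_demand(demand, state) when demand > 0 do
    {:noreply, [:whatever, :you, :want], state}
  end
end

# Forwards every batch of events to the process that started the consumer.
defmodule TestConsumer do
  use GenStage

  def start_link(producer) do
    GenStage.start_link(__MODULE__, {producer, self()})
  end

  def init({producer, owner}) do
    {:consumer, owner, subscribe_to: [producer]}
  end

  def handle_events(events, _from, owner) do
    send(owner, {:received, events})
    {:noreply, [], owner}
  end
end

# Hypothetical test module name.
defmodule OutputNonsenseConsumerTest do
  use ExUnit.Case

  test "the consumer receives the three atoms" do
    {:ok, stage} = GenStage.start_link(OutputNonsense, :unused)
    {:ok, _consumer} = TestConsumer.start_link(stage)

    # Matching on the literal list fails the test if anything else arrives first.
    assert_receive {:received, [:whatever, :you, :want]}

    GenStage.stop(stage)
  end
end
```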

This is a good approach. Piping GenStage.stream/1 into Enum.take/2 is another possibility.

How would that work? How can you assert on the stream?

GenStage.stream |> Enum.take will return a list, so you can make assertions on it.
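
A minimal sketch of that, using the producer from the question. It assumes a standalone script that fetches gen_stage from Hex with Mix.install/2 (the version requirement is illustrative), and the test module name OutputNonsenseStreamTest is made up for the example.

```elixir
# Standalone script, run with `elixir <file>.exs`.
# Assumption: gen_stage is available from Hex; the version requirement is illustrative.
Mix.install([{:gen_stage, "~> 1.0"}])
ExUnit.start()

# The producer from the question, repeated so the script stands alone.
defmodule OutputNonsense do
  use GenStage

  def init(_arg), do: {:producer, :some_kind_of_state}

  def handle_demand(demand, state) when demand > 0 do
    {:noreply, [:whatever, :you, :want], state}
  end
end

# Hypothetical test module name.
defmodule OutputNonsenseStreamTest do
  use ExUnit.Case

  test "the first three streamed events are the three atoms" do
    {:ok, stage} = GenStage.start_link(OutputNonsense, :unused)

    # Enum.take/2 halts the stream after three events, so the test
    # returns even though the producer itself never finishes.
    events = GenStage.stream([stage]) |> Enum.take(3)

    assert events == [:whatever, :you, :want]
  end
end
```

Unlike Stream.run/1 in the original attempt, Enum.take/2 stops consuming once it has the requested number of events, which is why this version terminates.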
