How to test the GenStage Consumer?

Hi guys,
I found some resources on how to test the producer, however there is nothing I could find which shows how to test the Consumer.

In producer, I create a dummy consumer and everything works fine, however in consumer I am struggling with testing.

defmodule DataProducer do
  use GenStage

  def start_link([]) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # {:queue.new, demand, size}
  def init(counter) do
    {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(demand, state) do 
    events = Enum.to_list(state..state + demand + 1)
    # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
    {:noreply, events, (state + demand)}
  end
end

Producer Test:

defmodule DataProducerTest do
  use ExUnit.Case

  test "check the results" do
    {:ok, stage} = DataProducer.start_link([])
    {:ok, _cons} = TestConsumer.start_link(stage)
    assert_receive {:received, events}
    GenStage.stop(stage)
  end

end

defmodule TestConsumer do
  def start_link(producer) do
    GenStage.start_link(__MODULE__, {producer, self()})
  end
  def init({producer, owner}) do
    {:consumer, owner, subscribe_to: [producer]}
  end
  def handle_events(events, _from, owner) do
    send(owner, {:received, events})
    {:noreply, [], owner}
  end
end

And consumer:

defmodule DataConsumer do
  use GenStage
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end
  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
  def handle_events(events, _from, state) do
    for event <- events do
      # :timer.sleep(250)
      Logger.info inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end
end

Thank you in advanced.

2 Likes