GenStage Integration testing

hi!

So I have a simple flow with A -> B -> C like the example.

I am trying to see if I can test the full flow in the Case as a unit test/integration test.

I understand that I can test the result of C to see if my flow worked. But it is a bit more complicated than that . In fact the main issue I have is how do I know the job is fully completed through the chain?

I tried

  • method 1: try to do async using the cast/handle_cast . But then I don t know when it s completed

  • method 2: I can us the call/handle_call mechanism… but then when my message gets to C currently, no reply is sent to the original test process (where I run my test from)… I could pass the original from all the way down with my messages but that seems wrong to do as well…

So what am I missing here? Again , I want to be able to run from a unittest

1 Like

In your integration test, you would test that the job is processed correctly through the entire pipeline when the thing C does happens. If that’s inserting into a database, then test that the record was created.

My setup would be to subscribe a fake consumer to each stage and just emit the events it receives to the test process:

defmodule EchoConsumer do
  use GenStage

  def start_link(producer, test) do
    GenStage.start_link(__MODULE__, {producer, test})
  end

  def init({producer, test}) do
    {:consumer, test, subscribe_to: [producer]}
  end

  def handle_events(events, _, test) do
    for event <- events, do: send(test, {:received, event})
  end
end

test "A produces events" do
  {:ok, _} = StageA.start_link()
  {:ok, _} = EchoConsumer.start_link(StageA, self())
  StageA.produce_event(1)
  assert_receive {:event, 1}
end

Thanks for the reply! So I get the idea but I am stuck with the implementation details. so if I have A -> B -> C

there are quite a few scenarios I am struggling with

  1. what does your produce_event(1) do? does it call GenStage.call? In order to wait in a test , I need it to be synchronous. But that means the A Stage needs to not reply until C is done. Which Means somehow the decision has to be handled of replying in C

  2. what if one of your stage filters (like say in B the jobs events get filtered and you can end up sending nothing downstream)?

The problem I have is that GenStage.call is synchronous and fails due to timeout when I try to inject a message that way.

First, produce_event() is a stand in for sending an event down the pipeline. Are you polling Redis? Do you pull tasks from a db? Second, it sounds like you’re trying to test an event/reply pipeline like A -> B -> C -> B -> A? GenStage kind of handles those semantics for you. If it is imperative that A knows when C completes for some other reason than A produces more work, I would not use GenStage in this situation. Otherwise, GenStage is a well tested library and I would just assume GenStage is working the way its supposed to.

In this situation, I would just test B in isolation… test that I don’t get events downstream that should be filtered out. In an integration test, I would test the expected outcome does not happen because it was filtered out and some point in the pipeline.

I do not use Redis or task from DB. This will eventually be more a RAbbitMQ or Kafka. But for now I don t have anything. I was using the A.call to force generating events. But this is a synchronous call and wait for a reply

What do you mean by produce_event is a stand? is it part of GenStage or just your own method to inject events into your pipeline.?

So since you poll , when do you know your events are done processing so you can verify your test that was you expected in your flow happened?

I don t need to know something was processed in my normal processing. I just need to know that it is processed while running integration test so I can verify at the correct time the results.

I mean produce_event is whatever you want it to be. It’s a stub. Also, I know my events are done processing because of the EchoConsumer and assert_receive combo from above.

What is C doing with these messages?

so the assert_receive waits in this case? I can create way to inject events using a poller. Just want to make sure there are no race conditions

ok ! I think I have an idea with the assert receive as you showed. my last stage can send sepcific events for my test, and my last stage had configurable options, so in test mode I ll pass it a special process (the test process), and I will be able to notify the test process that I recevied and event. I just tried and seems to work!

thanks!

1 Like