Integration tests, async tasks & dealing with Ecto.Sandbox errors

I have run into this problem several times already when building Elixir/Phoenix/Ecto applications, and I feel like I have a solution, but maybe it’s not great, or maybe I am missing something out there that could help me.

Basically, whenever we build a Phoenix system that has some async bits in it, we start to see Ecto failures like this one while running tests:

11:48:33.322 [error] Postgrex.Protocol (#PID<0.1513.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.3620.0> exited

Client #PID<0.1614.0> is still using a connection from owner at location:

    :prim_inet.recv0/3
    (postgrex) lib/postgrex/protocol.ex:2834: Postgrex.Protocol.msg_recv/4
    (postgrex) lib/postgrex/protocol.ex:2550: Postgrex.Protocol.recv_transaction/4
    (postgrex) lib/postgrex/protocol.ex:1855: Postgrex.Protocol.rebind_execute/4
    (ecto_sql) lib/ecto/adapters/sql/sandbox.ex:370: Ecto.Adapters.SQL.Sandbox.Connection.proxy/3


The error doesn’t crash the tests themselves, but the manually spawned background processes, Tasks and GenServers, do fail, printing that or a similar error to the console.

Sometimes the error is different: for example, a record we created in the test setup no longer exists in the database.

The tests at this stage are already running with async: false and execute sequentially one after another.

Why this is happening

The reason this is happening is that Ecto’s SQL Sandbox opens a transaction at the beginning of a test and rolls it back at the end. At that point, background tasks/processes spawned during the test may still be trying to complete work issued by web requests, for example.
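For context, the usual sandbox setup in a test case template looks roughly like this (a sketch; MyApp.Repo stands in for the real repo). The connection is owned by the test process, so when that process exits, the connection is checked back in and its transaction is rolled back:

    setup tags do
      # Check out a connection owned by the test process; all queries run
      # inside a transaction that is rolled back when the test process exits.
      :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Repo)

      unless tags[:async] do
        # Let processes spawned during the test share the test's connection
        Ecto.Adapters.SQL.Sandbox.mode(MyApp.Repo, {:shared, self()})
      end

      :ok
    end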

This can happen, for example, when sending e-mails from a simple background Task. A user registers, a confirmation e-mail is sent, and the test checks for the flash message displayed to the user and exits. At the same time, a background Task is spawned to send the e-mail, and it attempts to access the database to fetch the User record. Since the transaction has already been rolled back, the User with the given ID is no longer in the database, or you see the error I posted above because the process that opened the transaction has already exited.

The solution I am using

Since I have full control of the code base, I wrote a simple macro that I call when my background Tasks start or when my GenServers start up, which registers the given process in a Registry I use for tracking these processes in tests.

For example, in a GenServer I would have:

    def init(state) do
      TestHelpers.register_gen_server(self())
      {:ok, state}
    end

and in my Tasks I would have a similar call: TestHelpers.register_task(self()).

The Registry keeps track of all of these, so it knows which processes are alive at any given time.
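A minimal sketch of how such helpers could be implemented, using a :duplicate-keys Registry (simplified; the names here are made up and the real code is a bit more involved):

    defmodule TestHelpers do
      @registry BackgroundProcessRegistry

      # Started from test_helper.exs
      def start_registry do
        Registry.start_link(keys: :duplicate, name: @registry)
      end

      if Mix.env() == :test do
        def register_task(pid), do: Registry.register(@registry, :task, pid)
        def register_gen_server(pid), do: Registry.register(@registry, :gen_server, pid)
      else
        # No-ops outside of the :test environment
        def register_task(_pid), do: :ok
        def register_gen_server(_pid), do: :ok
      end

      # All currently alive processes registered under the given kind
      def registered(kind) do
        for {_owner, pid} <- Registry.lookup(@registry, kind), Process.alive?(pid), do: pid
      end
    end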

Then, I wrap my tests that are causing trouble in a function like this:

    test "registers user in the system", %{session: session} do
      TestHelpers.wait_for_background_processes(fn ->
        session |> visit("/") |> ...
      end)
    end

The wait_for_background_processes/1 function does three things:

  1. Executes the callback function containing the actual test
  2. Waits for all registered Tasks to complete and their processes to stop being alive
  3. Waits for all registered GenServers to empty their message queues and change their status to :waiting

The waiting for a GenServer to go idle is done this way:

  # Give up once we have waited too long
  defp wait_until_genserver_idle(_pid, timeout) when timeout <= 0 do
    {:error, :timeout}
  end

  defp wait_until_genserver_idle(pid, timeout) do
    info = Process.info(pid)

    # The process is done when it is dead, or idle with an empty mailbox
    if info == nil || (info[:status] == :waiting && info[:message_queue_len] == 0) do
      :ok
    else
      :timer.sleep(10)
      wait_until_genserver_idle(pid, timeout - 10)
    end
  end
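Tying it together, the wrapper itself could look something like this (again a sketch; it leans on the hypothetical TestHelpers.registered/1 shown above):

  def wait_for_background_processes(fun, timeout \\ 5_000) do
    # 1. Run the actual test body
    result = fun.()

    # 2. Wait for every registered Task/process to terminate
    for pid <- TestHelpers.registered(:task) do
      ref = Process.monitor(pid)

      receive do
        {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
      after
        timeout -> raise "background process #{inspect(pid)} did not finish in time"
      end
    end

    # 3. Wait for every registered GenServer to drain its mailbox and go idle
    for pid <- TestHelpers.registered(:gen_server) do
      :ok = wait_until_genserver_idle(pid, timeout)
    end

    result
  end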

This works with GenServers and their relatives (Supervisors, GenStage, etc.), with Tasks, and also with plain spawned processes. I am fairly happy with the solution, but I wonder if it can be improved on?

The problems I have

  1. I particularly dislike the need to add test-specific code to my Tasks and GenServers, although the macro is a no-op when Mix.env() != :test.

  2. The other thing I dislike is that I have to wrap my test code in a function. I would prefer to install an on_exit handler, but it seems I can’t easily do that: by the time the on_exit handler runs, the original test process is already dead and the transaction is already being rolled back by the Ecto Sandbox.

I don’t think I’m the only one having this issue, so I wonder how you, Elixir people, deal with similar problems, or whether you have ideas on how to solve 1 & 2 above? :slight_smile:


We have some spiritually similar concerns in a few areas of our stack, and I’m not fully content with how we’ve tackled them so far, but it has proven somewhat effective at alleviating the test-suite flakiness we used to see due to the performance differences between our developer hardware and our CI platform. Like you, I didn’t want to make application code changes solely for the benefit of more reliable testing.

Where we’re lucky enough that the dynamic/async work is being done under a DynamicSupervisor, we use a pattern like the one below and call it during both ExUnit’s setup and on_exit:

  def wait_for_zero_workers do
    case DynamicSupervisor.which_children(The.Supervisor.Module) do
      [] ->
        :ok

      [_h | _t] ->
        Process.sleep(250)
        wait_for_zero_workers()
    end
  end
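Roughly how that gets wired in (a sketch of the intent, not our exact code):

  setup do
    # Make sure no workers from a previous test are still running,
    # and drain again when this test finishes.
    wait_for_zero_workers()
    on_exit(&wait_for_zero_workers/0)
    :ok
  end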

Since on_exit runs in a different process from the test body, this doesn’t help with the test case yanking the plug on the Ecto Sandbox; we’d have to call this function at the end of the test case body instead. I think we did do that in a few places.

In another place, we rolled up our sleeves and did some honestly-pretty-gross tracing shenanigans, but it cleared up our test race conditions around that area pretty much immediately, and it didn’t require app code changes.

  @fsm_event_mfa {Our.Async.StateMachine, :handle_event, 4}
  @recv_timeout 1_000

  defp wait_for_exit do
    pid = wait_for_init()
    assert_receive {:trace, ^pid, :out_exited, 0}, @recv_timeout
  end

  defp wait_for_init do
    assert_receive {:trace, pid, :call, {Our.Async.StateMachine, :handle_event, [:enter, :init, :init, _]}},
                   @recv_timeout

    pid
  end

  defp watch_fsm_progress(_ctx) do
    # Ensure module is preloaded before adding trace
    true = Code.ensure_loaded?(elem(@fsm_event_mfa, 0))

    :erlang.trace_pattern(
      # Watch executions of this MFA
      @fsm_event_mfa,
      [
        {
          # Assign first argument to $1
          # Assign second and third arguments to $2, both must be same value
          [:"$1", :"$2", :"$2", :_],
          # First argument must be :enter, second/third must be :init
          [{:and, {:"=:=", :"$1", :enter}, {:"=:=", :"$2", :init}}],
          # Default behavior - receive a {:trace, pid, :call, mfa} message
          []
        }
      ]
    )

    # Receive OTP messages on function calls and process exits made by new processes
    flags = [:call, :exiting]
    :erlang.trace(:new_processes, true, flags)

    # Clean up added traces at end of test case
    on_exit(fn ->
      :erlang.trace(:new_processes, false, flags)
      :erlang.trace_pattern(@fsm_event_mfa, false)
    end)

    :ok
  end

Then most test case bodies end with wait_for_exit().
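For illustration, a test case using this registers the tracing as a setup callback and ends with the wait (the test body and function names here are made up):

  setup :watch_fsm_progress

  test "state machine runs to completion" do
    # Kick off the async work however the application normally does
    SomeApp.start_async_work()

    # Block until the traced gen_statem process has fully exited
    wait_for_exit()
  end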

For context, these are basically dynamically-spawned gen_statem state machines (not living under a DynamicSupervisor and I don’t recall why) that send and receive messages over an external message bus. For various reasons they are allowed to do more internal work after their final networked reply, so we want to let them fully execute to completion during test suites. That final work might include DB access, so the testing story was a little non-deterministic before we added this. (If you squint a little, parts of that might sound a little like Broadway or perhaps conduit_amqp, and that’s not far off conceptually, except we’re not able to leverage those examples directly due to our historical choice of bus.)

This approach has a learning curve for our newer Elixir devs, for sure, and I was hip-deep in the Erlang docs myself when I wrote most of it. I definitely view it as a probably-too-clever band-aid that papered over a likely code smell, rather than an optimal design to strive for, but the technique might transfer to others who are interested in this topic.

OK, yeah, I can see you have a similar setup. At least I am not (too) crazy, right? :smiley:

Or we’re just crazy in good company :beers:
