Integration tests, async tasks & dealing with Ecto.Sandbox errors

I have ran upon this problem several times already when building Elixir/Phoenix/Ecto applications, and I feel like I have a solution but maybe it’s not great, or maybe I am missing something out there that can be used to help me.

Basically, whenever we build a Phoenix system, which has an async bits in it, we start to experience Ecto failures like this one:

11:48:33.322 [error] Postgrex.Protocol (#PID<0.1513.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.3620.0> exited                                         
                                                                                                                                                                             
Client #PID<0.1614.0> is still using a connection from owner at location:                                                                                                    
                                                                                                                                                                             
    :prim_inet.recv0/3                                                                                                                                                       
    (postgrex) lib/postgrex/protocol.ex:2834: Postgrex.Protocol.msg_recv/4
    (postgrex) lib/postgrex/protocol.ex:2550: Postgrex.Protocol.recv_transaction/4
    (postgrex) lib/postgrex/protocol.ex:1855: Postgrex.Protocol.rebind_execute/4
    (ecto_sql) lib/ecto/adapters/sql/sandbox.ex:370: Ecto.Adapters.SQL.Sandbox.Connection.proxy/3

while running tests.

The thing doesn’t crash tests, but the background processes spawned manually, Tasks and GenServers do fail outputting that or similar error to the console.

Sometimes the error is different and that for example a record we created in a test set up no longer exists in database.

The tests at this stage are already running with async: false and execute sequentially one after another.

Reason for this happening

The reason why this is happening is that we have Ecto.Sandbox open a transaction at the beginning of a test, and then at the end of the test it rolls back the transaction. At that point, background tasks/processes spawned may be still trying to complete some work issued by web requests, for example.

This can happen for example with sending e-mails from a simple background Task. User registers, and confirmation e-mail is sent, test checks for the flash message being displayed to the user and exits. At the same time a background Task is spawned to send e-mail, and it attempts to access database to fetch User record. Since transaction already rolled back, User with given ID no longer is in the database, or you see the error I posted above because process that opened the transaction already exited.

The solution I am using

Since I have full control of the code base, I wrote a simple macro that I call when my background Tasks start or when my GenServer starts up, that registers given process in a Registry I use for tracking these processes in tests.

For example, in GenServer I would have:

   def init(state) do
       TestHelpers.register_gen_server(self())
   end

and in Tasks, I would have similar call to say: TestHelpers.register_task(self()).

The Registry keeps track on all of these, so it knows which processes are alive at given time.

Then, I wrap my tests that are causing trouble in a function like this:

    test "registers user in the system", %{session: session} do
      TestHelpers.wait_for_background_processes fn ->
         session |> visit("/") |> ...
      end
   end

What happens in the wait_for_background_processes/1 function is that it does three things:

  1. Executes the callback function containing the actual test
  2. Waits for all the registered Tasks to complete and processes stop being alive
  3. Waits for all the registered GenServers to empty their messages queue and change status to ‘waiting’

We do 2) this way:

  defp wait_until_genserver_idle(pid, timeout) do
    info = Process.info(pid)
    if info == nil || (info[:status] == :waiting && info[:message_queue_len] == 0) do
      :ok
    else
      :timer.sleep(10)
      wait_until_genserver_idle(pid, timeout - 10)
    end
 end
 
 defp wait_until_genserver_idle(_pid, timeout) when timeout <= 0 do
    {:error, :timeout}
 end

This works with GenServers and relatives (Supervisors, GenStage etc.) and Tasks but also normal spawned processes. I am fairly happy with the solution but I wonder if it can be improved on?

The problems I have

  1. I particularly don’t like the need to add test-specific code to my Tasks and GenServers, however this macro is a no-op when Mix.env() != :test.

  2. The other thing I dislike is that I have to wrap my code in a function. I would prefer to install on_exit handler, but it seems like I can’t easily do that, because at the time on_exit handler is executed, the original test process is already dead and transaction is being already rolled back by Ecto.Sandbox.

I don’t think I’m the only one having this issue, so I wonder how you, Elixir people, deal with similar issues or ideas how to solve 1 & 2 above? :slight_smile:

7 Likes

We have some spiritually-similar concerns in a few areas in our stack, and I’m not fully content with how we’ve tackled them so far, but it has proven somewhat effective at alleviating our previous test suite flakiness due to the performance differences between our developer hardware and our CI platform. Like you, I didn’t want to make application code changes solely for the benefit of more reliable testing.

Where we’re lucky enough that the dynamic/async work is being done under a DynamicSupervisor, we use a pattern like so, and call it during ExUnit’s setup and on_exit both:

  def wait_for_zero_workers do
    case DynamicSupervisor.which_children(The.Supervisor.Module) do
      [] ->
        :ok

      [_h | _t] ->
        Process.sleep(250)
        wait_for_zero_workers()
    end
  end

Since the on_exit is another process from the test body, this one doesn’t help with the test case yanking the plug on the Ecto Sandbox. We’d have to call this func at the end of the test case body instead. I think we did do that in a few places.

In other place, we rolled up our sleeves and did some honestly-pretty-gross tracing shenanigans, but it cleared up our test race conditions around that area pretty much immediately, and didn’t require app code changes.

  @fsm_event_mfa {Our.Async.StateMachine, :handle_event, 4}
  @recv_timeout 1_000

  defp wait_for_exit do
    pid = wait_for_init()
    assert_receive {:trace, ^pid, :out_exited, 0}, @recv_timeout
  end

  defp wait_for_init do
    assert_receive {:trace, pid, :call, {Our.Async.StateMachine, :handle_event, [:enter, :init, :init, _]}},
                   @recv_timeout

    pid
  end

  defp watch_fsm_progress(_ctx) do
    # Ensure module is preloaded before adding trace
    true = Code.ensure_loaded?(elem(@fsm_event_mfa, 0))

    :erlang.trace_pattern(
      # Watch executions of this MFA
      @fsm_event_mfa,
      [
        {
          # Assign first argument to $1
          # Assign second and third arguments to $2, both must be same value
          [:"$1", :"$2", :"$2", :_],
          # First argument must be :enter, second/third must be :init
          [{:and, {:"=:=", :"$1", :enter}, {:"=:=", :"$2", :init}}],
          # Default behavior - receive a {:trace, pid, :call, mfa} message
          []
        }
      ]
    )

    # Receive OTP messages on function calls and process exits made by new processes
    flags = [:call, :exiting]
    :erlang.trace(:new_processes, true, flags)

    # Clean up added traces at end of test case
    on_exit(fn ->
      :erlang.trace(:new_processes, false, flags)
      :erlang.trace_pattern(@fsm_event_mfa, false)
    end)

    :ok
  end

Then most test case bodies end with wait_for_exit().

For context, these are basically dynamically-spawned gen_statem state machines (not living under a DynamicSupervisor and I don’t recall why) that send and receive messages over an external message bus. For various reasons they are allowed to do more internal work after their final networked reply, so we want to let them fully execute to completion during test suites. That final work might include DB access, so the testing story was a little non-deterministic before we added this. (If you squint a little, parts of that might sound a little like Broadway or perhaps conduit_amqp, and that’s not far off conceptually, except we’re not able to leverage those examples directly due to our historical choice of bus.)

This approach has got a learning curve for our newer Elixir devs, for sure, and I was hip-deep in the Erlang docs myself when I wrote most of it. I definitely view this as a probably-too-clever band-aid that papered over a likely code smell, rather than an optimal design to strive for, but the technique might transfer to others who are interested in this topic.

OK, yeah I can see you have similar set up. At least I am not (too) crazy, right? :smiley:

Or we’re just crazy in good company :beers:

1 Like

I came across this thread today when I was troubleshooting a problem with async tasks in tests (but not an issue with the DB). In my case, it would be possible to solve the issue by setting async to false and sharing the process state, but I actually didn’t want to abandon async tests now that I’ve been accustomed to a lightning fast test suite with Elixir!

I had the thought that a potential solution to both types of problems to this issue would be to use mocks. Instead of backgrounding tasks in tests, which often isn’t necessary, we could delegate the “processization” of these tasks to a module and swap it out in tests so it runs the code synchronously. This would essentially replicate a feature that sidekiq offers allowing jobs to be run syncronously on demand, which is something I used all the time in Rails projects.

Obviously the limitation here is not having the code itself run asynchronously, but I couldn’t think of any times in the past that’s been a requirement rather than an optimization that isn’t necessary in a test run.

I am facing this kind of problem too… I think Ecto’s Sandbox is not a good solution to solve this issue. Testing async code by using database is painful and it’s not completely clear how to solve this issue correctly. Maybe someone needs to put focus on solving this with a different approach.

So I have a ton of async code and I generally don’t run into this error. Do you have a lot of GenServers? It could be a code smell that you’re not following the guidelines to “Use processes only to model runtime properties, such as mutable state, concurrency and failures, never for code organization” (https://hexdocs.pm/elixir/GenServer.html#module-when-not-to-use-a-genserver). Spawning a task will respect the $callers/$ancestors hierarchy, and find the correct task to use, so you could use that.

If in an extreme case you really do need to communicate through a system that needs to have different views of the universe, it’s possible. For example, in one system, I sharded my PubSub topics to be painted with a test_id, (by prepending the test_id) and had a registry for semi-transient gen_statems (which are modeling real world entities that need to be checked in with a periodic timeout) that would only present the test’s view of the universe to whomever needed to access it by having the key be the same test_id. I had to override the gen_statem start_link so that it paints the correct test_id.

Stateful systems can get convoluted and need care, Having gone through that IMO think the way that Ecto’s Sandbox works makes you stop and think for a second, hey, am I doing this in a way that is well-architected? And if you are, the sandbox problem will go away.

1 Like

I have a module that takes a request and creates a list of items to insert into the database at once by using Repo.insert_all. After that it calls a rabbitmq to send a message to a python module to run an artificial intelligence algorithm… when this algorithm finishes it sens back a message to elixir to update those previous registers… so it’s using database for:

1 - Insert all registers at once
2 - update all when python finishes.

Since my user does not need to wait for any of those tasks above I run them into a different process and returned back to the user super fast while those tasks above were still working… I needed to run my tests with async: false and use assert_receive with a long timeout to test those scenarios above.

Got it, so they’re more like integration tests/system tests. Yeah, you may just have to deal with not-being-async.

However, it is possible for a test to leave the VM and find its parent again. You may want to consider doing something like what hound/wallaby does. Along with your python request, send a test id (possibly in an environment variable) that lets your python package its response including a field that has the test id. Then when you handle the python response, you can bind the downstream actions back to the test that originates it, allowing it to check out the correct Sandbox.

If you want some help with this send me a pm.

1 Like

Actually it’s working. I just meant it’s not completely clear how to solve this pretty common issue. Thanks.