Cachex fallbacks vs. Ecto Sandbox process ownership

I’m working on an app that uses both Cachex and Ecto. The common pattern is to use Cachex.fetch to wrap database queries, something like

Cachex.fetch(MyCache, "key", fn key -> database_query_by(key) end)

where the value is returned from cache when available, otherwise it calls the fallback function.

This is working fine when running the app, but it’s proving difficult to test because of the subtleties around Ecto Sandbox processes. Tests fail with a long error: cannot find ownership process for #PID<0.1708.0>. The error message is thankfully very detailed, but I can’t seem to find a way to run these tests async because the only way I can get them to work is by running them in shared mode (i.e. with async: false)

The test setup that works is:

  setup tags do
    repo_pid = Sandbox.start_owner!(MyRepo, shared: not tags[:async])

    on_exit(fn ->
      Sandbox.stop_owner(repo_pid)
    end)

    :ok
  end

This ensures that the repo process is shared when the test module includes async: false. With a little snooping around, I can see that Cachex relies on GenServer.call/3 to execute the fallback function, so it’s happening in its own process. I was hoping to allow this process explicitly, e.g.

allow = Process.whereis(MyCache)
Ecto.Adapters.SQL.Sandbox.allow(MyRepo, self(), allow)

But that doesn’t work because the process identifying the cache is not the process executing these callback functions, so the above still gets the cannot find ownership process for #PID<0.1708.0> errors. I think the crux of the matter is in Cachex.Services.Courier.handle_call/3 where the fallback function is executed inside an ad-hoc spawn/1 block. It’s not a named process, so you can’t “allow” it via Ecto.Adapters.SQL.Sandbox.allow/3. You can put an IO.inspect(self()) inside a Cachex.fetch fallback function and see that the pid changes each time you run it.

It seems like Cachex + Ecto tests need to run async: false, but I noticed this post:

@benwilson512 mentions the use of the :caller option being passed to Ecto.Repo functions. However, that post is from 2019, and I don’t see mention of a :caller option anywhere in the Ecto docs.

Can anyone shed light on this?

I think :shared mode actually exists for your exact use case: when you can’t explicitly allow every process to use the checked out repo. The error probably is coming from someplace else. Consider this test case:

defmodule Share.Test do
  use Share.DataCase

  test "repo can be called from async function" do
    test = self()

    spawn(fn ->
      assert Share.Repo.query!("select 1 + 1").rows == [[2]]
      send(test, :done)
    end)

    assert_receive :done
  end
end

It passes even though the spawned process is not explicitely allowed to use the repo.

Note however, that we wait for the repo call to finish before finishing the test. Maybe the error is coming from Cachex trying to access the repo checked out for a finished test.

Here are some examples from one of my projects:

def get_user_by_session_token(token_string) do
  caller = self()

  {_, user} =
    AppCache.fetch_user_by_token(token_string, fn ->
      {:ok, query} = UserToken.verify_session_token_query(token_string)

      case Repo.one(query, caller: caller) do
        nil ->
          {:ignore, nil}

        user ->
          {:commit, user}
      end
    end)

  user
end

The key, is the Repo.one(query, caller: caller).

Here is another one:

def load_user_groups(%{groups: %Ecto.Association.NotLoaded{}} = user) do
  repo_opts = [caller: self()]

  {_, user} =
    AppCache.fetch_user_groups_by_id(user, fn ->
      user = Repo.preload(user, [:groups], repo_opts)

      {:commit, user}
    end)

  user
end

Thank you for the examples! Where are the docs for this option? I tried passing in the caller like this, but I was still getting errors, so I want to double-check that I did it correctly. Thanks!

This video should hopefully make it clear what’s going on. THE PROCESS - part 3a (The Conundrum with Concurrency) - YouTube .