Testing a GenServer with Ecto.Sandbox; cannot find ownership

I’ve created a GenServer that manages a queue of work, which hits the database. In my tests, I can’t seem to avoid…

** (DBConnection.OwnershipError) cannot find ownership process for #PID...

My novice-level understanding of Ecto.Sandbox is that if I’m using shared mode, I’m prevented from doing async tests (which is fine), and that async processes within a test should share the same connection by default. However, all of the examples in the docs use Task.async as opposed to a GenServer.

In my code, I can’t seem to correctly get my GenServer to use the DBConnection as it loops through the queue. If anyone’s able to help me correct my misconceptions, I’d be grateful!

test_helper.exs

ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(Cardstock.Repo, :manual)

data_case.ex

defmodule MyApp.DataCase do
  use ExUnit.CaseTemplate

  ...

  setup tags do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Cardstock.Repo)
    unless tags[:async] do
      Ecto.Adapters.SQL.Sandbox.mode(Cardstock.Repo, {:shared, self()})
    end
    :ok
  end
end

message_queue.ex

defmodule MyApp.MessageQueue do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: :message_queue)
  end

  def init(state) do
    schedule_work()
    {:ok, state}
  end

  def add(item) do
    GenServer.cast(:message_queue, {:add, item})
  end

  def read do
    GenServer.call(:message_queue, :read)
  end

  def work do
    # Send to the named server; self() here would be the caller, not the GenServer
    send(:message_queue, :work)
  end

  defp schedule_work() do
    Process.send_after(self(), :work, @queue_rate)
  end

  # Callbacks

  def handle_info(:work, state), do: perform_work(state)

  def handle_call(:read, _, state) do
    {:reply, state, state}
  end

  def handle_cast({:add, message}, state) do
    {:noreply, add_message_to_state(state, message)}
  end

  defp add_message_to_state(state, message) do
    # ...using Erlang queues to hang onto messages
  end

  defp perform_work(state) do
    # Doing work that preloads from the database and then writes to it...

    schedule_work()
    {:noreply, state}
  end

end

test_that_throws_errors.ex

defmodule MyApp.MessengerTest do
  use MyApp.DataCase

  test "use the GenServer to do things..." do
    # ... generate fixture data and save to database...

    # This adds an item to the queue
    {:ok, message} = MyApp.MyContext.create_and_queue_outgoing_message(%{ attrs })

    {:ok, queue} = Map.fetch(MyApp.MessageQueue.read, message.from_phone_number_id)
    assert :queue.len(queue) == 1 # works! yay!

    Process.sleep(1000) # Error gets raised during sleep as the Server 'works' again

    assert Map.fetch(MyApp.MessageQueue.read, message.from_phone_number_id) == 1
  end
end

This is a late answer, but I know the cause of the error. Your child process is still running after the parent process (the test) has finished. You need to fix the test with an on_exit callback in which you stop the process manually.
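A minimal sketch of one way to apply this to the test above. It rests on two assumptions that are not stated in the thread: that the installed ecto_sql provides Ecto.Adapters.SQL.Sandbox.start_owner!/2 and stop_owner/1, and that MyApp.MessageQueue is not already started by the application's supervision tree in the test environment. The connection owner is a separate process that is stopped in on_exit, so it is not tied to the lifetime of the test process, and the queue is started per test so that ExUnit shuts it down when the test finishes.

```elixir
defmodule MyApp.DataCase do
  use ExUnit.CaseTemplate

  setup tags do
    # Assumption: ecto_sql is recent enough to provide start_owner!/2.
    # The owner is a dedicated process rather than the test process itself.
    pid = Ecto.Adapters.SQL.Sandbox.start_owner!(Cardstock.Repo, shared: not tags[:async])

    # Check the connection back in from on_exit, after the test has finished.
    on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end)
    :ok
  end
end

defmodule MyApp.MessengerTest do
  use MyApp.DataCase

  setup do
    # Assumption: the queue is not started by the application in the test env.
    # start_link/0 takes no arguments, so an explicit child spec map is used.
    start_supervised!(%{
      id: MyApp.MessageQueue,
      start: {MyApp.MessageQueue, :start_link, []}
    })

    :ok
  end
end
```

If the queue is started by the application itself, it keeps ticking between tests when no connection is checked out, so this sketch would not be enough on its own.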

Do you have an example of doing that?

I have several webhooks I wanted to test, but I am getting the same error because they perform DB stuff in Task.start

In such cases, it is just simpler to:

  1. Do not run those particular tests in async - this way they will use a shared connection

  2. Make sure those tasks are terminated at the end of the test. This means you should start those tasks under a Task.Supervisor (which is good practice anyway, to guarantee proper shutdown) and then tell the supervisor to terminate its children at the end of the test
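A rough sketch of the second point, using only Task.Supervisor.children/1 and Task.Supervisor.terminate_child/2. The module name MyApp.TaskCleanup and its terminate_all/1 function are hypothetical helpers introduced for illustration; they are not part of the thread or of any library.

```elixir
defmodule MyApp.TaskCleanup do
  # Hypothetical helper: terminates every task currently running
  # under the given Task.Supervisor (a pid or a registered name).
  def terminate_all(supervisor) do
    supervisor
    |> Task.Supervisor.children()
    |> Enum.each(fn pid ->
      # Returns {:error, :not_found} if the task has already finished,
      # which is fine here, so the result is deliberately ignored.
      Task.Supervisor.terminate_child(supervisor, pid)
    end)
  end
end
```

In a test this could be called as the last step of the test body, for example MyApp.TaskCleanup.terminate_all(MyApp.TaskSupervisor), assuming the supervisor is registered under that name. Doing it in the test body rather than in on_exit keeps the tasks from outliving the test process when that process is the one that checked out the connection.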

Thanks.

So here is what I currently have, simplified:

Controller:

defmodule MyApp.WebhookController do
  use MyApp, :controller

  plug MyApp.VerifyWebhookSender

  action_fallback MyApp.FallbackController

  def handle(_conn, %{"event" => "thing.new", "account_id" => account_id, "thing" => thing_params}) do
    with {:ok, _pid} <- Task.start(fn -> ThingImport.new_thing(account_id, thing_params) end) do
      {:ok, :accepted}
    end
  end
end

Test:

describe "things" do
  setup [:create_integration]

  test "adds a thing on `thing.new`", %{conn: conn, integration: integration} do
    payload = ThingPayloads.thing_new()
    thing_params = payload["thing"]

    resp = post conn, thing_path(conn, :handle), payload

    assert json_response(resp, 202)["ok"]["detail"] == "Accepted"
    
    # Before I moved to `Task` I was checking DB stuff
    # assert thing = Things.get_thing_by!(external_id: thing["id"], integration_id: integration.id)
    # assert thing.field == thing_params["field"]
  end
end

Improve my understanding

So you mean a dynamically supervised task like this?

The link says

in the majority of cases, you want to add the task supervisor to your supervision tree

Would I actually add a Task Supervisor to my Supervision Tree? Would I name it MyApp.TaskSupervisor and have it as a generic task supervisor for my whole app, or would it be specific to this use and I’d name it like MyApp.ThingSupervisor?
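For reference, adding a task supervisor to a supervision tree generally looks like the sketch below. The module name MyApp.Application, the supervisor name MyApp.Supervisor, and the choice of MyApp.TaskSupervisor as the registered name are assumptions for illustration; a real application would list its other children (Repo, Endpoint and so on) alongside it.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # ... the application's existing children would be listed here ...

      # A named Task.Supervisor that tasks can be started under.
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

Tasks are then started with Task.Supervisor.start_child(MyApp.TaskSupervisor, fun), referring to the supervisor by its registered name.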

And I update my controller like so?

def handle(_conn, %{"event" => "thing.new", "account_id" => account_id, "thing" => thing_params}) do
  Task.Supervisor.start_child(MyApp.ThingSupervisor, fn ->
    ThingImport.new_thing(account_id, thing_params)
  end)
  
  {:ok, :accepted}
end

Sorry, I probably should have made a separate thread at this point… 😅

Yes for the former and up to you on the latter.

Your update seems correct. You can also create a module and have MyApp.ThingSupervisor.start_child that hides the Task.Supervisor call for you. But those are rather minor improvements to your changes. 🙂
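A minimal sketch of such a wrapper module, assuming it registers the underlying Task.Supervisor under its own module name. The child_spec/1 function is an addition for illustration so the module can be listed directly in a supervision tree; only start_child is mentioned in the suggestion above.

```elixir
defmodule MyApp.ThingSupervisor do
  # Lets this module appear directly in a children list, e.g.
  #   children = [MyApp.ThingSupervisor]
  # It starts a Task.Supervisor registered under this module's name.
  def child_spec(_opts) do
    Supervisor.child_spec({Task.Supervisor, name: __MODULE__}, id: __MODULE__)
  end

  # Hides the Task.Supervisor call from callers such as the controller.
  def start_child(fun) when is_function(fun, 0) do
    Task.Supervisor.start_child(__MODULE__, fun)
  end
end
```

The controller would then call MyApp.ThingSupervisor.start_child(fn -> ThingImport.new_thing(account_id, thing_params) end) without knowing that a Task.Supervisor sits behind it.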
