I’ve created a GenServer that manages a queue of work, which hits the database. In my tests, I can’t seem to avoid…
** (DBConnection.OwnershipError) cannot find ownership process for #PID...
My novice-level understanding of Ecto.Adapters.SQL.Sandbox
is that if I’m using shared mode, I’m prevented from doing async tests (which is fine), and that async processes within a test should share the same connection by default. However, all of the examples in the docs use Task.async
as opposed to a GenServer.
In my code, I can’t seem to correctly get my GenServer to use the DBConnection as it loops through the queue. If anyone’s able to help me correct my misconceptions, I’d be grateful!
test_helper.exs
# Start the ExUnit test runner.
ExUnit.start()
# Put the sandbox for Cardstock.Repo into :manual mode; the case template's
# setup block is then responsible for checking out a connection per test.
Ecto.Adapters.SQL.Sandbox.mode(Cardstock.Repo, :manual)
data_case.ex
defmodule MyApp.DataCase do
  @moduledoc """
  Case template for tests that use the database through `Cardstock.Repo`.

  Every test checks out a sandboxed connection. Tests that are not tagged
  `async` additionally switch the sandbox to shared mode, with the test
  process as the owner.
  """

  use ExUnit.CaseTemplate
  ...

  setup tags do
    alias Ecto.Adapters.SQL.Sandbox

    # Crash the setup immediately if the checkout does not return :ok.
    :ok = Sandbox.checkout(Cardstock.Repo)

    # Shared mode is only enabled for non-async tests.
    if !tags[:async] do
      Sandbox.mode(Cardstock.Repo, {:shared, self()})
    end

    :ok
  end
end
message_queue.ex
defmodule MyApp.MessageQueue do
  @moduledoc """
  GenServer that holds queued messages and works through them on a timer.

  The server is registered under the name `:message_queue`. It arms a timer in
  `init/1`; every `:work` message it receives runs a work pass and re-arms the
  timer.
  """

  use GenServer

  # Registered name of the server; all client functions address it by this name.
  @name :message_queue

  # Delay in milliseconds between two work passes.
  # NOTE(review): `@queue_rate` was read but never defined in the reviewed
  # snippet (it would evaluate to nil and make Process.send_after/3 raise).
  # 1_000 is a placeholder -- confirm the intended interval.
  @queue_rate 1_000

  # Client API

  @doc """
  Starts the queue server with an empty map as state, registered as `:message_queue`.

  The argument is ignored. It exists so the module can be started through the
  default `child_spec/1` generated by `use GenServer`, which calls
  `start_link/1`; calling `start_link/0` keeps working.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: @name)
  end

  @doc """
  Asynchronously hands `item` to the server to be added to the queue state.
  """
  @spec add(term()) :: :ok
  def add(item) do
    GenServer.cast(@name, {:add, item})
  end

  @doc """
  Returns the server's current state.
  """
  @spec read() :: term()
  def read do
    GenServer.call(@name, :read)
  end

  @doc """
  Asks the server to run a work pass now.

  The `:work` message is sent to the registered server rather than to the
  calling process, so this can be used from any process (e.g. a test).

  NOTE(review): every handled `:work` message also re-arms the timer, so each
  manual call starts an additional recurring timer chain.
  """
  @spec work() :: :ok
  def work do
    Process.send(@name, :work, [])
  end

  # Callbacks

  @impl true
  def init(state) do
    schedule_work()
    {:ok, state}
  end

  @impl true
  def handle_info(:work, state), do: perform_work(state)

  @impl true
  def handle_call(:read, _, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_cast({:add, message}, state) do
    {:noreply, add_message_to_state(state, message)}
  end

  # Arms the timer that delivers the next :work message to this server.
  # Must run inside the server process, because it targets self().
  defp schedule_work() do
    Process.send_after(self(), :work, @queue_rate)
  end

  # Returns the state with `message` added.
  # NOTE(review): body elided in the reviewed snippet.
  defp add_message_to_state(state, message) do
    # ...using Erlang queues to hang onto messages
  end

  # Runs one work pass and re-arms the timer.
  defp perform_work(state) do
    # Doing work that preloads from the database and then writes to it...
    schedule_work()
    {:noreply, state}
  end
end
test_that_throws_errors.ex
defmodule MyApp.MessengerTest do
  use MyApp.DataCase

  test "use the GenServer to do things..." do
    # ... generate fixture data and save to database...
    # This adds an item to the queue
    {:ok, message} = MyApp.MyContext.create_and_queue_outgoing_message(%{ attrs })

    assert {:ok, queue} = Map.fetch(MyApp.MessageQueue.read(), message.from_phone_number_id)
    assert :queue.len(queue) == 1 # works! yay!

    Process.sleep(1000) # Error gets raised during sleep as the Server 'works' again

    # Map.fetch/2 returns {:ok, value} or :error, never a bare integer, so
    # unwrap the queue and compare its length as in the first assertion.
    # NOTE(review): the expected length of 1 is carried over from the original
    # assertion -- confirm what the queue should hold after a work pass.
    assert {:ok, queue_after_work} =
             Map.fetch(MyApp.MessageQueue.read(), message.from_phone_number_id)

    assert :queue.len(queue_after_work) == 1
  end
end