In my project I’m using Quantum with a custom storage I’ve written using Ecto (which is based on GenServer).
Under the hood, Quantum scheduler (Foo.Scheduler
) starts storage process via start_link/1
:
@doc false
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
GenServer.start_link(__MODULE__, name, name: name)
end
I have it configured under root supervisor like so:
# application.ex
@impl true
def start(_type, _args) do
children = [
# ...
Foo.Scheduler
]
opts = [strategy: :one_for_one, name: Foo.Supervisor]
Supervisor.start_link(children, opts)
end
In the project, I have some business logic around scheduling jobs via Quantum. This is why I need to be able to call my scheduler in tests, to check whether the jobs are created or not. My project uses standard Phoenix setup, so each test relying on the database uses Foo.DataCase
generated by the framework. Tests are configured like so:
# test_helper.ex (fragment)
Ecto.Adapters.SQL.Sandbox.mode(Repo, :manual)
# test/support/data_case.ex (fragment)
setup tags do
Foo.DataCase.setup_sandbox(tags)
:ok
end
def setup_sandbox(tags) do
pid = Ecto.Adapters.SQL.Sandbox.start_owner!(Repo, shared: not tags[:async])
on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end)
end
All tests are synchronous (I don’t use async: true
).
The problem is that tests fail (non-deterministically it seems) with errors like these:
....[error]: MyXQL.Connection (#PID<0.434.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.570.0> exited
Client #PID<0.470.0> is still using a connection from owner at location:
:prim_inet.recv0/3
(myxql 0.6.3) lib/myxql/client.ex:178: MyXQL.Client.recv_packets/5
(myxql 0.6.3) lib/myxql/connection.ex:467: MyXQL.Connection.prepare/2
(myxql 0.6.3) lib/myxql/connection.ex:99: MyXQL.Connection.handle_execute/4
(ecto_sql 3.10.2) lib/ecto/adapters/sql/sandbox.ex:375: Ecto.Adapters.SQL.Sandbox.Connection.proxy/3
(db_connection 2.5.0) lib/db_connection/holder.ex:354: DBConnection.Holder.holder_apply/4
(db_connection 2.5.0) lib/db_connection.ex:1432: DBConnection.run_execute/5
(db_connection 2.5.0) lib/db_connection.ex:1527: DBConnection.run/6
(db_connection 2.5.0) lib/db_connection.ex:713: DBConnection.execute/4
(ecto_sql 3.10.2) lib/ecto/adapters/myxql/connection.ex:36: Ecto.Adapters.MyXQL.Connection.execute/4
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:989: Ecto.Adapters.SQL.execute!/5
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:945: Ecto.Adapters.SQL.execute/6
(foo 0.1.0) lib/foo//v2/scheduling/storage_impl.ex:60: Foo.StorageImpl.delete_job/1
(foo 0.1.0) lib/foo/v2/scheduling/server.ex:29: Foo.StorageServer.handle_call/3
(stdlib 5.0.2) gen_server.erl:1113: :gen_server.try_handle_call/4
(stdlib 5.0.2) gen_server.erl:1142: :gen_server.handle_msg/6
(stdlib 5.0.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
The connection itself was checked out by #PID<0.470.0> at location:
(ecto_sql 3.10.2) lib/ecto/adapters/myxql/connection.ex:36: Ecto.Adapters.MyXQL.Connection.execute/4
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:989: Ecto.Adapters.SQL.execute!/5
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:945: Ecto.Adapters.SQL.execute/6
(glimpse 0.1.0) lib/foo/v2/scheduling/storage_impl.ex:60: Foo.Scheduling.StorageImpl.delete_job/1
(glimpse 0.1.0) lib/foo/v2/scheduling/server.ex:29: Foo.Scheduling.StorageServer.handle_call/3
(stdlib 5.0.2) gen_server.erl:1113: :gen_server.try_handle_call/4
(stdlib 5.0.2) gen_server.erl:1142: :gen_server.handle_msg/6
(stdlib 5.0.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
...
[error]: GenServer Foo.Scheduling.Scheduler.Storage terminating
** (stop) exited in: DBConnection.Holder.checkout(#PID<0.575.0>, [log: #Function<13.83790116/1 in Ecto.Adapters.SQL.with_log/3>, source: "quantum_metadata", cast_params: [], repo: Foo.Repo, timeout: 15000, pool: DBConnection.Ownership, ownership_timeout: :infinity, queue_target: 1000, pool_size: 10])
** (EXIT) shutdown: "owner #PID<0.574.0> exited"
(db_connection 2.5.0) lib/db_connection/holder.ex:97: DBConnection.Holder.checkout/3
(db_connection 2.5.0) lib/db_connection/holder.ex:78: DBConnection.Holder.checkout/3
(db_connection 2.5.0) lib/db_connection.ex:1200: DBConnection.checkout/3
(db_connection 2.5.0) lib/db_connection.ex:1525: DBConnection.run/6
(db_connection 2.5.0) lib/db_connection.ex:713: DBConnection.execute/4
(ecto_sql 3.10.2) lib/ecto/adapters/myxql/connection.ex:36: Ecto.Adapters.MyXQL.Connection.execute/4
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:989: Ecto.Adapters.SQL.execute!/5
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:945: Ecto.Adapters.SQL.execute/6
(ecto 3.10.3) lib/ecto/repo/queryable.ex:229: Ecto.Repo.Queryable.execute/4
(ecto 3.10.3) lib/ecto/repo/queryable.ex:19: Ecto.Repo.Queryable.all/3
(foo 0.1.0) lib/foo/v2/scheduling/storage_impl.ex:37: Foo.Scheduling.StorageImpl.last_execution_date/0
(foo 0.1.0) lib/foo/v2/scheduling/server.ex:39: Foo.Scheduling.StorageServer.handle_call/3
(stdlib 5.0.2) gen_server.erl:1113: :gen_server.try_handle_call/4
(stdlib 5.0.2) gen_server.erl:1142: :gen_server.handle_msg/6
(stdlib 5.0.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
Last message (from Foo.Scheduling.Scheduler.ClockBroadcaster): :last_execution_date
What happens, I think, is that storage process linked to scheduler processes started under my root supervisor cannot checkout connection owned by the test process. To fix this, I’ve modifed my test to look something like:
setup_all do
# Temporarily stop scheduler under root supervisor
:ok = Supervisor.terminate_child(Foo.Supervisor, Foo.Scheduling.Scheduler)
on_exit(fn ->
Supervisor.start_child(Foo.Supervisor, Foo.Scheduling.Scheduler)
end)
end
setup do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(Foo.Repo)
Ecto.Adapters.SQL.Sandbox.mode(Foo.Repo, {:shared, self()})
start_supervised!(Foo.Scheduling.Scheduler)
on_exit(fn ->
Ecto.Adapters.SQL.Sandbox.checkin(Foo.Repo)
end)
end
…which seems very clunky but works… somewhat. The tests now all pass, but from time to time the following error log will pop up:
[error]: MyXQL.Connection (#PID<0.437.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.581.0> exited
…where the “exiting” process is always the storage process. It seems to indicate that storage process is still waiting for query result when it’s terminated when the scheduler process is terminated when the test ends. It works (tests pass), but seems very wrong to leave this error like this.
What I’m doing wrong conceptually? How can I fix this? How to approach those kinds of issues in general?
Thanks!