I have a test in my Phoenix app for a Phoenix.PubSub subscriber implemented as a GenServer. The subscriber does some database work as part of its handle_info/2. It looks like this:
defmodule MyApp.Accounts.Subscriber do
  use GenServer
  require Logger
  alias Phoenix.PubSub
  alias MyApp.ReferralCode

  # genserver stuff...

  def init(_) do
    PubSub.subscribe(MyApp.PubSub, "accounts")
    {:ok, %{}}
  end

  def handle_call(:get, _, state) do
    {:reply, state, state}
  end

  def handle_info({:register, user}, state) do
    Logger.info("user registered: #{user.name}")

    %ReferralCode{user_id: user.id}
    |> ReferralCode.changeset(%{code: ReferralCode.generate_code(user.name)})
    |> MyApp.Repo.insert!()

    {:noreply, state}
  end
end
I start my subscriber in my app’s application.ex:
children = [
  # Start the PubSub system
  {Phoenix.PubSub, name: MyApp.PubSub},
  MyApp.Accounts.Subscriber,
  # Start the Ecto repository
  supervisor(MyApp.Repo, []),
  # Start the endpoint when the application starts
  supervisor(MyAppWeb.Endpoint, [])
]

opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
My test looks like this:
test "sending creating a referral code upon user registration" do
  start_supervised(MyApp.Accounts.Subscriber)
  user = insert(:user)
  Phoenix.PubSub.broadcast(MyApp.PubSub, "accounts", {:register, user})
  assert_eventually(Repo.exists?(ReferralCode))
  stop_supervised(MyApp.Accounts.Subscriber)
end
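For readers unfamiliar with it: assert_eventually is not part of ExUnit, and its definition is not shown here. Because the test passes it an expression rather than a function, it presumably has to be a macro that re-evaluates the expression until it becomes truthy. A minimal sketch of such a helper follows; the module name EventuallySketch, the poll/3 function, and the timeout and interval defaults are all assumptions for illustration, not the actual helper used in this app.

```elixir
defmodule EventuallySketch do
  # Hypothetical polling helper, NOT the helper from the post.
  # The macro wraps the expression in a function so it is re-evaluated
  # on every attempt instead of only once at the call site.
  defmacro assert_eventually(expr, timeout \\ 1_000, interval \\ 20) do
    quote do
      EventuallySketch.poll(fn -> unquote(expr) end, unquote(timeout), unquote(interval))
    end
  end

  # Calls `fun` every `interval` ms until it returns a truthy value,
  # or raises once `timeout` ms have elapsed.
  def poll(fun, timeout, interval) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_poll(fun, deadline, interval)
  end

  defp do_poll(fun, deadline, interval) do
    cond do
      fun.() ->
        true

      System.monotonic_time(:millisecond) >= deadline ->
        raise "condition not met before timeout"

      true ->
        Process.sleep(interval)
        do_poll(fun, deadline, interval)
    end
  end
end
```

With a helper shaped like this, the test process stays alive (and keeps owning its sandboxed connection) only until the condition first becomes true or the timeout is reached.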
The test passes when run by itself, but when I run the whole suite I get the following error:
[error] GenServer MyApp.Accounts.Subscriber terminating
** (stop) exited in: DBConnection.Holder.checkout(#PID<0.970.0>, [log: #Function<9.124843621/1 in Ecto.Adapters.SQL.with_log/3>, cache_statement: "ecto_insert_referral_codes", timeout: 15000, pool_size: 10, pool: DBConnection.Ownership])
** (EXIT) shutdown: "owner #PID<0.969.0> exited"
<stacktrace...>
This looks like an issue with the database connection still being open when the process that owns it terminates, so the subscriber doesn’t die gracefully. I’m not sure how to deal with this.
As an aside, I tried running the test without the call to start_supervised and stop_supervised and strangely it still passed. This did not solve the error mentioned above though. Is there an explanation for this?
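One detail that may be relevant here: the subscriber's start_link is not shown, but the log line "GenServer MyApp.Accounts.Subscriber terminating" suggests it is registered under its module name. If so, a second start attempt for a process with an already-registered name does not raise; it returns an error tuple. The sketch below illustrates that behaviour with a stand-in module (NamedServerSketch is a hypothetical name, and the name registration is an assumption about the real subscriber).

```elixir
defmodule NamedServerSketch do
  use GenServer

  # Assumption: the process registers itself under the module name,
  # as the log output in the post suggests for the real subscriber.
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl true
  def init(state), do: {:ok, state}
end

# First start succeeds and registers the name.
{:ok, first} = NamedServerSketch.start_link([])

# A second start under the same name does not raise; it returns an
# error tuple that points at the process that is already running.
{:error, {:already_started, ^first}} = NamedServerSketch.start_link([])
```

Under that assumption, an unchecked start_supervised call in the test could be returning a similar error tuple while the instance started from application.ex handles the broadcast, which would be consistent with the test passing with or without the call.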
Side question: Is this a valid way to implement a pubsub system to handle side effects based on events?