Testing a Genserver PubSub subscriber that speaks to the DB

I have a test in my Phoenix app that is testing a Phoenix.PubSub subscriber that uses Genserver. The subscriber does some database work as part of its handle_info/2. It looks like this:

defmodule MyApp.Accounts.Subscriber do
  use GenServer
  require Logger
  alias Phoenix.PubSub

  alias MyApp.ReferralCode

 # genserver stuff...

  def init(_) do
    PubSub.subscribe(MyApp.PubSub, "accounts")
    {:ok, %{}}
  end

  def handle_call(:get, _, state) do
    {:reply, state, state}
  end

  def handle_info({:register, user}, state) do
    Logger.info("user registered: #{user.name}")

    %ReferralCode{user_id: user.id}
    |> ReferralCode.changeset(%{code: ReferralCode.generate_code(user.name)})
    |> MyApp.Repo.insert!()

    {:noreply, state}
  end
end

I start my subscriber in my app’s application.ex

children = [
  # Start the PubSub system
  {Phoenix.PubSub, name: MyApp.PubSub},
  MyApp.Accounts.Subscriber,
  # Start the Ecto repository
  supervisor(MyApp.Repo, []),
  # Start the endpoint when the application starts
  supervisor(MyAppWeb.Endpoint, [])
]

opts = [strategy: :one_for_one, name: MyApp.Supervisor]

Supervisor.start_link(children, opts)

My test looks like this:

test "sending creating a referral code upon user registration" do
  start_supervised(MyApp.Accounts.Subscriber)
  user = insert(:user)

  Phoenix.PubSub.broadcast(MappApp.PubSub, "accounts", {:register, user})

  assert_eventually(Repo.exists?(ReferralCode))

  stop_supervised(MyApp.Accounts.Subscriber)
end

The test runs fine when run by itself but when the whole suite is run I get the following error:

[error] GenServer MyApp.Accounts.Subscriber terminating
** (stop) exited in: DBConnection.Holder.checkout(#PID<0.970.0>, [log: #Function<9.124843621/1 in Ecto.Adapters.SQL.with_log/3>, cache_statement: "ecto_insert_referral_codes", timeout: 15000, pool_size: 10, pool: DBConnection.Ownership])
    ** (EXIT) shutdown: "owner #PID<0.969.0> exited"
    <stacktrace...>

This looks like it’s an issue with the database connection still being open when the process is terminated so it doesn’t die gracefully. But I’m not sure how to deal with this.

As an aside, I tried running the test without the call to start_supervised and stop_supervised and strangely it still passed. This did not solve the error mentioned above though. Is there an explanation for this?

Side question: Is this a valid way to implement a pubsub system to handle side effects based on events?

1 Like

Hello there Sir.
Try using

start_supervised!

(that is with the exclamation mark) in your test in your test at the line

“start_supervised(MyApp.Accounts.Subscriber)”

I hope this helps