Errors in tests with Phoenix PubSub and supervised process

In my Phoenix app I’m using Phoenix PubSub to subscribe interally to events and take actions based on them being broadcast.

In order to manage this I have an Events module that includes a subscriber and handlers to various events:

defmodule MyApp.Events.Subscriber do

  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, name: __MODULE__)
  end

  def init(_) do
     PubSub.subscribe(my_pubsub_module, "my_app_events")

    {:ok, []}
  end

  def handle_info({:some_event, payload}, _) do
    do_important_stuff_with(payload)

    {:noreply, []}
  end
end

To broadcast an event I just do something like:

PubSub.local_broadcast(my_pubsub_module, "my_app_events", {event, payload})

The subscriber is supervised by including it in the application’s children list.

This works well, but there’s a problem in my tests. Where the test calls a function that broadcasts an event that has a subscriber that does stuff (like access the DB, send an email etc) I get an error like so:

[error] Task #PID<0.510.0> started from #PID<0.449.0> terminating
** (stop) exited in: DBConnection.Holder.checkout(#PID<0.486.0>, [log: #Function<15.53332199/1 in Ecto.Adapters.SQL.with_log/3>, source: "users", caller: #PID<0.449.0>, timeout: 15000, pool_size: 10, pool: DBConnection.Ownership])
    ** (EXIT) shutdown: "owner #PID<0.485.0> exited"
    (db_connection 2.3.0) lib/db_connection/holder.ex:86: DBConnection.Holder.checkout/2
    (db_connection 2.3.0) lib/db_connection/holder.ex:67: DBConnection.Holder.checkout/2

The test still passes, but this suggests something is wrong.

How can I prevent such an error?

Assuming do_important_stuff_with starts a Task and you run sandbox in a shared mode, you’d need to await for the started task to finish execution on test exit.

Something similar to Phoenix.Presence — Phoenix v1.6.2, if you use a supervised task, you can ask the supervisor to list its children pids and await on them in on_exit.

If you are running in async: true, you’d also need to explicitely allow the task processes to use the repo conn. Doing so is a bit tricky if the tasks are spawned by some background process, I use :sys.replace_state to swizzle a callback that runs allow in the spawned process sometimes, but that’s a very hacky approach.

Related:

Thanks for your reply.

So basically the do_important_stuff_with() function broadcasts another event whose subscriber hits the database. This seems to be the cause of the issue, as the database connection is likely terminated when this process executes.

The subscriber handlers (handle_info/2) are in all in MyApp.Events.Subscriber which is supervised by a module supervisor like so:

defmodule MyApp.EventBus.Supervisor do
  use Supervisor

  def start_link(_) do
    Supervisor.start_link(__MODULE__, name: __MODULE__)
  end

  def init(_) do
    children = [
      MyApp.Events.Subscriber
    ]

    opts = [strategy: :one_for_one]

    Supervisor.init(children, opts)
  end
end

This supervisor is supervised by the main application.
I tried awaiting process exit with:

on_exit(fn ->
      Task.Supervisor.children(MyApp.Event.Supervisor)
      |> Enum.each(fn pid ->
        Task.Supervisor.terminate_child(MyApp.EventBus.Supervisor, pid)
      end)
    end)

but I get

   ** (exit) exited in: GenServer.call(MyApp.Events.Supervisor, :which_children, :infinity)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

I’m using the Phoenix-provided ConnCase for my test which, afiak, should run the tests in async: false (I’ve even tried to set this explicitly).

Have you got an actual example where this problem is dealt with?

It doesn’t seem that your MyApp.EventBus.Supervisor is a task supervisor. Also note that your MyApp.EventBus.Supervisor only has one child. I’m not sure if it’s a typo but MyApp.Event.Supervisor != MyApp.EventBus.Supervisor hence the error that the process doesn’t exist.

Example with presence (which is using fetcher_pids instead of which_children, but that’s probably not very important): Tests for the Rumbl application end with error · Issue #3619 · phoenixframework/phoenix · GitHub, if that doesn’t work, try sleeping a bit before asking supervisor for its children: Tests for the Rumbl application end with error · Issue #3619 · phoenixframework/phoenix · GitHub

In general, the approach is:

  • find pids of processes that have been started during the test (most of the time you can ask some supervisor)
  • await for them to exit
  • finish the test

That was a typo in my post. They should be the same module.

I’ve actually fixed my test my moving the DB access to the parent process (it just preloads some associations).

I’d like to learn more about handling processes in Elixir as it’s not something I’ve got my hands dirty with.

It doesn’t seem that your MyApp.EventBus.Supervisor is a task supervisor.

I think that’s right. There shouldn’t be any unsupervised tasks running here. It does currently only have one child, but I anticipate other children being added as I add more subscribers to my Event pubsub topic.

All these years later and I’m running into this exact same problem; simply want a subscriber to hit the DB. Did you end up with a solution you were happy with?