How to send Sandbox.allow for each dynamic supervisor (testing)

Hello friends, I have read this link: Ecto.Adapters.SQL.Sandbox — Ecto SQL v3.7.2, they said if we dont wan to drop a Genserver like this:

** (exit) exited in: GenServer.call(#PID<0.1673.0>, {:push, :edit, %MishkaInstaller.PluginState{depend_type: :soft, depends: [], event: "event_one", extra: [], name: "plugin_hook_one", priority: 1, status: :started}}, 5000)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

We should do like this:

test "gets results from GenServer" do
  {:ok, pid} = MyAppServer.start_link()
  Ecto.Adapters.SQL.Sandbox.allow(Repo, self(), pid)
  assert MyAppServer.get_my_data_fast(timeout: 1000) == [...]
end

But it did not speck about a Genserver with dynamic supervisor, for example:

Enum.map(@depends, fn item ->
      Map.merge(@new_soft_plugin, %{name: item, depend_type: :hard, depends: List.delete(@depends, item)})
      |> MishkaInstaller.PluginState.push_call()
end)

In the top code I pushed 4 items of a list to Genserver and the Genserver pushed it to my database, and each Genserver has the own dynamic supervisor and PID.

After pushing the items to my database, I try to work with their state and call them, but sometimes my Genserver which is supervised dynamically is dropped.

I increase queue_target and pool_size, they fix some errors not all the problems

    pool_size: 30,
    queue_target: 10000,

I have read many topics, but I still have problems with dynamic supervisor Genserver testing, unfortunately.

Thank you.


More code:

  def push_call(%PluginState{} = element) do
    case PSupervisor.start_job(%{id: element.name, type: element.event}) do
      {:ok, status, pid} ->
        if Mix.env() == :test, do: Logger.warn("Plugin State of #{element.name} is being pushed")
        GenServer.call(pid, {:push, status, element})
      {:error, result} ->  {:error, :push, result}
    end
  end

and start job function

  def start_job(args) do
    DynamicSupervisor.start_child(PluginStateOtpRunner, {MishkaInstaller.PluginState, args})
    |> case do
      {:ok, pid} -> {:ok, :add, pid}
      {:ok, pid, _any} -> {:ok, :add, pid}
      {:error, {:already_started, pid}} -> {:ok, :edit, pid}
      {:error, result} -> {:error, result}
    end
  end
2 Likes

I think you may have misinterpreted the examples.

In the docs, this seems like an example of what you shouldn’t do if you want to share servers across tests, because it’s starting the server in the test and then the server will die when the test is done. So if your other tests are running aynchronously, sometimes they will work because this test is still running.

test "gets results from GenServer" do
  {:ok, pid} = MyAppServer.start_link()
  Ecto.Adapters.SQL.Sandbox.allow(Repo, self(), pid)
  assert MyAppServer.get_my_data_fast(timeout: 1000) == [...]
end

Instead it suggests running in “allow” mode, where you would start the dynamic servers outside of the test, then get a pid of the already-running server by its name, and use that.

test "calls worker that runs a query" do
  allow = Process.whereis(MyApp.Worker)
  Ecto.Adapters.SQL.Sandbox.allow(Repo, self(), allow)
  ... rest of test
end
2 Likes

Thank you, but it is still about a Genserver worker not with a dynamic supervisor, what I should do when I am using this?

To share a connection between a test and a GenServer you need to ensure three things:

  1. somebody calls allow
  2. the GenServer shuts down if the test shuts down
  3. the GenServer shuts down if it’s trying to start up when the test has already shut down (this can happen when the DynamicSupervisor restarts the GenServer)

To handle all three, I’ve used code like this in the application’s Repo:

  def allow_if_sandbox(parent_pid, orphan_msg \\ :stop) do
    if sandbox_pool?() do
      monitor_parent(parent_pid, orphan_msg)

      # this addresses #1
      Ecto.Adapters.SQL.Sandbox.allow(__MODULE__, parent_pid, self())
    end
  end

  def sandbox_pool?() do
    config = Application.fetch_env!(:core, __MODULE__)
    Keyword.get(config, :pool) == Ecto.Adapters.SQL.Sandbox
  end

  defp monitor_parent(parent_pid, orphan_msg) do
    # this is part of addressing #2
    Process.monitor(parent_pid)

    if Process.alive?(parent_pid) do
      :ok
    else
      Logger.error("#{inspect(parent_pid)} down when booting #{inspect(self())}")
      # this addresses #3
      # the "throw" will work like an early "return"; see the GenServer docs
      throw(orphan_msg)
    end
  end

The Process.alive? check is not strictly required - monitoring an already-down process will put a :DOWN message into the queue - but typically the code that calls Repo.allow_if_sandbox will then immediately make a Repo.one call instead of processing messages.

Then in your GenServer, there are two changes needed:

  • receive parent_pid in the arguments to init/1 and call Repo.allow_if_sandbox(parent_pid)
  • add a handler for the message Process.monitor can trigger, like:
def handle_info({:DOWN, _ref, :process, pid, _reason}, state)
  # log that this happened, etc. Don't use Repo!
  :stop
end

Finally, you’ll need to actually pass the parent_pid - usually by adding it to the code that’s starting processes. Based on your example, something like:

Enum.map(@depends, fn item ->
  Map.merge(@new_soft_plugin, %{
    parent_pid: self(),
    name: item,
    depend_type: :hard,
    depends: List.delete(@depends, item)
  })
  |> MishkaInstaller.PluginState.push_call()
end)
2 Likes

Hi, after a long delay I have tested your code, and it fixed many errors of my open-source project And I appreciate you very much :rose:.

I have 2 problems

1- it forces me to increase pool_size to 20. Lower than 20 it has error and says to increase
2- sometimes it forgets its registry state.

For second problems, I forced to put :timer.sleap in my test (link)