Cannot find ownership process for #PID<0.564.0>

It’s a little annoying testing async code in Elixir…

Is it possible to run async tests against a database by manually controlling my connections?

I have a piece of code that does:

caller = self()

Task.Supervisor.start_child(X.TaskSupervisor, fn ->
  Ecto.Adapters.SQL.Sandbox.allow(X.Repo, caller, self())
end)

It returns: cannot find ownership process for pid…

Then I added this:

setup do
  # Explicitly get a connection before each test
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(X.Repo)
end

It returns: {:already, :owner}
and I still get: cannot find ownership process for pid…

I want to run async: true.

Should I always run async: false with databases?

In your code you’re spawning a task that dies immediately after attempting to obtain ownership. Tasks are also bound into the caller chain anyway, which is why you get {:already, :owner} when you run the allowance after you check out the repo connection.

You are probably accessing the database from some process “out there” somewhere. You need to set the allowance for the process that is actually calling the database (not a random process that is going to die anyway). Find the pid at the site of the Repo call (maybe inspect it). If you can’t figure out how to communicate that pid back to the test, it may be that you have architected your system in a way that makes it difficult to test. IMO, such systems are also hard to maintain in the long run. In that case you have two options: roll with it, or rearchitect your system.
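A minimal sketch of that handshake, written in plain Elixir so it runs without a database. The names `AllowanceSketch`, `run/2`, `allow_fun`, and `work_fun` are hypothetical; in a real test `allow_fun` would presumably wrap `Ecto.Adapters.SQL.Sandbox.allow(X.Repo, owner, worker)` and `work_fun` would contain the Repo call. The worker reports its own pid, waits until it has been allowed, and the owner stays alive until the worker is done.

```elixir
defmodule AllowanceSketch do
  @timeout 1_000

  # allow_fun: hypothetical stand-in for
  #   fn owner, worker -> Ecto.Adapters.SQL.Sandbox.allow(X.Repo, owner, worker) end
  # work_fun: the code that would touch the database inside the worker.
  def run(allow_fun, work_fun) do
    owner = self()

    worker =
      spawn_link(fn ->
        # Report the pid that will actually call the database.
        send(owner, {:ready, self()})

        # Do not touch the database until the owner has granted the allowance.
        receive do
          :go -> send(owner, {:result, work_fun.()})
        after
          @timeout -> exit(:never_allowed)
        end
      end)

    receive do
      {:ready, ^worker} -> allow_fun.(owner, worker)
    after
      @timeout -> raise "worker never reported its pid"
    end

    send(worker, :go)

    # Keep the owner alive until the worker is done,
    # because the checkout goes away when the owner exits.
    receive do
      {:result, result} -> result
    after
      @timeout -> raise "worker never finished"
    end
  end
end
```

For instance, `AllowanceSketch.run(fn _owner, _worker -> :ok end, fn -> :queried end)` returns `:queried`.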


Are you sure you are using manual mode for Sandbox?


This works for me:

  test "async databases" do
    {:ok, sup} = Task.Supervisor.start_link()
    test_pid = self()
    data = DB.Table.insert_new()
    DB.Table.all() |> IO.inspect  # has the new item
    Task.Supervisor.start_child(sup, fn ->
      DB.Table.all() |> IO.inspect  # sees the same stuff
      # we're finished.  If you don't do this parent test pid closes and takes the db checkout with it
      send(test_pid, :done)
    end)

    #keep the db checkout alive till it's done
    receive do :done -> :ok end
  end

Thank you very much.

Actually I am facing the same issue…

Even doing this:

setup tags do
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(X.Repo)

  Ecto.Adapters.SQL.Sandbox.allow(X.Repo, self(), Server)
  :ok
end

Right after checkout… this Server is still not allowed to use the database…
This server is started by my supervisor, then it receives a message in handle_info that uses the Repo for a database operation… but it fails because of ownership.

I also tried to put this allow inside the test itself… same problem.

It looks like Ecto.Adapters.SQL.Sandbox.allow(X.Repo, self(), Server) is doing nothing…

The server is registered under its name, so “allow” should work, right?

def start_link(_) do
  GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

Tasks work automatically as you said, but GenServers don’t work at all.

Yes… in test_helper.exs:

ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(X.Repo, :manual)

Can you provide a complete example that others can run?

Correct. GenServers don’t bind $callers. This is deliberate. I’m not saying it’s always a bad idea (because I do something similar for a real reason in prod [see note]), but if you have a GenServer calling a database, there’s a good chance you’re violating the basic good-practices principle of functional-core OTP design:

https://hexdocs.pm/elixir/GenServer.html#module-when-not-to-use-a-genserver

Use processes only to model runtime properties, such as mutable state, concurrency and failures, never for code organization.

If you’re using a GenServer to call a database, there is a really good chance you are trying to use the GenServer to organize your code. If you truly have transient GenServers, then after you spin them up in your task, 1) your test should be able to find them and 2) you should then be able to run Process.put(:"$callers", [test_pid]) inside the GenServer or (better) call Sandbox.allow, and your GenServer will see the database.

If you really, really need some sort of global GenServer with a name, then it’s global. You can’t run async tests on it, because you have shared async state, and that’s that. Honestly, I think most use cases don’t need this, so you may be able to rearchitect your system.

[note] I did the Process.put(:"$callers") thing, and took the hit on convenience because it was important. I think Elixir doing this is a great thing, because the roadblock made me stop and think, “do I really need this?”.
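To make the difference concrete, here is a small sketch in plain Elixir (no database needed) that reads the `$callers` entry from inside a Task and from inside an Agent, which is a GenServer under the hood. `CallersDemo`, `in_task/0`, and `in_agent/0` are hypothetical names used only for illustration.

```elixir
defmodule CallersDemo do
  # Reads `$callers` from the process dictionary of a Task
  # started by the current process.
  def in_task do
    Task.async(fn -> Process.get(:"$callers") end)
    |> Task.await()
  end

  # Reads `$callers` from the process dictionary of an Agent
  # (a GenServer-based process) started by the current process.
  def in_agent do
    {:ok, agent} = Agent.start_link(fn -> Process.get(:"$callers") end)
    value = Agent.get(agent, & &1)
    Agent.stop(agent)
    value
  end
end
```

The head of the list returned by `CallersDemo.in_task()` is `self()`, while `CallersDemo.in_agent()` returns `nil`. That missing entry is why the GenServer needs an explicit allowance.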


I can try to give a context.

I have an app that sends a message to a RabbitMQ server. When it finishes, RabbitMQ sends a message back, and I have a function that receives that result. I tried everything to run a database operation inside this function, but nothing works. I am talking about the Sandbox and the test environment… I always get: cannot find ownership process for #PID<0.548.0>. When using ownership, you must manage connections…

I tried:

1 - Creating a GenServer and running allow with the owner process and the server process.
2 - Creating a Task, I mean “use Task” instead of “use GenServer”, sending what I want via start_link and allowing right after.
3 - Allowing directly inside the test itself by giving a name to my GenServer and sending the message coming from Rabbit to it using send(MyServerName, message)… My server gets the message, but it’s still not able to use the database because of ownership…

Is there a way to get a database connection and send it via arguments? This way I can use it without having this kind of problem.

OK, so you are leaving the Erlang VM. Yeah, I’ve got something for you, hang on.

Could you perhaps at least show your complete code? The modules that call the database, as well as the full test module? The summarized versions are not enough I think.

Yes I need to receive a message from Rabbit… so I need a “normal process” and I want to do something with this message.

This is a trick that PagerDuty uses with Kafka (or so I hear; I don’t work there). I don’t know anything about RabbitMQ.

In your RabbitMQ message (let’s pretend it’s JSON), stuff an extra parameter. For example, if you were originally packaging

{foo: "bar", baz: 1}

make it

{foo: "bar", baz: 1, test_pid: <encoded_test_pid>}

where encoded_test_pid is test_pid |> :erlang.term_to_binary |> Base.encode16

Be sure to send it back with your response.

When you handle your response, trap the test_pid string and convert it back to a pid value:

test_pid = encoded_test_pid |> Base.decode16! |> :erlang.binary_to_term

Then use Sandbox.allow(Repo, test_pid, self()) immediately afterwards.

Be very careful with this: :erlang.binary_to_term on untrusted input is a security risk, so make sure this code never deploys to prod (use Mix.env fences) unless you’re 100% sure you’re on a secure private network.
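The round trip can be sketched as a small helper. `PidCodec` and its functions are hypothetical names, and passing `[:safe]` to `:erlang.binary_to_term/2` is an addition not mentioned above; it narrows what decoding can do but does not make untrusted input safe.

```elixir
defmodule PidCodec do
  # Turns a pid into an uppercase hex string that fits in a JSON field.
  def encode(pid) when is_pid(pid) do
    pid
    |> :erlang.term_to_binary()
    |> Base.encode16()
  end

  # Turns the hex string back into a pid and rejects anything else.
  def decode(encoded) when is_binary(encoded) do
    case encoded |> Base.decode16!() |> :erlang.binary_to_term([:safe]) do
      pid when is_pid(pid) -> pid
      other -> raise ArgumentError, "expected an encoded pid, got: #{inspect(other)}"
    end
  end
end
```

On the same node, `PidCodec.decode(PidCodec.encode(self()))` gives back `self()`, which can then be passed as the owner to Sandbox.allow.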

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      Nuvemx.Repo,
      Nuvemx.RabbitMQ,
      NuvemxWeb.Endpoint,
      Nuvemx.RecommenderSystems.SimilarityServer,
      {Task.Supervisor, name: Nuvemx.TaskSupervisor},
      Guardian.DB.Token.SweeperServer
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Nuvemx.Supervisor]
    Supervisor.start_link(children, opts)
  end

defmodule Nuvemx.RecommenderSystems.SimilarityServer do
  use GenServer

  alias Nuvemx.RecommenderSystems
  alias Nuvemx.RabbitMQ

  @exchange_name "similarities"

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_info({:similarities, similarities}, state) do
    IO.inspect("callers..")
    Process.get(:"$callers") |> IO.inspect()

    RecommenderSystems.update_similarities(similarities)

    {:noreply, state}
  end
end


defp build_similarity(%Item{}, user_id, caller) do
  SimilarityServer.execute("items", get_items(user_id), consume(caller))
end

defp build_similarity(%Client{}, user_id, caller) do
  SimilarityServer.execute("clients", get_clients(user_id), consume(caller))
end

defp consume(caller) do
  fn payload, _meta ->
    similarities = Jason.decode!(payload)

    send(caller, {:similarities, similarities})
    send(caller, :done)

    Ecto.Adapters.SQL.Sandbox.allow(Nuvemx.Repo, caller, SimilarityServer)
    send(SimilarityServer, {:similarities, similarities})
  end
end


My test calls the last block, and caller is the test’s pid, so I should be able to allow directly, but it does not work.

The test uses async: true.

I want to run my tests with async: true. Isn’t that possible in this scenario?

The line below uses the database, and it fails just because it runs in another process.

RecommenderSystems.update_similarities(similarities)

I have the caller (test PID)… it does not work:

defp consume(caller) do
  fn payload, _meta ->
    similarities = Jason.decode!(payload)

    Ecto.Adapters.SQL.Sandbox.allow(Nuvemx.Repo, caller, SimilarityServer)
    send(SimilarityServer, {:similarities, similarities})

    send(caller, {:similarities, similarities})
    send(caller, :done)
  end
end

This does not work either.

defp consume(caller) do
  fn payload, _meta ->
    Process.put(:"$callers", caller)

    similarities = Jason.decode!(payload)

    send(SimilarityServer, {:similarities, similarities})
    send(caller, {:similarities, similarities})
    send(caller, :done)
  end
end

I am trying something different: the caller is there, but it is still failing.

defp consume(caller) do
  fn payload, _meta ->
    similarities = Jason.decode!(payload)

    send(SimilarityServer, {:similarities, caller, similarities})
    send(caller, {:similarities, similarities})
    send(caller, :done)
  end
end

@impl true
def handle_info({:similarities, caller, similarities}, state) do
  Process.put(:"$callers", caller)

  IO.inspect("callers..")
  Process.get(:"$callers") |> IO.inspect()

  RecommenderSystems.update_similarities(similarities)

  {:noreply, state}
end

@impl true
def handle_info({:similarities, caller, similarities}, state) do
  Process.put(:"$callers", [caller])

  IO.inspect("callers..")
  Process.get(:"$callers") |> IO.inspect()

  RecommenderSystems.update_similarities(similarities)

  {:noreply, state}
end