Testing with Oban.Pro.Relay

Hi there! I’m working with Oban.Pro, I’m loving it so far but have been running into a few issues with testing Relay.

I have an API call that generates a bunch of text for me. This API call has a super strict rate limit on it, making it dificult for me to consistently have it work. I’d like to use an Oban Job to handle the exponential backoff and wait for the rate limit to time out. I thought it would be a great idea to use the Relay module to call the job and wait for it’s return. This is what I have so far:

  defp do_generate(_params, _stream, _opts) do
    {:ok, result} =
      %{}
      |> CallJob.new()
      |> Oban.Pro.Relay.async()
      |> Oban.Pro.Relay.await(:timer.seconds(10))

    IO.inspect(result)
  end

Here is the call job it is creating:

defmodule Broadcast.LLM.CallJob do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(%Oban.Job{}) do
    IO.puts("hehe")
    {:ok, 3}
  end
end

In my test, I am calling the function, and expecting it to just run the job

use Oban.Pro.Testing, repo: Broadcast.Repo

test "returns a completion response", ctx do
  assert {:ok, 3} == API.do_generate("", "", [])
end

When I go to run the test, it calls the function, hangs for 10 seconds (as it should) and then says the job failed. Here is the output:

15:04:12.960 [error] Postgrex.Protocol (#PID<0.1243.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.1381.0> exited

Client #PID<0.1397.0> is still using a connection from owner at location:

    :prim_inet.recv0/3
    (postgrex 0.17.2) lib/postgrex/protocol.ex:3188: Postgrex.Protocol.msg_recv/4
    (postgrex 0.17.2) lib/postgrex/protocol.ex:2888: Postgrex.Protocol.recv_transaction/4
    (ecto_sql 3.10.2) lib/ecto/adapters/sql/sandbox.ex:335: Ecto.Adapters.SQL.Sandbox.Connection.handle_begin/2
    (db_connection 2.5.0) lib/db_connection/holder.ex:354: DBConnection.Holder.holder_apply/4
    (db_connection 2.5.0) lib/db_connection.ex:1703: DBConnection.run_begin/3
    (db_connection 2.5.0) lib/db_connection.ex:1224: DBConnection.checkout/4
    (db_connection 2.5.0) lib/db_connection.ex:1695: DBConnection.begin/3
    (db_connection 2.5.0) lib/db_connection.ex:964: DBConnection.transaction/3
    (oban 2.15.4) lib/oban/stager.ex:75: anonymous fn/2 in Oban.Stager.handle_info/2
    (telemetry 1.2.1) /Users/aaggarwal/Documents/broadcast-work/broadcast-api/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
    (oban 2.15.4) lib/oban/stager.ex:74: Oban.Stager.handle_info/2
    (stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.2) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

The connection itself was checked out by #PID<0.1397.0> at location:

    (oban 2.15.4) lib/oban/stager.ex:75: anonymous fn/2 in Oban.Stager.handle_info/2
    (telemetry 1.2.1) /Users/aaggarwal/Documents/broadcast-work/broadcast-api/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
    (oban 2.15.4) lib/oban/stager.ex:74: Oban.Stager.handle_info/2
    (stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.2) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

Then in the test suite I get this error:

     ** (MatchError) no match of right hand side value: {:error, :timeout}

I’m not sure how to go about telling the Relay to please run the job. I’d like to make an integration test to make sure everything works properly as it should, I know there is drain_jobs in the docs, but since it’s async I don’t think I can do that. I’ve also tried doing start_supervised_oban! and passing the name into the opts then passing that name to the async call. But no dice.

Any help is appreciated, or if you can point me to documentation I may have missed about how to test it that would be much appreciated. Thank you!

Also another question I couldn’t really figure out from the docs, If the Relay.async call fails does that rerun the job until the timeout? Or does it just run the job once. I’d like it to re-run the job ideally so might be using the wrong setup possibly.

You’re correct, you can’t drain while you’re actively awaiting a job. The easiest way to test the Relay job is to wrap it in an :inline block, like this:

Oban.Testing.with_testing_mode(:inline, fn ->
  assert {:ok, 3} = API.do_generate("", "", [])
end)

That way you don’t need to start an Oban instance at all and the job will run immediately as it’s inserted.

When await/1 receives an error (implying a retry) it returns the error reason and doesn’t await a retry. However, you can wrap await/1 in a helper to wait for follow-up attempts.

def await_retry(relay, retries, timeout \\ 5_000) when is_integer(retries) do
  case Relay.await(relay) do
    {:error, _} when retries > 0 ->
      await_retry(relay, retries - 1, timeout)

    other ->
      other
  end
end

This question has come up a few times recently and a similar helper may find it’s way into Relay :slightly_smiling_face:

1 Like

Thank you so much @sorentwo! I’ll go about implementing this now. Appreciate the help!

2 Likes

Hi @sorentwo, I’ve been trying to modify the code to work with relay, and the inline worked to run the job, but now the await isn’t recognizing the relay has completed. I think it might be something with the test set up, but everything should match from above. Here is my setup:

def do_generate(params, opts) do
    %{
      params: params,
      options: opts
    }
    |> CallJob.new()
    |> Oban.Pro.Relay.async()
    |> Oban.Pro.Relay.await(:timer.seconds(10))
    |> IO.inspect(label: "CallJob Return")
  end

The job in question

defmodule Broadcast.LLM.CallJob do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: %{"params" => params} = args}) do
    {:ok, "hello"}
  end
end

The test:

defmodule Broadcast.RelayTest do
    use ExUnit.Case
    use Oban.Pro.Testing, repo: Broadcast.Repo

    describe "generate" do
        test "do_generate" do
              assert {:ok, "hello"} =
               Oban.Testing.with_testing_mode(:inline, fn ->
                    do_generate(%{}, [])
               end)        
         end
    end
end

So I set all this up, and I inspect inside the job and it does work, the job says it return {:ok, “hello”}, but the await does not recognize the job has returned successfully in testing.

 1) test generate/3
     test/broadcast/generate_test.exs:47
     match (=) failed
     code:  assert {:ok, "hello"} ==
              Oban.Testing.with_testing_mode(:inline, fn ->
                API.do_generate(
                 %{}
                )
              end)
     left:  {:ok, "hello"}
     right: {:error, :timeout}

Is there something wrong with my setup? Thank you again!

There shouldn’t be any difference running with :inline testing mode, so my guess is something is off in your notification config. Here’s a relay test from the Pro test suite that exercises await/async with inline testing:

test "returning results from inline execution" do
  with_supervised_oban([testing: :inline], fn ->
    assert {:ok, 123} =
             @name
             |> Relay.async(RelayWorker.new(%{value: 123}))
             |> Relay.await(250)
  end)
end

Are you using the default Postgres notifier? If so, it won’t send notifications in tests due to the sandbox. You need to use the PG notifier to receive pubsub notifications in tests.

3 Likes