Hi there! I’m working with Oban.Pro, I’m loving it so far but have been running into a few issues with testing Relay.
I have an API call that generates a bunch of text for me. This API call has a super strict rate limit on it, making it dificult for me to consistently have it work. I’d like to use an Oban Job to handle the exponential backoff and wait for the rate limit to time out. I thought it would be a great idea to use the Relay module to call the job and wait for it’s return. This is what I have so far:
defp do_generate(_params, _stream, _opts) do
{:ok, result} =
%{}
|> CallJob.new()
|> Oban.Pro.Relay.async()
|> Oban.Pro.Relay.await(:timer.seconds(10))
IO.inspect(result)
end
Here is the call job it is creating:
defmodule Broadcast.LLM.CallJob do
use Oban.Pro.Worker
@impl Oban.Pro.Worker
def process(%Oban.Job{}) do
IO.puts("hehe")
{:ok, 3}
end
end
In my test, I am calling the function, and expecting it to just run the job
use Oban.Pro.Testing, repo: Broadcast.Repo
test "returns a completion response", ctx do
assert {:ok, 3} == API.do_generate("", "", [])
end
When I go to run the test, it calls the function, hangs for 10 seconds (as it should) and then says the job failed. Here is the output:
15:04:12.960 [error] Postgrex.Protocol (#PID<0.1243.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.1381.0> exited
Client #PID<0.1397.0> is still using a connection from owner at location:
:prim_inet.recv0/3
(postgrex 0.17.2) lib/postgrex/protocol.ex:3188: Postgrex.Protocol.msg_recv/4
(postgrex 0.17.2) lib/postgrex/protocol.ex:2888: Postgrex.Protocol.recv_transaction/4
(ecto_sql 3.10.2) lib/ecto/adapters/sql/sandbox.ex:335: Ecto.Adapters.SQL.Sandbox.Connection.handle_begin/2
(db_connection 2.5.0) lib/db_connection/holder.ex:354: DBConnection.Holder.holder_apply/4
(db_connection 2.5.0) lib/db_connection.ex:1703: DBConnection.run_begin/3
(db_connection 2.5.0) lib/db_connection.ex:1224: DBConnection.checkout/4
(db_connection 2.5.0) lib/db_connection.ex:1695: DBConnection.begin/3
(db_connection 2.5.0) lib/db_connection.ex:964: DBConnection.transaction/3
(oban 2.15.4) lib/oban/stager.ex:75: anonymous fn/2 in Oban.Stager.handle_info/2
(telemetry 1.2.1) /Users/aaggarwal/Documents/broadcast-work/broadcast-api/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
(oban 2.15.4) lib/oban/stager.ex:74: Oban.Stager.handle_info/2
(stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
(stdlib 4.2) gen_server.erl:1200: :gen_server.handle_msg/6
(stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
The connection itself was checked out by #PID<0.1397.0> at location:
(oban 2.15.4) lib/oban/stager.ex:75: anonymous fn/2 in Oban.Stager.handle_info/2
(telemetry 1.2.1) /Users/aaggarwal/Documents/broadcast-work/broadcast-api/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
(oban 2.15.4) lib/oban/stager.ex:74: Oban.Stager.handle_info/2
(stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
(stdlib 4.2) gen_server.erl:1200: :gen_server.handle_msg/6
(stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Then in the test suite I get this error:
** (MatchError) no match of right hand side value: {:error, :timeout}
I’m not sure how to go about telling the Relay to please run the job. I’d like to make an integration test to make sure everything works properly as it should, I know there is drain_jobs
in the docs, but since it’s async I don’t think I can do that. I’ve also tried doing start_supervised_oban!
and passing the name into the opts then passing that name to the async call. But no dice.
Any help is appreciated, or if you can point me to documentation I may have missed about how to test it that would be much appreciated. Thank you!