Oban how to properly assert on enqueued jobs?

I am trying to insert another Oban job after one finishes by listening to [:oban, :job, :stop].
I’d like to assert that the next job was queued after one succeeded, so in my test I do something like this :point_down:

test "schedules job after another" do
      # ...
      {:ok, result} =
        perform_job(Workers.MyWorker, %{
          record_id: some_record_id
        })

      assert_enqueued(
        [worker: Workers.SecondWorker, args: %{result_id: result.id}],
        1000
      ) 
end

And my telemetry event handler looks like this

  def handle_event(
        [:oban, :job, :stop],
        _measure,
        meta = %{worker: worker, result: {:ok, result}},
        _
      ) do
      Workers.SecondWorker.new(%{result_id: result.id}) |> Oban.insert()
  end

However even though I can see the job inserted after inspecting the result of Oban.insert (though it doesn’t have an ID) my test still fails and I couldn’t really figure out why.

If I change the Oban config to be

config :lead_engine, Oban, testing: :manual

then it works but I don’t know whether there’s any other way of making this work because now some other tests fail. :sweat_smile:

If I wrap only my test case with with_testing_mode(:manual... then I get

     ** (ArgumentError) nil given for `id`. comparison with nil is forbidden as it is unsafe. Instead write a query with is_nil/1, for example: is_nil(s.id)

IIRC :telemetry handlers run in a separate process, you may be encountering DB sandbox issues. I did NOT recall correctly, see @LostKobrakai’s note below.

Also beware: telemetry doesn’t provide any delivery guarantees and aggressively (silently + permanently for the VM’s lifetime) removes listeners that crash even once. If SecondWorker is doing anything important, you may want to consider an alternative approach like Oban Pro’s workflows.

They do not, as they don’t want to introduce message passing.

Also beware: telemetry doesn’t provide any delivery guarantees and aggressively (silently + permanently for the VM’s lifetime) removes listeners that crash even once. If SecondWorker is doing anything important, you may want to consider an alternative approach like Oban Pro’s workflows.

I was guessing that those are GenServers and somehow they come back up, but yes, I’ve already encountered that during my tests, so we’ll probably have to switch to Pro to be able to use Workflows. Thank you!

I wonder how other people use it though, I mean if there’s no guarantee and the listeners might get removed. Is it because it doesn’t hurt much (well maybe it does, I have no idea :stuck_out_tongue:) losing some metrics?

Since a crashing handler crashes the calling process, I assume the design principle is “prefer uptime over metrics” which isn’t unreasonable.

It can be kinda painful if you mess up, for instance, a listener that tracks Oban.Job crashes and sends them to Sentry so that you don’t get any reports from production after the very first one. Ask me how I learned THAT one :stuck_out_tongue:

1 Like

The point here is much more about this:

The code emitting telemetry events can’t handle problems of listeners – those can be libraries, which have no relationship to your project or metric setup. So it’s the job of listeners to have their shit together and deal with any issues they encounter.

2 Likes

Correct, because in :inline mode, which is what I assume your original config was using, no jobs are ever enqueued in the DB

But then as I mentioned, even if I wrap it with with_testing_mode(:manual... I get that argument error I shared above.

Most likely because, as the error message suggests, perform_job is returning a result struct with id = nil.

Take a look at what happens in your worker or share the code so someone can help you.

It wasn’t actually about the result perform_job returns but that happens when Oban tries to complete the job, at least that’s what I understood from stacktrace.

  1) test handle_event/4 schedules the next job in the pipeline after one finishes (MyApp.ObanTelemetryHandlerTest)
     apps/my_app/test/my_app/oban_telemetry_handler_test.exs:16
     ** (ArgumentError) nil given for `id`. comparison with nil is forbidden as it is unsafe. Instead write a query with is_nil/1, for example: is_nil(s.id)
     code: Oban.Testing.with_testing_mode(:manual, fn ->
     stacktrace:
       (ecto 3.8.3) lib/ecto/query/builder/filter.ex:191: Ecto.Query.Builder.Filter.not_nil!/2
       (oban 2.12.1) lib/oban/queue/basic_engine.ex:137: Oban.Queue.BasicEngine.complete_job/2
       (oban 2.12.1) lib/oban/queue/engine.ex:234: anonymous fn/3 in Oban.Queue.Engine.with_span/4
       (telemetry 0.4.3) /Users/system/apps/my-app/deps/telemetry/src/telemetry.erl:272: :telemetry.span/3
       (oban 2.12.1) lib/oban/queue/executor.ex:203: Oban.Queue.Executor.ack_event/1
       (oban 2.12.1) lib/oban/queue/executor.ex:197: Oban.Queue.Executor.report_finished/1
       (oban 2.12.1) lib/oban/queue/executor.ex:80: anonymous fn/1 in Oban.Queue.Executor.call/1
       (oban 2.12.1) lib/oban/testing.ex:236: Oban.Testing.perform_job/3
       test/my_app/oban_telemetry_handler_test.exs:26: anonymous fn/1 in MyApp.ObanTelemetryHandlerTest."test handle_event/4 schedules the next job in the pipeline after one finishes"/1
       (oban 2.12.1) lib/oban/testing.ex:398: Oban.Testing.with_testing_mode/2
       test/my_app/oban_telemetry_handler_test.exs:24: (test)

Also when I’ve inspected the actual Oban.Job that was passed to perform/1 from perform_job, I see that the id is nil.

Not sure that’s because of perform_job but I do not think so (or maybe :thinking:). I mean perform_job doesn’t actually insert anything as far as I understood by looking at the module.

I’m also using perform_job in other tests but I’ve never encountered the same error.


Edit

Somehow using perform_job within with_testing_mode(:manual... causes some problems

      # ...
     with_testing_mode(:manual, fn ->
      {:ok, result} =
        perform_job(Workers.MyWorker, %{
          record_id: some_record_id
        })

      assert_enqueued(
        [worker: Workers.SecondWorker, args: %{result_id: result.id}],
        1000
      ) 
     end)

The above snippet somehow throws the ArgumentError I shared above.

I’ve also tried to use Oban.insert/1 instead of using perform_job/1 hoping that it would insert the job and once my job is handled it would enqueue another one :point_down:

      # ...
     with_testing_mode(:manual, fn ->
      %{record_id: some_record_id}
      |> Workers.MyWorker.new()
      |> Oban.insert()

      assert_enqueued(
        [worker: Workers.SecondWorker, args: %{result_id: result.id}],
        1000
      ) 
     end)

but somehow this way my first job didn’t even get processed. At least I didn’t see the IO.inspects that I did to ensure it was working the way I assumed it would.

how I could finally make it work was by directly calling perform/1 on my worker :point_down:

      # ...
     with_testing_mode(:manual, fn ->
      Workers.MyWorker.perform(%Oban.Job{args: %{record_id: some_record_id}})

      assert_enqueued(
        [worker: Workers.SecondWorker, args: %{result_id: result.id}],
        1000
      ) 
     end)

But at that point, is this test useful or necessary? Because I am calling perform/1 directly and I already know that if the job is succesfull the next thing it will do is to schedule another one. Maybe that’s the topic of another question about testing in general.

That’s expected, perform_job/3 doesn’t insert the job in the DB (it’s used to unit test a worker) so the id of the job will be nil

Yeah I verified this error with my own code and I was able to reproduce it.

Here’s what I strongly believe it’s going on:

  1. :inline testing mode uses the Oban.Queue.InlineEngine, which doesn’t touch the DB, while :manual testing mode uses the Oban.Queue.BasicEngine, which inserts jobs in the DB
  2. perform_job/3 is always supposed to run in :inline mode using the InlineEngine, if it accidentally uses the BasicEngine, the error you encountered will be triggered, namely it will try to update the state of a job that was never inserted in the DB
  3. Therefore, when you call perform_job/3, it sets testing: :inline in the config here: oban/lib/oban/testing.ex at fdeb0001bbacb60c4686b86ca0610c7f7508d357 · sorentwo/oban · GitHub which is then used here: oban/lib/oban/config.ex at fdeb0001bbacb60c4686b86ca0610c7f7508d357 · sorentwo/oban · GitHub to enforce the usage of the InlineEngine (the correct one), regardless of the testing mode set in your main config. So you can have testing: :manual in your config and still have perform_job/3 run correctly using the InlineEngine.
  4. This works well as long as you don’t use with_testing_mode(:manual, ...). Reason is that with_testing_mode/2 sets the engine to use in the process dictionary here: oban/lib/oban/testing.ex at 9b4861354f0189d548f4d5cd89273bc98f8eaede · sorentwo/oban · GitHub and this takes precedence over the engine set by perform_job/3 . The overwrite happens here: oban/lib/oban/queue/engine.ex at 1e0f61a913a2ba52c985675383e2f7a180119ac5 · sorentwo/oban · GitHub

Long story short: perform_job/3 always tries to use the InlineEngine, cause it fails otherwise, but with_testing_mode/2 gets in the way end enforces the usage of the BasicEngine, which is the wrong one and causes the issue.

This seems to be a bug, perform_job/3 should ignore the engine set by with_testing_mode/2. Although, to be honest, I see little use for explicitly setting the test mode to :manual and then calling perform_job/3. If you’re using manual mode you’re expected to insert jobs in the DB and execute them with drain_queue/2.

1 Like

Thank you for taking the time to try it! :bowing_man:
I think you’re right, as @shanesveller has mentioned in Slack channel that he’s got a PR for that. I’ll try from that branch and see if it solves my problem.

wasted time apparently, since the bug is already known and there’s a PR for it… Well, it happens.

Sorry, I didn’t know about it to let you know beforehand. I just saw the message this morning. :sweat_smile:

Hey no worries! Glad you found the solution to your problem. Also, it was an instructive deep dive into oban’s testing code, so not a complete waste of time :smile_cat:

1 Like

This is precisely the reason why I submitted this PR a while ago:

This hasn’t been merged yet, but it’s the use case I had in mind and also experienced, that a lot of libraries do not provide any additional instrumentation in addition to telemetry events. It’s not great fit, since Telemetry was never meant to do instrumentation, hence the callbacks detaching mechanism. But life is life, and libraries do use it this way.

So my idea is to add option to specify durable handlers. If you have better idea I’d be very much interested to hear.

1 Like

I think you called out exactly what I’m thinking in a previous comment on the issue linked to that PR:

It’s almost like we need a second event dispatch mechanism, along Telemetry, where events are durable (i.e. do not detach), execute in predictable and defined way, and failures in handlers result in overall request failure as well.

IMO that second mechanism is not :telemetry, it’s a different thing entirely with different requirements and tradeoffs.

The big difference is what happens when things go wrong: :telemetry takes the position that “uptime with missing logs” is better than “downtime with 100% complete logs” under heavy load or crashing handler functions. A hypothetical “callback registry” or “event bus” would likely make the opposite choice.