Oban: How to test a parent has waited for children tasks to be executed?

Hello!
I am working on a data collection api.

When the client makes a request for upload of a video, my api creates an Oban job for RequestProcessor worker (super generic name, I know) to upload the whole range. In the perform function, it splits the time range into smaller chunks and inserts each of sub-ranges as parameters to another job in another queue for ChunkProcessor.

I want the parent job in RequestProcessor to wait till all the children jobs in ChunkProcessor queue to be completed, so that the parent can provide info about the status of the whole upload job. I used queue_drain to make sure the ChunkProcessor queue is empty before starting processing next jobs.

Is that a good way to approach it? I struggle to test it, because if I set processing of jobs to manual I imagine calling the perform function of the parent would block the execution of any following job.

This sounds like a case where a Batch would be useful:

https://getoban.pro/docs/pro/0.13.1/Oban.Pro.Workers.Batch.html

Using drain_queue runs every job in the specified queue using the current worker process, so what happens when two RequestProcessor jobs are running at the same time?

1 Like

For my use case it’s ok if the jobs from parent are completely synchronous. I am new to Oban, and I guess your question made me realise I had an assumption that I would never run two jobs from the parent queue at the same time

IMO yes. As for testing, what is the problem exactly? Are the jobs not executing properly in manual mode? I don’t think you need to manually call perform, at least I don’t remember having to do it… :thinking:

1 Like

Ok, so I would like to verify the children jobs were scheduled by RequestProcessor, but I am not sure it’s possible

to give you a bit more detail - here is the main part of my RequestProcessor worker

  def perform(%Oban.Job{args: %{
    "start_ts" => start_ts,
    "end_ts" => end_ts,
    "correlation_id" => correlation_id,
    "location_id" => location_id
  }}) do
    {:ok, datetime1, _} = DateTime.from_iso8601(start_ts)
    {:ok, datetime2, _} = DateTime.from_iso8601(end_ts)

    duration_in_seconds =  DateTime.diff(datetime2, datetime1)
    full_minute_chunk_count = floor(duration_in_seconds / @minute)

    enqueue_full_minute_chunks(correlation_id, location_id, datetime1, 0, full_minute_chunk_count)
    remaining_seconds = duration_in_seconds - (full_minute_chunk_count * @minute)

    Oban.drain_queue(queue: :chunk_processing)

    :ok
  end

And here is my test:

 describe "perform/1" do
    test "splits 1min 30sec range into correct number of chunks" do
      args = %{
        "start_ts" => "2024-03-20T10:09:00Z",
        "end_ts" => "2024-03-20T10:10:30Z",
        "location_id" => "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9",
        "correlation_id" => "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9"
      }

      assert :ok = perform_job(RequestProcessor, args)
      assert_enqueued(worker: DataCollectionPlatform.Workers.ChunkProcessor, args: [list of enqueued jobs])

    end

and here is my test.config

config :data_collection_platform, Oban,
  testing: :manual,
  queues: [request_processing: 10, chunk_processing: 20]

because the queue is drained there is nothing to tell me that the RequestProcessor did what I wanted, I can only assert it compiled

1 Like

oh, I got it I think

my new test:

  describe "perform/1" do
    test "splits 1min 30sec range into 2 chunks and then waits for them to be processed" do
      args = %{
        "start_ts" => "2024-03-20T10:09:00Z",
        "end_ts" => "2024-03-20T10:10:30Z",
        "location_id" => "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9",
        "correlation_id" => "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9"
      }

      assert {:ok, result} = perform_job(RequestProcessor, args)
      assert result == %{discard: 0, cancelled: 0, success: 2, failure: 0, snoozed: 0}
    end
  end

and the parent processor ends with

    result = Oban.drain_queue(queue: :chunk_processing)

   { :ok, result}

I was about to suggest checking the success count.

To be even more in a direct connection with what’s going on, you can also introduce code that sends messages to the parent job or the test case process – plain OTP, not Oban facilities – and that only runs in test environment.

It’s best not to override queues in your test config, as testing mode disables them all anyhow.

As a general rule, you shouldn’t use drain_queue from a worker or in production code. It’s really meant for testing, but has remained in the Oban module for backward compatibility. Furthermore, if you have access to Pro, the Testing.drain_jobs/2 function is preferred (better defaults, more options, can drain multiple queues).

As suggested above, this is an ideal place to use a Batch or Relay.await_many/1 to have job block until child-jobs are completed.

If you don’t have access to Pro, then an approach similar to the [Reporting Job Progress(]Reporting Job Progress — Oban v2.18.3) guide would be preferable. Or, simply use Task.async_stream and skip the sub-job pattern entirely.

2 Likes

So, sending messages back. Nice. It’s a common testing pattern in Elixir and it works well.

1 Like

Thank you, these are good suggestions!

As I said, I am new to Oban, so I tried different options

So now my RequestProcessor uses Relay.await_many and I still struggle to test it, because the child job in application code awaits a message a reply from a NATs server. So I am trying and failing to mock the child job completion in my test.

defmodule DataCollectionPlatform.Workers.RequestProcessor do
  use Oban.Pro.Worker, queue: :request_processing

  alias DataCollectionPlatform.Workers.ChunkProcessor
  @minute 60
  @timeout :timer.hours(2)

  @impl true
  def process(%Oban.Job{
        args: %{
          "start_ts" => start_ts,
          "end_ts" => end_ts,
          "correlation_id" => correlation_id,
          "location_id" => location_id
        }
      }) do
     range = get_range(start_ts, end_ts)

    result =
      range
      |> Enum.map(&ChunkProcessor.new(generate_chunk_args(correlation_id, location_id, start_datetime, &1, end_datetime)))
      |> Enum.map(&Oban.Pro.Relay.async(&1))
      |> Oban.Pro.Relay.await_many(@timeout)

    {:ok, result}
  end
test "given less than a minute long request, creates one chunk job" do

    with_mock ChunkProcessor, [
      process: fn _job -> {:ok, "success"} end,
      new: fn args ->
        Ecto.Changeset.change(%Oban.Job{
          args: args,
          worker: "DataCollectionPlatform.Workers.ChunkProcessor",
          state: "available",
          queue: "chunk_processing",
          inserted_at: DateTime.utc_now()
        })
      end
    ] do
      args = %{
        start_ts: "2024-03-20T10:00:00Z",
        end_ts: "2024-03-20T10:00:30Z",
        location_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9",
        correlation_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9"
      }

      {:ok, result} = perform_job(RequestProcessor, args)

      assert called(ChunkProcessor.process(:_))
      assert result == [{:ok, "success"}]
    end
  end

but it keeps hanging at Oban.Pro.Relay.await_many(@timeout)

if I test it manually - it works! jobs get scheduled, and the parent waits until the completion of children to complete. But I can’t figure out why it hangs in the test

Sorry, I would have pointed you toward Relay initially if I knew you had Pro available!

It hangs in the test because the jobs aren’t running during perform_job/2. In order for the jobs to run you either need to have an async process drain_jobs, or wrap it in inline mode (which I’d definitely suggest). Your test would look like this:

Oban.Testing.with_testing_mode(:inline, fn ->
  assert {:ok, [{:ok, "success"}]} = perform_job(RequestProcessor, args)
end)

We’ll add information about this to the Relay docs :slightly_smiling_face:

2 Likes

thank you for helping!!
I have trouble still with testing this, it seems that only in the test the children job getting to “complete” state doesn’t get picked up by await.

simplified RequestProcessor:

defmodule DataCollectionPlatform.Workers.RequestProcessor do
  use Oban.Pro.Worker,
    queue: :request_processing

  alias DataCollectionPlatform.Workers.ChunkProcessor

  @minute 60
  @timeout :timer.hours(2)

  @impl true
  def process(%Oban.Job{
        args: %{
          "start_ts" => start_ts,
          "end_ts" => end_ts,
          "correlation_id" => correlation_id,
          "location_id" => location_id
        }
      }) do

    result = ChunkProcessor.new(%{
      start_ts: "2024-03-20T10:00:00Z",
      end_ts: "2024-03-20T10:00:30Z",
      location_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9",
      correlation_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9"
    })
    |> Oban.Pro.Relay.async()
    |> Oban.Pro.Relay.await(@timeout)

    {:ok, result}
  end

simplified ChunkProcessor:

  def process(%Oban.Job{args: args} = job) do
    {:ok, "success"}
  end

test:

defmodule DataCollectionPlatform.Workers.RequestProcessorTest do
  use ExUnit.Case, async: true
  use Oban.Pro.Testing, repo:  DataCollectionPlatform.Repo

  test "processes chunks successfully" do


      args = %{
        start_ts: "2024-03-20T10:00:00Z",
        end_ts: "2024-03-20T10:00:30Z",
        location_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9",
        correlation_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9"
      }


      Oban.Testing.with_testing_mode(:inline, fn ->
        assert {:ok, [{:ok, "success"}]} = perform_job(RequestProcessor, args)
      end)
  end
end

test config:

config :data_collection_platform, Oban,
  queues: false,
  plugins: false,
  testing: :inline

The problem is not with the assertion, which might be slightly wrong, it just hangs and never gets past await call

It’s counterproductive to override queues and plugins in the test environment. That prevents Oban from verifying the config in the test environment, and they’re disabled by the testing mode anyhow. It’s also recommended that you use :manual to have more control.

config :data_collection_platform, Oban, testing: :manual

There isn’t anything to “hang” on with inline mode because the job executes immediately. Will you try setting an alternate notifier for testing:

config :data_collection_platform, Oban, testing: :manual, notifier: Oban.Notifiers.Isolated
2 Likes

I think that bit of config was the missing piece of the puzzle! thank you, it now all works!!!