Thank you, these are good suggestions!
As I said, I am new to Oban, so I tried a few different options.
My RequestProcessor now uses Relay.await_many, and I still struggle to test it, because in the application code the child job waits for a reply message from a NATS server. So I am trying, and failing, to mock the child job's completion in my test.
defmodule DataCollectionPlatform.Workers.RequestProcessor do
  use Oban.Pro.Worker, queue: :request_processing

  alias DataCollectionPlatform.Workers.ChunkProcessor

  @minute 60
  @timeout :timer.hours(2)

  @impl true
  def process(%Oban.Job{
        args: %{
          "start_ts" => start_ts,
          "end_ts" => end_ts,
          "correlation_id" => correlation_id,
          "location_id" => location_id
        }
      }) do
    range = get_range(start_ts, end_ts)

    # start_datetime and end_datetime are not defined in this excerpt
    result =
      range
      |> Enum.map(&ChunkProcessor.new(generate_chunk_args(correlation_id, location_id, start_datetime, &1, end_datetime)))
      |> Enum.map(&Oban.Pro.Relay.async/1)
      |> Oban.Pro.Relay.await_many(@timeout)

    {:ok, result}
  end

  # get_range/2 and generate_chunk_args/5 are omitted from this excerpt
end
test "given less than a minute long request, creates one chunk job" do
  with_mock ChunkProcessor, [
    process: fn _job -> {:ok, "success"} end,
    new: fn args ->
      Ecto.Changeset.change(%Oban.Job{
        args: args,
        worker: "DataCollectionPlatform.Workers.ChunkProcessor",
        state: "available",
        queue: "chunk_processing",
        inserted_at: DateTime.utc_now()
      })
    end
  ] do
    args = %{
      start_ts: "2024-03-20T10:00:00Z",
      end_ts: "2024-03-20T10:00:30Z",
      location_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9",
      correlation_id: "0efa70af-5fe1-43f6-8f0b-fddf0a8dc4b9"
    }

    {:ok, result} = perform_job(RequestProcessor, args)

    assert called(ChunkProcessor.process(:_))
    assert result == [{:ok, "success"}]
  end
end
But the test keeps hanging at Oban.Pro.Relay.await_many(@timeout).
If I test it manually, it works! The jobs get scheduled, and the parent waits for the children to complete before completing itself. But I can't figure out why it hangs in the test.