Is there a way to know when all Chunk Worker chunks have been processed? I have an indeterminate number of chunk workers but I’d like to take an action when all of them are completed. I think this could be accomplished by checking when the leader is finished processing, but I’m not sure that is possible.
Would using a Workflow worker with `add/4` or `append/2` to add the dynamic number of chunk workers be the way to handle this? Essentially, I want the Batch worker callback functionality, but with a Chunk worker.
The short answer is no, there isn’t any way to know that. Unlike a batch, there’s nothing that groups a collection of jobs into a chunk before they’re processed. Chunks are designed to operate on an infinite stream of jobs.
The leader is the first job fetched from the queue, which is then used to fetch the rest of the chunk. There's no guarantee about the order or about which job is the leader, so you can't rely on it as an indication of anything.
Workflows are designed for individual jobs, not collections like you have with a chunk. Maybe it would work to fan out to a chunk, but I’m not sure how it would behave and that’s not the approach I’d take.
I think you can get very close to that with this approach:
- Add a grouping value to the chunk args, something like `chunk_batch_id`
- Add an `after_process/3` hook to the chunk that checks whether all jobs in the batch-of-chunks are completed
Here’s what that could look like:
```elixir
defmodule MyApp.BatchChunk do
  use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000

  import Ecto.Query

  alias MyApp.Repo
  alias Oban.Job

  @impl true
  def after_process(:complete, %Job{args: %{"chunk_batch_id" => cb_id}}, _result) do
    # Run the check in a task after a short sleep so the leader job has time
    # to be acked as completed before we count the remaining jobs.
    Task.start(fn ->
      Process.sleep(100)

      incomplete =
        Job
        |> where([j], fragment("? @> ?", j.args, ^%{"chunk_batch_id" => cb_id}))
        |> where([j], j.state != "completed")
        |> select(count())
        |> Repo.one()

      if incomplete > 0 do
        IO.puts("NOT FINISHED YET")
      else
        IO.puts("CHUNK FINISHED")
      end
    end)
  end

  def after_process(_state, _job, _result), do: :ok

  @impl true
  def process([_ | _] = jobs) do
    ...
  end
end
```
Only the leader of each chunk will trigger the `after_process/3` callback, so for large batches this will execute once per chunk rather than once per job. The `Task` and sleep are used to ensure the leader job is acked to the database; otherwise the count would always be above zero, because `after_process/3` is called before the job is marked `completed`.
Hope that helps!
Thank you, this solves what I want to do exactly. I'm already chunking by an identifier, so I can use that rather than introduce a new `chunk_batch_id`, which makes it pretty easy to adopt.