How do I perform a job after all chunks of a chunk worker have completed?

Is there a way to know when all Chunk Worker chunks have been processed? I have an indeterminate number of chunk workers but I’d like to take an action when all of them are completed. I think this could be accomplished by checking when the leader is finished processing, but I’m not sure that is possible.

Would a Workflow worker, using add/4 or append/2 to add the dynamic number of chunk workers, be the way to handle this?

Essentially, I want the Batch Worker callback functionality but with a Chunk worker.

The short answer is no, there isn’t any way to know that. Unlike a batch, there’s nothing that groups a collection of jobs into a chunk before they’re processed. Chunks are designed to operate on an infinite stream of jobs.

The leader is the first job fetched from the queue, and it is then used to fetch the rest of the chunk. There's no guarantee about the order, or about which job becomes the leader, so you can't rely on the leader as an indication of anything.

Workflows are designed for individual jobs, not collections like you have with a chunk. Maybe it would work to fan out to a chunk, but I’m not sure how it would behave and that’s not the approach I’d take.

I think you can get very close to that with this approach:

  1. Add a grouping value to the chunk args, something like chunk_batch_id
  2. Add an after_process/3 hook to the chunk that checks if all jobs in the batch-of-chunks are completed

Here’s what that could look like:

defmodule MyApp.BatchChunk do
  use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000

  import Ecto.Query

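  alias MyApp.Repo
  alias Oban.Job

  # Only the leader of each chunk reaches this hook. When it completes,
  # count the jobs sharing its chunk_batch_id that aren't completed yet.
  @impl true
  def after_process(:complete, %Job{args: %{"chunk_batch_id" => cb_id}}, _result) do
    # Check asynchronously after a short delay, giving the leader time to
    # be acked as completed before counting.
    Task.start(fn ->
      Process.sleep(100)

      incomplete =
        Job
        # jsonb containment: args includes {"chunk_batch_id": cb_id}
        |> where([j], fragment("? @> ?", j.args, ^%{"chunk_batch_id" => cb_id}))
        |> where([j], j.state != "completed")
        |> select(count())
        |> Repo.one()

      if incomplete > 0 do
        IO.puts("NOT FINISHED YET")
      else
        IO.puts("CHUNK FINISHED")
      end
    end)
  end

  # Ignore every other outcome, and jobs without a chunk_batch_id.
  def after_process(_state, _job, _result), do: :ok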

  @impl true
  def process([_ | _] = jobs) do
    ...
  end
end

Only the leader of each chunk triggers the after_process callback, so for large batches the check runs once per chunk rather than once per job. The Task and sleep give the leader job time to be acked in the database; without them the count would always be above 0, because after_process/3 is called before the job is marked completed.
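The module above covers step 2, but not step 1: stamping every job with the same chunk_batch_id when it's enqueued. Here is a minimal sketch of that step, assuming string-keyed args (which is what the after_process/3 pattern match expects). `MyApp.ChunkBatchArgs`, `new_batch_id/0` and `with_batch_id/2` are hypothetical helpers, not part of Oban; any existing identifier you already group by would work in place of a generated id.

```elixir
defmodule MyApp.ChunkBatchArgs do
  @moduledoc """
  Hypothetical helpers: stamp every job's args with a shared
  "chunk_batch_id" so the after_process/3 hook can find the whole group.
  """

  # Random, URL-safe identifier (16 random bytes, base64 without padding).
  def new_batch_id do
    16 |> :crypto.strong_rand_bytes() |> Base.url_encode64(padding: false)
  end

  # Adds the batch id to each args map. Keys are strings because Oban
  # stores args as JSON and the hook matches on "chunk_batch_id".
  def with_batch_id(args_list, cb_id) do
    Enum.map(args_list, &Map.put(&1, "chunk_batch_id", cb_id))
  end
end

# Hypothetical usage, assuming Oban is running and MyApp.BatchChunk is
# the worker above:
#
#   cb_id = MyApp.ChunkBatchArgs.new_batch_id()
#
#   [%{"id" => 1}, %{"id" => 2}]
#   |> MyApp.ChunkBatchArgs.with_batch_id(cb_id)
#   |> Enum.map(&MyApp.BatchChunk.new/1)
#   |> Oban.insert_all()
```

Because the id lives in args, the jsonb containment query in the hook can find every job in the group regardless of which chunk it landed in.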

Hope that helps!


Thank you, this solves exactly what I want to do. I'm already chunking by an identifier, so I can use that rather than introduce a new chunk_batch_id, which makes it easy to adopt.