Oban Pro: Run final worker after all paginated workflows complete

I’m paginating through user posts and processing each one using Oban Workflow.

Is there a way to run CompleteWorker after all workflows from all pages have been completed.

defmodule SearchWorker do
  use Oban.Pro.Worker

  args_schema do
    field :term, :string
    field :cursor, :string
  end

  @impl true
  def process(%Job{args: args}) do
    {:ok, posts, new_cursor} = search_posts!(args.term, args.cursor)

    posts
    |> Enum.reduce(Workflow.new(), fn post, acc ->
      acc
      |> Workflow.add("#{post["id"]}_save_post", SavePostWorker.new(%{post_id: post["id"]}))
      |> Workflow.add("#{post["id"]}_process_post", ProcessPostWorker.new(%{post_id: post["id"]}), deps: ["#{post["id"]}_save_post"])
    end)
    |> Oban.insert_all()

    if new_cursor do
      %{args | cursor: new_cursor}
      |> new()
      |> Oban.insert!()
    else
      %{term: args.term}
      |> CompleteWorker.new()
      |> Oban.insert!()
    end

    :ok
  end

  defp search_posts!(term, cursor) do
    response = Req.get!("https://example.com/search/posts/#{term}", params: %{cursor: cursor})
    %Req.Response{status: 200} = response

    {:ok, response.body["posts"], response.body["cursor"]}
  end
end

One way is to add a job at the end whose dependencies are all the jobs in the last step. Depending on your workflow, that could be a lot of dependencies :upside_down_face:.

1 Like

Yes…
Also, if you’re using 1.5, you could also use a batch callback for the workflow:

1 Like

Using Batch from Oban 1.5.0 should do the job. I can generate a custom batch ID and pass it to the next page. Adding a next page worker to the batch can prevent it from finishing too early if search_posts! is slower than processing.

posts
|> Enum.reduce(Workflow.new(), fn post, acc ->
  acc
  |> Workflow.add("#{post["id"]}_save_post", SavePostWorker.new(%{post_id: post["id"]}))
  |> Workflow.add("#{post["id"]}_process_post", ProcessPostWorker.new(%{post_id: post["id"]}), deps: ["#{post["id"]}_save_post"])
end)
|> maybe_next_page(args, new_cursor, args.batch_id)
|> Batch.from_workflow(
  batch_id: args.batch_id, 
  callback_workflow: CompleteBatchWorker, 
  callback_opts: [args: %{term: args.term}]
)
|> Oban.insert_all()

def maybe_next_page(workflow, _args, _new_cursor = nil, _batch_id) do
  # Last page, no need to schedule another worker.
  workflow
end

def maybe_next_page(workflow, args, new_cursor, batch_id) do
  next_page_worker = new(%{args | cursor: new_cursor, batch_id: args.batch_id})
  Workflow.add(workflow, "#{new_cursor}-next-page", next_page_worker)
end

Note: I’m not entirely sure if the code above is correct, as I ended up collecting posts from all pages and process them later using Workflow + Batch.