One way is to add a job at the end whose dependencies are all the jobs in the last step. Depending on your workflow, that could be a lot of dependencies.
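A rough sketch of that idea, assuming the last step is the `"<id>_process_post"` jobs used later in this thread. The module name, the `"finalize"` job name, and the `final_job` argument are made up for illustration; only `Workflow.add/4` with `deps:` comes from the code in this thread.

```elixir
defmodule FinalStepSketch do
  # Hypothetical module; assumes Oban Pro is available in the project.
  alias Oban.Pro.Workflow

  # Names of the last-step jobs, following the "<id>_process_post" naming.
  def last_step_names(posts) do
    Enum.map(posts, fn post -> "#{post["id"]}_process_post" end)
  end

  # Adds one closing job that depends on every last-step job.
  # `final_job` is any job changeset, e.g. built by a hypothetical FinalizeWorker.new/1.
  def add_final_job(workflow, posts, final_job) do
    Workflow.add(workflow, "finalize", final_job, deps: last_step_names(posts))
  end
end
```

With many posts the `deps` list grows to one entry per post, which is the drawback mentioned above.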
Using Batch from Oban Pro 1.5.0 should do the job. I can generate a custom batch ID and pass it along to each next-page job. Adding the next-page worker to the batch keeps the batch from finishing too early if search_posts! is slower than processing the posts.
posts
|> Enum.reduce(Workflow.new(), fn post, acc ->
  acc
  |> Workflow.add("#{post["id"]}_save_post", SavePostWorker.new(%{post_id: post["id"]}))
  |> Workflow.add(
    "#{post["id"]}_process_post",
    ProcessPostWorker.new(%{post_id: post["id"]}),
    deps: ["#{post["id"]}_save_post"]
  )
end)
|> maybe_next_page(args, new_cursor, args.batch_id)
|> Batch.from_workflow(
  batch_id: args.batch_id,
  callback_worker: CompleteBatchWorker,
  callback_opts: [args: %{term: args.term}]
)
|> Oban.insert_all()

def maybe_next_page(workflow, _args, nil, _batch_id) do
  # Last page, no need to schedule another worker.
  workflow
end

def maybe_next_page(workflow, args, new_cursor, batch_id) do
  # Same worker, next cursor, same batch ID so the next page joins this batch.
  next_page_worker = new(%{args | cursor: new_cursor, batch_id: batch_id})
  Workflow.add(workflow, "#{new_cursor}-next-page", next_page_worker)
end
Note: I’m not entirely sure the code above is correct, as I ended up collecting posts from all pages and processing them later using Workflow + Batch.
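For the custom batch ID, a minimal sketch of how the first page could be seeded. This assumes any unique string is acceptable as a batch ID; `BatchSeedSketch` and its function names are hypothetical, not part of Oban.

```elixir
defmodule BatchSeedSketch do
  # Random, URL-safe identifier (assumption: a unique string is enough for a batch ID).
  def generate_batch_id do
    Base.url_encode64(:crypto.strong_rand_bytes(16), padding: false)
  end

  # Args for the first page: no cursor yet, plus a fresh batch ID
  # that every later page passes along unchanged.
  def first_page_args(term) do
    %{term: term, cursor: nil, batch_id: generate_batch_id()}
  end
end
```

The resulting map would be handed to the paging worker's `new/1` and inserted as usual. Every following page then reuses `args.batch_id`, as in `maybe_next_page/4`.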