I want some thoughts on running a job in a workflow whose purpose is to clean up resources. Basically I have a workflow that:
- Creates a certain resource on AWS.
- Runs 3 operations in parallel.
- Cleans up the AWS resource.
My question is if there’s a built-in way in Oban Pro Workflows to run a job even if one (or multiple) of its dependencies fail.
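For context, the shape is roughly this (worker module names are placeholders, using the Oban.Pro.Workflow API):

```elixir
alias Oban.Pro.Workflow

workflow =
  Workflow.new()
  |> Workflow.add(:create_resources, MyApp.CreateResources.new(%{}))
  |> Workflow.add(:op_1, MyApp.OpOne.new(%{}), deps: [:create_resources])
  |> Workflow.add(:op_2, MyApp.OpTwo.new(%{}), deps: [:create_resources])
  |> Workflow.add(:op_3, MyApp.OpThree.new(%{}), deps: [:create_resources])
  # Naive approach: cleanup depends on all three ops, but then it won't
  # run when any dependency is discarded
  |> Workflow.add(:cleanup, MyApp.Cleanup.new(%{}), deps: [:op_1, :op_2, :op_3])

Oban.insert_all(workflow)
```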
Yes! You can wrap the Workflow in a Batch with from_workflow.
Here’s an example showing how to set a batch callback worker that will be called if dependencies fail (e.g., any are discarded):
Here’s the batch:
defmodule MyApp.CleanupWorker do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Batch

  @impl Oban.Pro.Worker
  def process(_job), do: :ok

  @impl Oban.Pro.Batch
  def batch_discarded(job) do
    # Handle your cleanup here
    IO.inspect(job.args, label: "Discarded")
  end
end
Here’s how you’d add it to your workflow:
workflow
|> Batch.from_workflow(callback_worker: MyApp.CleanupWorker)
|> Oban.insert_all()
I think that’s exactly what I was looking for. One quick follow-up question. My CleanupWorker should run whether all jobs succeed or fail, and I’m also grabbing some data from the initial worker that spins up the resources. Is it possible to do something like:
def process(job) do
  resources = Workflow.get_recorded(job, :create_resources)
  # Clean up resources
end

def batch_discarded(job) do
  process(job)
end
Great follow-up. In that case, you want to use handle_exhausted.
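A sketch of what that might look like, assuming the exhausted callback follows the same batch_* naming as batch_discarded/1 above (check the Oban.Pro.Batch docs for the exact callback name, and verify that get_recorded is reachable from the callback job):

```elixir
@impl Oban.Pro.Batch
def batch_exhausted(job) do
  # Runs once every job in the batch is either completed or discarded,
  # i.e. whether the operations succeeded or failed
  resources = Workflow.get_recorded(job, :create_resources)

  # Clean up resources here
  :ok
end
```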
You could do that, but the process/1 function won’t ever get called. So, put your cleanup code in the batch_discarded/1 function itself, no misdirection.
Coming back to this. Is there a way to use the workflow heartbeat to do something custom in my system? I basically need to send a heartbeat to an upstream system while the Oban workflow is running.
Is the goal to send messages every 30 seconds or so while jobs are running, or do you want to send a message when the workflow starts / stops? You can accomplish either with worker hooks.
My goal is to do the first thing.
You have chosen door number 1. In that case, use the before_process hook to spawn a process when the job starts. The process is linked to the job, so it will stop when the job process exits. Kinda like this below:
@impl Oban.Pro.Worker
def before_process(job) do
  job_pid = self()

  Task.start(fn ->
    # Link to the job process so the task stops when the job exits
    Process.link(job_pid)

    # Send a message to the right process, maybe use pubsub
    ping = fn _ -> send(some_pid, :ping) end

    # Initial ping, since Stream.interval waits one interval before the first tick
    ping.(nil)

    # Call the ping function every 30s until the job exits
    30_000
    |> Stream.interval()
    |> Stream.each(ping)
    |> Stream.run()
  end)

  # Hooks must return {:ok, job} for processing to continue
  {:ok, job}
end
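To see the ping loop in isolation, here’s a standalone sketch with no Oban involved. The interval is shortened to 20ms so it finishes quickly, and the listener is the current process standing in for some_pid above:

```elixir
listener = self()

Task.start(fn ->
  ping = fn _ -> send(listener, :ping) end

  # Initial ping, since Stream.interval waits one interval before the first tick
  ping.(nil)

  20
  |> Stream.interval()
  |> Stream.each(ping)
  |> Stream.run()
end)

# Simulate the job running for ~100ms, then drain and count the pings received
Process.sleep(100)

count =
  Stream.repeatedly(fn ->
    receive do
      :ping -> :ping
    after
      0 -> nil
    end
  end)
  |> Enum.take_while(&(&1 == :ping))
  |> Enum.count()

IO.puts("received #{count} pings")
```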