Oban: how to list/count executing jobs in the current node

How do I list or count the Oban jobs that are currently executing, but only on the node where my code is running?
I’ve tried running something like:

Repo.all(
  from(job in Job,
    # The worker column stores the module name as a string.
    where: job.worker == ^inspect(MyWorker),
    where: job.state in ["executing"]
  )
)

But that gives me all the executing jobs across all nodes, and I need to know which jobs are executing only on the current node.

Our backend currently runs in a Kubernetes cluster, and this is for some graceful shutdown logic, so the result needs to be node-specific. We also have Oban Pro.


Oban Web shows this. If you still need to query it yourself, the oban_jobs table has an attempted_by column. I'm not quite sure what the value in there is, though.

The value in there is something like {NODE_NAME, NODE_UUID}. Do you know how I can get both the node name and the node UUID, so I can use them to search the table? I’ve only found how to get the PID, not its UUID.

The first element is the node name, but the second is the queue producer’s UUID. You don’t need the UUID to query for jobs from the current node.
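To make the shape of that value concrete, here is a minimal sketch in plain Elixir, with no database involved. The NodeJobs module, the node names, and the placeholder UUID strings are all made up for illustration; only the [node_name, producer_uuid] layout comes from the explanation above.

```elixir
defmodule NodeJobs do
  # Hypothetical helper: `jobs` is any list of maps or structs with an
  # `attempted_by` field shaped like [node_name, producer_uuid].
  # Jobs that were never attempted (attempted_by is nil) are skipped.
  def on_node(jobs, node) do
    Enum.filter(jobs, fn job -> match?([^node | _], job.attempted_by) end)
  end
end

# Made-up sample data; the UUID strings are placeholders.
jobs = [
  %{id: 1, attempted_by: ["worker-a", "uuid-1"]},
  %{id: 2, attempted_by: ["worker-b", "uuid-2"]},
  %{id: 3, attempted_by: ["worker-a", "uuid-3"]},
  %{id: 4, attempted_by: nil}
]

jobs |> NodeJobs.on_node("worker-a") |> Enum.map(& &1.id)
# => [1, 3]
```

Only the first element is matched, which is why the producer UUID is never needed to find the jobs for a node.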

Some of Oban Web’s filter queries show how you can query by the current node: oban_web/lib/oban/web/queries/job_query.ex at main · oban-bg/oban_web · GitHub. It’s essentially a dynamic containment operator:

dynamic([j], ^condition and fragment("?[1]", j.attempted_by) in ^nodes)

Here’s how you’d translate that into a general query for the current node:

import Ecto.Query

# Use the running config to get the node name.
conf = Oban.config()

Oban.Job
|> where([j], j.state == "executing" and j.worker == ^inspect(MyWorker))
# attempted_by is [node_name, producer_uuid]; compare the first element only.
|> where([j], fragment("?[1]", j.attempted_by) == ^conf.node)
|> Repo.all()

Rather than building your own query, you could use Oban.Met.latest/3 to get the executing count:

%{"all" => count} = Oban.Met.latest(Oban, :exec_count, filters: [node: "my-node-name"])

Hey Soren!
Thanks a lot for the answer, I appreciate it.
The first option, with writing a query, worked very well!

# Use the running config to get the node name.
conf = Oban.config()

Oban.Job
|> where([j], j.state == "executing")
|> where([j], fragment("?[1]", j.attempted_by) == ^conf.node)
|> Repo.all()

For curiosity’s sake, using

Oban.Met.latest(Oban, :exec_count, filters: [node: "my-node-name"])

always gives me an empty map as a result.

However,

Oban.Met.latest(Oban, :full_count, filters: [node: "my-node-name", state: "executing"])

always gives the correct result! Why would that be?
Can a job have the “executing” state without appearing in the exec_count gauge?

Edit:
I think I understand it now: I misunderstood what the job state means. A job can be in the “executing” state without actually running. Does that mean it will execute in the future? Or is my database just inconsistent?


That’s most likely what is happening. Jobs reported by exec_count have live processes running. It’s possible to have job records marked as executing in the database when the processes aren’t running anymore. Those are called orphaned jobs, and they typically happen during shutdown when jobs run longer than the queue shutdown period.

There’s more about that situation in the Ready for Production guide.

It doesn’t mean that the job may run in the future. Most likely, it was running in the past and the queue shut down.
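As a rough illustration of how orphaned jobs are usually handled, the config sketch below gives running jobs more time to finish during shutdown and enables the Lifeline plugin to rescue jobs left in the executing state. The app name :my_app, MyApp.Repo, the queue list, and both durations are assumptions for the example, not values from this thread.

```elixir
# config/config.exs (hypothetical app and repo names)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  # Give running jobs up to 60 seconds to finish before the queues stop.
  shutdown_grace_period: :timer.seconds(60),
  plugins: [
    # Rescue jobs that have been stuck in "executing" for over 30 minutes.
    {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(30)}
  ]
```

Jobs that still outlive the grace period stay marked as executing in the database until the plugin rescues them, which would explain a full_count that is higher than exec_count in the meantime.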
