nietaki
Workflow terminating due to dependency job(s) incorrectly reported as deleted (possibly due to partitioning?)
We’re working with a relatively involved setup, so I’ll try to simplify and keep it brief. Details of our config are at the bottom of the post.
I’ve built a Workflow where the first worker dynamically fans out to N children and adds a single “finisher” worker to handle any final tasks. Let’s call them A, B_0..B_n, and Z.
All workers use Oban.Pro.Workers.Workflow. The A worker gets enqueued on its own, and inside its process/1 callback it appends the rest of the workers to its workflow based on the data it fetches, something like this (simplified code):
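@impl true
def process(%Job{args: args} = job) do
  # Map of B job names to changesets, built from the data A fetches
  b_jobs = generate_b_job_changesets()

  # Continue the workflow that this A job already belongs to
  workflow = append_workflow(job)

  # Each B job depends on A (@recording_extractor_job_name is presumably A's name in the real code)
  workflow =
    Enum.reduce(b_jobs, workflow, fn {name, changeset}, workflow ->
      add(workflow, name, changeset, deps: [@recording_extractor_job_name])
    end)

  # Z runs only after every B job
  b_job_names = Map.keys(b_jobs)

  workflow =
    add(workflow, :z, z_job_changeset(),
      deps: b_job_names
    )

  {:ok, Oban.insert_all(workflow)}
end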
The whole setup works as expected in E2E tests, but in a pre-production environment the A worker completes successfully while the B_x and Z workers get cancelled with the error message "upstream deps deleted".
I was hoping it was my mistake - messing up the workflow concatenation or dependency names, but when I fetch the job rows from the DB it all looks correct (some values redacted):
A job row:
id | 350316
state | completed
queue | fact_extraction_initialization
worker | Foo.Bar.A
attempt | 1
max_attempts | 5
priority | 0
args | {"redacted": "redacted"}
meta | {...}
attempted_by | {redacted}
errors | {}
tags | {redacted}
inserted_at | 2024-08-01 18:36:20.930824
scheduled_at | 2024-08-01 18:36:35.930513
attempted_at | 2024-08-01 18:36:36.592045
cancelled_at |
completed_at | 2024-08-01 18:36:36.631846
discarded_at |
One of the B job rows:
id | 350317
state | cancelled
queue | fact_extraction
worker | Foo.Bar.B
attempt | 0
max_attempts | 5
priority | 0
args | {"redacted": "redacted"}
meta | {...}
attempted_by |
errors | {"{\"at\": \"2024-08-01T18:36:36.632222+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
tags | {redacted}
inserted_at | 2024-08-01 18:36:36.627618
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:36:36.632222
completed_at |
discarded_at |
If I’m reading this right the workflow IDs and dependency names align, so it should be working.
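For anyone wanting to make the same check less by eye, here is a minimal sketch in plain Elixir. It assumes each job's meta column decodes to a map with "workflow_id", "name" and "deps" keys; that layout is an assumption here, and the WorkflowCheck module and sample values are hypothetical.

```elixir
defmodule WorkflowCheck do
  @moduledoc """
  Sanity check for workflow rows. Assumes each job's `meta` is a map carrying
  "workflow_id", "name" and "deps" keys (an assumption about the layout).
  """

  # Returns a list of {job_name, missing_dep} tuples. An empty list means every
  # dependency points at a job that exists in the same workflow.
  def dangling_deps(metas) do
    names_by_workflow = Enum.group_by(metas, & &1["workflow_id"], & &1["name"])

    for meta <- metas,
        dep <- Map.get(meta, "deps", []),
        dep not in Map.get(names_by_workflow, meta["workflow_id"], []),
        do: {meta["name"], dep}
  end
end

# Hypothetical metas, as they might look after fetching the rows
metas = [
  %{"workflow_id" => "wf-1", "name" => "a", "deps" => []},
  %{"workflow_id" => "wf-1", "name" => "b_0", "deps" => ["a"]},
  %{"workflow_id" => "wf-1", "name" => "z", "deps" => ["b_0", "b_1"]}
]

# "b_1" was never inserted in this sample, so it is reported for "z"
IO.inspect(WorkflowCheck.dangling_deps(metas))
```

An empty result only shows that the names and workflow IDs line up; it says nothing about whether the dependency lookup itself can see the upstream rows.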
Any thoughts on what’s happening here? I put a good amount of time into implementing the feature with a workflow and I have very limited time to wrap it up right now.
Setup details:
$ mix hex.outdated | grep oban
oban 2.17.12 2.18.0 Update not possible
oban_pro 1.4.10 1.4.10 Up-to-date
oban_web 2.10.4 2.10.5 Update possible
(Most of) oban config:
config :redacted, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: Redacted.Repo,
  notifier: Oban.Notifiers.PG,
  peer: Oban.Peers.Global,
  plugins: [
    {Oban.Pro.Plugins.DynamicPartitioner,
     schedule: if(Mix.env() == :prod, do: "* 1-7 * * *", else: "* * * * *"),
     timeout: 60_000,
     retention: [completed: 2, discarded: 30]},
    {Oban.Plugins.Cron,
     crontab: [redacted]},
    Oban.Pro.Plugins.DynamicLifeline,
    {Oban.Plugins.Reindexer, schedule: "40 4 * * *"}
  ],
My (uneducated) guess is that because we’re using Oban.Pro.Plugins.DynamicPartitioner, the B_x jobs were getting started when the A job was getting moved from the available/scheduled partition and into the completed partition, which is why it didn’t show up in the query and got interpreted as deleted.
I know if that’s the case I could theoretically use ignore_deleted: true, but that feels like just asking for trouble - failing jobs could also be misinterpreted as deleted when they’re moved to a discarded partition and the workflow would continue even though it shouldn’t - I was counting on stronger guarantees.
Plus IIRC we did introduce the DynamicPartitioner because the impact of oban job churn started to get really noticeable, I don’t think we can really get away from it…
Also, it might or might not be relevant, but the A worker is in a different queue than the B_x and Z workers - I assumed that wasn’t an issue because I couldn’t find anything indicating that in either the docs or the code itself.
Most Liked
sorentwo
Thanks for bringing that up, it’s an excellent insight that saved hours of an attempted reproduction.
That makes sense. The ignore_deleted flag would bypass the condition that cancelled your downstream dependency.
There are a few potential options now:
- Add a loud warning about the risks of using unique for only some jobs in the Workflow module docs (informative, but requires learning and won’t save anybody)
- Automatically flag deps of a unique job in the workflow with ignore_deleted to compensate (zero effort, but potentially surprising)
- Do both. Automatically mark unique deps with ignore_deleted and loudly document it.
sorentwo
Thanks for the thorough report. We’ll try to recreate the situation and see if there’s an edge case with partitioned tables here.
The queries to make features work with partitioned tables are trickier, but that’s on us. There’s no reason you should have to disable it!
cmo
I had an issue with workflows being cancelled and I’m pretty sure it was because I had one of the steps as unique but without any args to assign it to the specific instance of that workflow. My hypothesis is that the job would not get added because another workflow had it scheduled/running, which would cancel the whole thing (it was the first step).
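To make that hypothesis concrete, here is a toy model in plain Elixir. It is not Oban's implementation: UniqueModel, the "FirstStep" worker name and the "run" arg are all hypothetical, and uniqueness is assumed to be keyed on worker plus args only.

```elixir
defmodule UniqueModel do
  # Toy stand-in for a jobs table with a uniqueness check on {worker, args}.
  # Returns {:ok, jobs} when the job is inserted and {:conflict, jobs} when a
  # matching job already exists and the insert is skipped.
  def insert(jobs, %{worker: worker, args: args} = job) do
    if Enum.any?(jobs, &(&1.worker == worker and &1.args == args)) do
      {:conflict, jobs}
    else
      {:ok, [job | jobs]}
    end
  end
end

# Builds the first step of a workflow (hypothetical shape)
step = fn workflow_id, args ->
  %{worker: "FirstStep", args: args, workflow_id: workflow_id}
end

# Without workflow-specific args, the second workflow's first step is skipped,
# so its downstream jobs would have no upstream job in their own workflow.
{:ok, jobs} = UniqueModel.insert([], step.("wf-1", %{}))
{:conflict, _jobs} = UniqueModel.insert(jobs, step.("wf-2", %{}))

# With an identifying arg, each workflow gets its own first step.
{:ok, jobs} = UniqueModel.insert([], step.("wf-1", %{"run" => "wf-1"}))
{:ok, _jobs} = UniqueModel.insert(jobs, step.("wf-2", %{"run" => "wf-2"}))
```

In this model, the missing first step is what a dependency check would see as a deleted upstream job, which matches the cancellation described above.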








