We’re working with a relatively involved setup, so I’ll try to simplify and keep it brief - details of our config are at the bottom of the post
I’ve built up a Workflow, where the first worker dynamically fans out to N children and adds a single “finisher” worker to handle any final tasks - let’s call them A, B_0…B_n, and Z
All workers use Oban.Pro.Workers.Workflow. The A worker gets enqueued on its own, and inside its process/1 callback it appends the rest of the workers to the workflow based on the data it fetches - something like this (simplified code):
@impl true
def process(%Job{args: args} = job) do
  b_jobs = generate_b_job_changesets()

  # Append to the workflow this job already belongs to
  workflow = append_workflow(job)

  # Each B job depends on the A job
  workflow =
    Enum.reduce(b_jobs, workflow, fn {name, changeset}, workflow ->
      add(workflow, name, changeset, deps: [@recording_extractor_job_name])
    end)

  # The finisher Z depends on all of the B jobs
  b_job_names = Map.keys(b_jobs)

  workflow = add(workflow, :z, z_job_changeset(), deps: b_job_names)

  {:ok, Oban.insert_all(workflow)}
end
The whole setup works as expected in E2E tests. In a pre-production environment, however, the A worker completes successfully while the B_x and Z workers fail with the error upstream deps deleted
I was hoping it was my mistake - messing up the workflow concatenation or dependency names - but when I fetch the job rows from the DB everything looks correct (some values redacted):
A job row:
id | 350316
state | completed
queue | fact_extraction_initialization
worker | Foo.Bar.A
attempt | 1
max_attempts | 5
priority | 0
args | {"redacted": "redacted"}
meta | {...}
attempted_by | {redacted}
errors | {}
tags | {redacted}
inserted_at | 2024-08-01 18:36:20.930824
scheduled_at | 2024-08-01 18:36:35.930513
attempted_at | 2024-08-01 18:36:36.592045
cancelled_at |
completed_at | 2024-08-01 18:36:36.631846
discarded_at |
One of B job rows:
id | 350317
state | cancelled
queue | fact_extraction
worker | Foo.Bar.B
attempt | 0
max_attempts | 5
priority | 0
args | {"redacted": "redacted"}
meta | {...}
attempted_by |
errors | {"{\"at\": \"2024-08-01T18:36:36.632222+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
tags | {redacted}
inserted_at | 2024-08-01 18:36:36.627618
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:36:36.632222
completed_at |
discarded_at |
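For reference, this is roughly how I'm pulling the rows for comparison - an Ecto sketch rather than the exact query I ran, assuming the workflow metadata lives in meta under workflow_id, name, and deps keys, with a placeholder workflow_id value:

import Ecto.Query

# workflow_id copied from the A job's meta (placeholder value here)
workflow_id = "workflow-id-from-a-jobs-meta"

query =
  from j in Oban.Job,
    where: fragment("? ->> 'workflow_id' = ?", j.meta, ^workflow_id),
    order_by: j.id,
    select: {j.id, j.state, j.queue, j.meta}

Redacted.Repo.all(query)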
If I’m reading this right, the workflow IDs and dependency names align, so it should be working.
Any thoughts on what’s happening here? I put a good amount of time into implementing the feature with a workflow, and I have very limited time to wrap it up right now.
Setup details:
$ mix hex.outdated | grep oban
oban 2.17.12 2.18.0 Update not possible
oban_pro 1.4.10 1.4.10 Up-to-date
oban_web 2.10.4 2.10.5 Update possible
(Most of) oban config:
config :redacted, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: Redacted.Repo,
  notifier: Oban.Notifiers.PG,
  peer: Oban.Peers.Global,
  plugins: [
    {Oban.Pro.Plugins.DynamicPartitioner,
     schedule: if(Mix.env() == :prod, do: "* 1-7 * * *", else: "* * * * *"),
     timeout: 60_000,
     retention: [completed: 2, discarded: 30]},
    {Oban.Plugins.Cron, crontab: [redacted]},
    Oban.Pro.Plugins.DynamicLifeline,
    {Oban.Plugins.Reindexer, schedule: "40 4 * * *"}
  ],
My (uneducated) guess is that because we’re using Oban.Pro.Plugins.DynamicPartitioner, the B_x jobs were being picked up while the A job was being moved from the available/scheduled partition into the completed partition, which is why it didn’t show up in the query and got interpreted as deleted.
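And to be clear, the A row wasn’t actually deleted - it’s still fetchable by id well after the fact (that’s the row dumped above), e.g.:

# Sanity check that the completed A job row still exists
Redacted.Repo.get(Oban.Job, 350316)
#=> %Oban.Job{state: "completed", ...}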
I know that if that’s the case I could theoretically use ignore_deleted: true, but that feels like asking for trouble - failing jobs could also be misinterpreted as deleted when they’re moved to a discarded partition, and the workflow would continue even though it shouldn’t. I was counting on stronger guarantees.
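If it came to that, my (untested) understanding is that the option would be passed when building or appending the workflow - something like the sketch below, though I haven’t verified that append_workflow/2 accepts the same ignore_* options as new_workflow/1:

# Hypothetical sketch - assuming the ignore_* workflow options can be passed
# when appending, the same way they can be passed to new_workflow/1
workflow = append_workflow(job, ignore_deleted: true)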
Plus, IIRC, we introduced the DynamicPartitioner because the impact of Oban job churn was getting really noticeable, so I don’t think we can really get away from it…
Also, it may or may not be relevant, but the A worker is in a different queue than the B_x and Z workers - I assumed that wasn’t an issue because I couldn’t find anything in either the docs or the code itself indicating otherwise.
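For completeness, the relevant queue config looks roughly like this (same config block as above; the concurrency numbers here are made up):

config :redacted, Oban,
  queues: [
    fact_extraction_initialization: 1,
    fact_extraction: 10
  ]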