nietaki

Workflow terminating due to dependency job(s) incorrectly reported as deleted (possibly due to partitioning?)

We’re working with a relatively involved setup, so I’ll try to simplify and keep it brief. Details of our config are at the bottom of the post.

I’ve built a Workflow in which the first worker dynamically fans out to n children and adds a single “finisher” worker to handle any final tasks. Let’s call them A, B_1..B_n, and Z.

All workers use Oban.Pro.Workers.Workflow. The A worker is enqueued on its own, and inside its process/1 callback it appends the rest of the workers to its own workflow based on the data it fetches. It looks something like this (simplified code):

  @impl true
  def process(%Job{} = job) do
    # Map of step name => changeset for each B_x job
    b_jobs = generate_b_job_changesets()

    # Continue the workflow that A itself belongs to
    workflow = append_workflow(job)

    # Every B_x depends on A (@recording_extractor_job_name is A's step name)
    workflow =
      Enum.reduce(b_jobs, workflow, fn {name, changeset}, workflow ->
        add(workflow, name, changeset, deps: [@recording_extractor_job_name])
      end)

    b_job_names = Map.keys(b_jobs)

    # Z runs only after every B_x has finished
    workflow = add(workflow, :z, z_job_changeset(), deps: b_job_names)

    {:ok, Oban.insert_all(workflow)}
  end

The whole setup works as expected in E2E tests. In a pre-production environment, however, the A worker completes successfully while the B_x and Z jobs are cancelled with the error “upstream deps deleted”.

I was hoping it was my own mistake, such as messing up the workflow concatenation or the dependency names. But when I fetch the job rows from the DB, everything looks correct (some values redacted):

A job row:

id           | 350316
state        | completed
queue        | fact_extraction_initialization
worker       | Foo.Bar.A
attempt      | 1
max_attempts | 5
priority     | 0
args         | {"redacted": "redacted"}
meta         | {...}
attempted_by | {redacted}
errors       | {}
tags         | {redacted}
inserted_at  | 2024-08-01 18:36:20.930824
scheduled_at | 2024-08-01 18:36:35.930513
attempted_at | 2024-08-01 18:36:36.592045
cancelled_at |
completed_at | 2024-08-01 18:36:36.631846
discarded_at |

One of B job rows:

id           | 350317
state        | cancelled
queue        | fact_extraction
worker       | Foo.Bar.B
attempt      | 0
max_attempts | 5
priority     | 0
args         | {"redacted": "redacted"}
meta         | {...}
attempted_by |
errors       | {"{\"at\": \"2024-08-01T18:36:36.632222+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
tags         | {redacted}
inserted_at  | 2024-08-01 18:36:36.627618
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:36:36.632222
completed_at |
discarded_at |

If I’m reading this right, the workflow IDs and dependency names align, so it should be working.

Any thoughts on what’s happening here? I put a good amount of time into implementing this feature with a workflow, and I have very limited time to wrap it up right now.


Setup details:

$ mix hex.outdated | grep oban
oban                          2.17.12    2.18.0     Update not possible
oban_pro                      1.4.10     1.4.10     Up-to-date
oban_web                      2.10.4     2.10.5     Update possible

(Most of) our Oban config:

config :redacted, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: Redacted.Repo,
  notifier: Oban.Notifiers.PG,
  peer: Oban.Peers.Global,
  plugins: [
    {Oban.Pro.Plugins.DynamicPartitioner,
     schedule: if(Mix.env() == :prod, do: "* 1-7 * * *", else: "* * * * *"),
     timeout: 60_000,
     retention: [completed: 2, discarded: 30]},
    {Oban.Plugins.Cron,
     crontab: [redacted]},
    Oban.Pro.Plugins.DynamicLifeline,
    {Oban.Plugins.Reindexer, schedule: "40 4 * * *"}
  ],

My (uneducated) guess is that we're using Oban.Pro.Plugins.DynamicPartitioner, and that causes a timing problem. The B_x jobs were being evaluated while the A job was being moved out of the available/scheduled partition and into the completed partition. At that moment A didn’t show up in the dependency query, so it was interpreted as deleted.
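For what it’s worth, the timestamps in the two rows above fit that timing. B was inserted while A was still executing, and it was cancelled less than a millisecond after A’s completed_at. Here is a minimal check. It assumes the psql values are naive UTC timestamps, which the "at" field in B’s error (same instant, +00:00) suggests. The TimelineCheck module name is just for illustration:

```elixir
defmodule TimelineCheck do
  # Values copied from the A (350316) and B (350317) rows above; psql
  # prints them without an offset, and B's error "at" shows they are UTC.
  @a_completed_at ~N[2024-08-01 18:36:36.631846]
  @b_inserted_at ~N[2024-08-01 18:36:36.627618]
  @b_cancelled_at ~N[2024-08-01 18:36:36.632222]

  # Microseconds between A being marked completed and B being cancelled
  def cancel_gap_us, do: NaiveDateTime.diff(@b_cancelled_at, @a_completed_at, :microsecond)

  # Microseconds between B being inserted and A being marked completed
  def insert_lead_us, do: NaiveDateTime.diff(@a_completed_at, @b_inserted_at, :microsecond)
end

IO.puts("B cancelled #{TimelineCheck.cancel_gap_us()} µs after A completed")
# prints "B cancelled 376 µs after A completed"
IO.puts("B inserted #{TimelineCheck.insert_lead_us()} µs before A completed")
# prints "B inserted 4228 µs before A completed"
```

A gap of a few hundred microseconds is consistent with the dependency check running in the same window as A's state change. It doesn't prove the partition move is the cause, though.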

I know that, if that’s the case, I could theoretically use ignore_deleted: true, but that feels like asking for trouble. Failing jobs could also be misinterpreted as deleted while they’re being moved to the discarded partition, and the workflow would continue even though it shouldn’t. I was counting on stronger guarantees.
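To make that concern concrete, here is a toy model of a dependency check that only sees the rows it manages to read. DepCheckModel is a hypothetical module for illustration and is not Oban Pro’s actual implementation. A dep missing from the read is treated as deleted. With ignore_deleted, a dep that was really discarded but is invisible mid-move therefore looks exactly like a harmless deletion:

```elixir
defmodule DepCheckModel do
  # Toy model only, NOT Oban Pro's implementation. `found` maps the dep
  # names we managed to read to their state; any dep we could not read
  # is indistinguishable from a deleted one.
  def decide(deps, found, opts \\ []) do
    ignore_deleted? = Keyword.get(opts, :ignore_deleted, false)
    states = Enum.map(deps, &Map.get(found, &1, :deleted))

    cond do
      :deleted in states and not ignore_deleted? -> {:cancel, :deps_deleted}
      :discarded in states -> {:cancel, :deps_discarded}
      # With ignore_deleted, a missing dep counts as finished
      Enum.all?(states, &(&1 in [:completed, :deleted])) -> :run
      true -> :hold
    end
  end
end

# Whether A actually completed or was discarded, a missed read looks the same:
DepCheckModel.decide([:a], %{})
# => {:cancel, :deps_deleted}
DepCheckModel.decide([:a], %{}, ignore_deleted: true)
# => :run  (the workflow continues, which is the worry)

# Only when the read does see the discarded row is the job still cancelled:
DepCheckModel.decide([:a], %{a: :discarded}, ignore_deleted: true)
# => {:cancel, :deps_discarded}
```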

Plus, IIRC, we introduced the DynamicPartitioner because the impact of Oban job churn had become really noticeable, so I don’t think we can get away from it…


Also, it might or might not be relevant, but the A worker is in a different queue than the B_x and Z workers. I assumed that wasn’t an issue because I couldn’t find anything suggesting otherwise in either the docs or the code.

Most Liked

sorentwo

Oban Core Team

Thanks for bringing that up; it’s an excellent insight that saved hours of attempted reproduction.

That makes sense. The ignore_deleted flag would bypass the condition that cancelled your downstream jobs.

There are a few potential options now:

  1. Add a loud warning about the risks of using unique for only some jobs in the Workflow module docs (informative, but requires learning and won’t save anybody)
  2. Automatically flag deps of a unique job in the workflow with ignore_deleted to compensate (zero effort, but potentially surprising)
  3. Do both: automatically mark unique deps with ignore_deleted and document it loudly.

sorentwo

Oban Core Team

Thanks for the thorough report. We’ll try to recreate the situation and see if there’s an edge case with partitioned tables here.

The queries to make features work with partitioned tables are trickier, but that’s on us. There’s no reason you should have to disable it!

cmo

I had an issue with workflows being cancelled. I’m pretty sure it was because one of the steps was unique but didn’t have any args tying it to a specific instance of that workflow. My hypothesis is that the job wasn’t added because another workflow already had it scheduled or running, and that cancelled the whole thing (it was the first step).
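Here is a toy illustration of that hypothesis. It assumes a uniqueness key built from the worker plus a chosen subset of args. That is loosely analogous to Oban’s unique :fields/:keys options but is not its real implementation. UniqueKeyModel, MyApp.FirstStep, and the batch_id arg are all hypothetical:

```elixir
defmodule UniqueKeyModel do
  # Toy model only: a key is the worker plus the selected arg keys, and
  # "inserting" a job just adds its key to a MapSet of live jobs.
  def key(worker, args, arg_keys), do: {worker, Map.take(args, arg_keys)}

  # Returns {inserted?, live_set}. On a conflict nothing is inserted.
  def insert(live, key) do
    if MapSet.member?(live, key) do
      {false, live}
    else
      {true, MapSet.put(live, key)}
    end
  end
end

# Unique on the worker alone: both workflows produce the same key.
k1 = UniqueKeyModel.key(MyApp.FirstStep, %{"batch_id" => 1}, [])
k2 = UniqueKeyModel.key(MyApp.FirstStep, %{"batch_id" => 2}, [])
{true, live} = UniqueKeyModel.insert(MapSet.new(), k1)
{false, _live} = UniqueKeyModel.insert(live, k2)

# Including an instance-specific arg keeps the keys distinct.
s1 = UniqueKeyModel.key(MyApp.FirstStep, %{"batch_id" => 1}, ["batch_id"])
s2 = UniqueKeyModel.key(MyApp.FirstStep, %{"batch_id" => 2}, ["batch_id"])
{true, live} = UniqueKeyModel.insert(MapSet.new(), s1)
{true, _live} = UniqueKeyModel.insert(live, s2)
```

With an empty key list, the two workflows collide and the second workflow’s first step is never inserted. Its dependents are left without an upstream row. Adding an instance-specific arg to the key avoids the collision in this model.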
