Workflow terminating due to dependency job(s) incorrectly reported as deleted (possibly due to partitioning?)

We’re working with a relatively involved setup, so I’ll try to simplify and keep it brief - details of our config are at the bottom of the post

I’ve built up a Workflow, where the first worker dynamically fans out to N children and adds a single “finisher” worker to handle any final tasks - let’s call them A, B_0…B_n, and Z

All workers are using Oban.Pro.Workers.Workflow, the A worker gets enqueued on its own and inside the process/1 callback appends the rest of the workers to its workflow based on the data it fetches - something like this (simplified code):

  @impl true
  def process(%Job{args: args} = job) do
    b_jobs = generate_b_job_changesets()

    workflow = append_workflow(job)

    workflow =
      Enum.reduce(b_jobs, workflow, fn {name, changeset}, workflow ->
        add(workflow, name, changeset, deps: [@recording_extractor_job_name])
      end)

    b_job_names = Map.keys(chunk_jobs)

    workflow =
      add(workflow, :z, z_job_changeset(),
        deps: b_job_names
      )

    {:ok, Oban.insert_all(workflow)}
  end

The whole setup works as expected in E2E tests, but when run in a pre-production environment, the A worker completes successfully, but the B_x and Z workers fail with the error message of upstream deps deleted

I was hoping it was my mistake - messing up the workflow concatenation or dependency names, but when I fetch the job rows from the DB it all looks correct (some values redacted):

A job row:

id           | 350316
state        | completed
queue        | fact_extraction_initialization
worker       | Foo.Bar.A
attempt      | 1
max_attempts | 5
priority     | 0
args         | {"redacted": "redacted"}
meta         | {...}
attempted_by | {redacted}
errors       | {}
tags         | {redacted}
inserted_at  | 2024-08-01 18:36:20.930824
scheduled_at | 2024-08-01 18:36:35.930513
attempted_at | 2024-08-01 18:36:36.592045
cancelled_at |
completed_at | 2024-08-01 18:36:36.631846
discarded_at |

One of B job rows:

id           | 350317
state        | cancelled
queue        | fact_extraction
worker       | Foo.Bar.B
attempt      | 0
max_attempts | 5
priority     | 0
args         | {"redacted": "redacted"}
meta         | {...}
attempted_by |
errors       | {"{\"at\": \"2024-08-01T18:36:36.632222+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
tags         | {redacted}
inserted_at  | 2024-08-01 18:36:36.627618
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:36:36.632222
completed_at |
discarded_at |

If I’m reading this right the workflow IDs and dependency names align, so it should be working.

Any thoughts what’s happening here? I put a good amount of time implementing the feature with a workflow and I have very limited time to wrap it up right now.


Setup details:

$ mix hex.outdated | grep oban
oban                          2.17.12    2.18.0     Update not possible
oban_pro                      1.4.10     1.4.10     Up-to-date
oban_web                      2.10.4     2.10.5     Update possible

(Most of) oban config:

config :redacted, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: Redacted.Repo,
  notifier: Oban.Notifiers.PG,
  peer: Oban.Peers.Global,
  plugins: [
    {Oban.Pro.Plugins.DynamicPartitioner,
     schedule: if(Mix.env() == :prod, do: "* 1-7 * * *", else: "* * * * *"),
     timeout: 60_000,
     retention: [completed: 2, discarded: 30]},
    {Oban.Plugins.Cron,
     crontab: [redacted]},
    Oban.Pro.Plugins.DynamicLifeline,
    {Oban.Plugins.Reindexer, schedule: "40 4 * * *"}
  ],

My (uneducated) guess is that because we’re using Oban.Pro.Plugins.DynamicPartitioner, the B_x jobs were getting started when the A job was getting moved from the available/scheduled partition and into the completed partition, which is why it didn’t show up in the query and got interpreted as deleted.

I know if that’s the case I could theoretically use ignore_deleted: true, but that feels like just asking for trouble - failing jobs could also be misinterpreted as deleted when they’re moved to a discarded partition and the workflow would continue even though it shouldn’t - I was counting on stronger guarantees.

Plus IIRC we did introduce the DynamicPartitioner because the impact of oban job churn started to get really noticeable, I don’t think we can really get away from it…


Also, it might or might not be relevant, but the A worker is in a different queue than the B_x and Z workers - I assumed that wasn’t an issue because I couldn’t find anything indicating that in either the docs or the code itself.

2 Likes

Thanks for the thorough report. We’ll try to recreate the situation and see if there’s an edge case with partitioned tables here.

The queries to make features work with partitioned tables are trickier, but that’s on us. There’s no reason you should have to disable it!

1 Like

I had an issue with workflows being cancelled and I’m pretty sure it was because I had one of the steps as unique but without any args to assign it to the specific instance of that workflow. My hypothesis is that the job would not get added because another workflow had it scheduled/running, which would cancel the whole thing (it was the first step).

1 Like

Thank you for bringing this up, @cmo !

In my case the the A step is actually unique with mostly default configuration:

  use Oban.Pro.Workers.Workflow,
    queue: :fact_extraction_initialization,
    max_attempts: 5,
    unique: [
      period: 15,
      states: [:available, :scheduled, :retryable, :executing]
    ]

I’ll have a look if anything about this part looks suspicious, but this is the part of the workflow that completes successfully, is tagged with the workflow and I actually have some tests that verify if the behaviour based on the uniqueness is as expected…

Here’s the workaround I came up with for now:

  @spec verify_workflow_dependencies(Job.t()) ::
          :ok | {:error, {:dependencies, [job_state() | :deleted | String.t()]}}
  def verify_workflow_dependencies(%Job{meta: %{"workflow" => true, "deps" => dep_names}} = job) do
    deps_by_name =
      job
      |> Workflow.all_jobs(only_deps: true)
      |> Map.new(fn %Job{meta: %{"name" => name}} = job -> {name, job} end)

    dep_states =
      dep_names
      |> Stream.map(&Map.get(deps_by_name, &1))
      |> Enum.map(&dep_state/1)
      |> Enum.uniq()
      |> Enum.sort()

    case dep_states do
      [] ->
        :ok

      [:completed] ->
        :ok

      other when is_list(other) ->
        unexpected_states = other -- [:completed]

        Logger.warning(
          "unexpected workflow dependency job states #{inspect(unexpected_states)}, in worker #{job.worker}, job id: #{job.id}"
        )

        {:error, {:dependencies, unexpected_states}}
    end
  end

  def verify_workflow_dependencies(%Job{meta: meta}) when not is_map_key(meta, "workflow") do
    :ok
  end

  @spec dep_state(Job.t() | nil) :: job_state() | :deleted | String.t()
  defp dep_state(nil), do: :deleted

  defp dep_state(%Job{state: state}) do
    case state do
      recognised when recognised in @state_strings ->
        String.to_existing_atom(recognised)

      unrecognised ->
        unrecognised
    end
  end

To use it, I configure the workflow with ignore_deleted: true and have every worker in the workflow call and check verify_workflow_dependencies in a with.

Interestingly, ever since I added the ignore_deleted: true, I couldn’t repro the issue anymore. But if it does happen and we’re using the verify_workflow_dependencies, we should be aware of it.


I hope this is just a temporary solution, especially since it does pull extra info from the DB in each of the workers. But I think it should hold for now and possibly provide more data to what’s happening here to help you guys debug

1 Like

Thanks for bringing that up, it’s an excellent insight that saved hours of an attempted reproduction.

That makes sense. The ignore_deleted flag would bypass the condition that cancelled your downstream dependency.

There are a few potential options now:

  1. Add a loud warning about the risks of using unique for only some jobs in the Workflow module docs (informative, but requires learning and won’t save anybody)
  2. Automatically flag deps of a unique job in the workflow with ignore_deleted to compensate (zero effort, but potentially surprising)
  3. Do Both. Automatically mark unique deps with ignore_deleted and loudly document it.
2 Likes

That’s interesting and very useful, thank you for listing those out :purple_heart:

I think I’m still missing some understanding of how Oban works internally to have an opinion or understand what the underying issue is

Do you think you could try to explain why making all jobs in the workflow unique could be a solution and if/why the concern below is unfounded?

I know I can implement either of your suggestions in my particular case and just have it work, but it could be very helpful for me (and possibly others) to have a better intuition what happens behind the scenes or what the tradeoffs could be.

If that translates to some future documentation anyways, it might not be totally wasted effort - and I’d be happy to be the test user helping you make sure the point gets across :smiley:

If you mark all the jobs in a workflow unique, and inserting all of them is skipped tue to a unique conflict, then none of the workflow will be inserted. That would avoid the partial workflow situation you’re seeing, but at the expense of having no workflow at all.

It’s not a very good solution because it’s surprising and it requires all of the jobs to have unique conflicts at the same time. That’s unlikely with different unique configs, different job states, retries, cancellations, and other parts of a typical busy system.

Your initial statement about “misinterpreted as deleted” wasn’t quite right.

Jobs that are discarded aren’t the same as deleted, which is why there is also ignore_discarded. In this case, deleted really means “missing”, and it only applies when the dep can’t be found in the database. As far as Oban can tell, there’s no difference between a job that wasn’t inserted due to unique conflict and a job being deleted.

My solution of setting ignore_deleted on unique deps would work around that and still allow the downstream job to run. However, it’s not ideal.

I believe the optimal solutions in your case are:

  1. Don’t use unique for workflow jobs.
  2. If you must use unique, then insert a single unique job that itself enqueues the rest of the workflow.
defmodule MyFlow do
  use Oban.Pro.Worker, unique: [period: 15, states: [:available, :scheduled, :retryable, :executing]]

  args_schema do
    field :init, :boolean, default: false
  end

  @impl true
  def process(%{args: %{init: true}}) do
    # build the workflow here,
    # use unique: false to disable uniqueness for this job
  end

  def process(_job) do
    # normal workflow logic here
  end
end
1 Like

Thanks for the reply and taking all the time you already did.

I’m going to push back a bit - I promise I’m not trying to be argumentative, just want to make sure we’re on the same page. And I do hope that if we’re not, the reason is that I’m missing something key here :smiley:


I 100% see your point about why all jobs being unique not being the best approach.

I also understand and like the approach with the init job being the only thing with any uniqueness constraints and afterwards re-inserting itself as part of the workflow without any uniqueness constraints - that simplifies things in my head too


Here’s the main thing I’m struggling with:

I still don’t think there was a uniqueness conflict in my original case.

You can see in the Postgres output the “A” job (the initializing one, the one with the unique constraints on it) is marked as state: completed - here’s an unedited query for all the jobs sharing the same workflow_id, with results:

=> select id, state, meta, errors, inserted_at, scheduled_at, attempted_at, cancelled_at, completed_at, discarded_at from oban_jobs where meta->>'workflow_id' = '01910f38-f4b5-7624-aa32-e65a5fda64e2' order by inserted_at asc;
-[ RECORD 1 ]+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id           | 350306
state        | completed
meta         | {"deps": [], "name": "recording_fact_extractor", "on_hold": false, "uniq_key": 9732866, "workflow": true, "structured": true, "workflow_id": "01910f38-f4b5-7624-aa32-e65a5fda64e2", "orig_scheduled_at": 1722537291597296}
errors       | {}
inserted_at  | 2024-08-01 18:34:36.597559
scheduled_at | 2024-08-01 18:34:51.597296
attempted_at | 2024-08-01 18:34:51.854252
cancelled_at |
completed_at | 2024-08-01 18:34:51.908564
discarded_at |
-[ RECORD 2 ]+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id           | 350307
state        | cancelled
meta         | {"deps": ["recording_fact_extractor"], "name": "ccfe_ff1f97b5-de4f-45f8-882b-b259b1c6048a_chapter_1_chunk_0", "on_hold": true, "workflow": true, "structured": true, "workflow_id": "01910f38-f4b5-7624-aa32-e65a5fda64e2"}
errors       | {"{\"at\": \"2024-08-01T18:34:51.908995+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
inserted_at  | 2024-08-01 18:34:51.902028
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:34:51.908995
completed_at |
discarded_at |
-[ RECORD 3 ]+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id           | 350310
state        | cancelled
meta         | {"deps": ["ccfe_ff1f97b5-de4f-45f8-882b-b259b1c6048a_chapter_1_chunk_0", "ccfe_ff1f97b5-de4f-45f8-882b-b259b1c6048a_chapter_2_chunk_1", "ccfe_ff1f97b5-de4f-45f8-882b-b259b1c6048a_chapter_3_chunk_2"], "name": "fact_extraction_finisher", "on_hold": true, "workflow": true, "structured": true, "workflow_id": "01910f38-f4b5-7624-aa32-e65a5fda64e2"}
errors       | {"{\"at\": \"2024-08-01T18:34:51.908995+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
inserted_at  | 2024-08-01 18:34:51.902028
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:34:51.908995
completed_at |
discarded_at |
-[ RECORD 4 ]+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id           | 350308
state        | cancelled
meta         | {"deps": ["recording_fact_extractor"], "name": "ccfe_ff1f97b5-de4f-45f8-882b-b259b1c6048a_chapter_2_chunk_1", "on_hold": true, "workflow": true, "structured": true, "workflow_id": "01910f38-f4b5-7624-aa32-e65a5fda64e2"}
errors       | {"{\"at\": \"2024-08-01T18:34:51.908995+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
inserted_at  | 2024-08-01 18:34:51.902028
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:34:51.908995
completed_at |
discarded_at |
-[ RECORD 5 ]+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id           | 350309
state        | cancelled
meta         | {"deps": ["recording_fact_extractor"], "name": "ccfe_ff1f97b5-de4f-45f8-882b-b259b1c6048a_chapter_3_chunk_2", "on_hold": true, "workflow": true, "structured": true, "workflow_id": "01910f38-f4b5-7624-aa32-e65a5fda64e2"}
errors       | {"{\"at\": \"2024-08-01T18:34:51.908995+00:00\", \"error\": \"upstream deps deleted\", \"attempt\": 0}"}
inserted_at  | 2024-08-01 18:34:51.902028
scheduled_at | 3000-01-01 00:00:00
attempted_at |
cancelled_at | 2024-08-01 18:34:51.908995
completed_at |
discarded_at |

It would make sense if whole workflow got cancelled due to a uniqueness conflict in the first job, but I don’t think that’s the case here. The first job (350306 - recording_fact_extractor) got scheduled for “in 15 seconds” and completed successfuly within 100ms.

It seems like it still didn’t get found in the DB, which caused the downstream jobs to get cancelled

This is what causes my reservations about using ignore_deleted, regardless of any uniqueness configurations. If the initial job didn’t get picked up by the query and got interpreted as deleted, what’s to stop other cases to have a similar result?

If the dep job gets cancelled or discarded due to errors, if the query that “triggers” the dependent jobs doesn’t pick it up (due to whatever reason caused the above), it could let the workflow move forward in cases where it shouldn’t

Are the unique jobs processed in some special way that even if they complete successfully they might not get picked up by the query, whereas if they weren’t unique the situation wouldn’t happen altogether?

If that’s the case (and that’s the detail I was missing), I can happily use the solution from the last snippet you posted :smiley:


For full context, there were some prior jobs in the DB that would have conflicted with 350306, if they weren’t all discarded already.

(once again, thank you for taking the time to debug that with me, it feels much better to not have to rely on just my own understanding on how it works, which might very well be off in places)

edit: I’ll be on the road very soon, so forgive any delayed responses - I should be back to being chronically online within 10 days :wink:

That job wasn’t completed in another instance of the workflow?

I’m not sold on the idea of using ignore_* here. Mainly because the job I needed was a recorded job that the next jobs in line needed to fetch the result of. I really don’t want two copies of the workflow running do need the unique. I generally use unique with the condition of it being a state that isn’t waiting to run or currently running but allow it to be run again if completed. If you had unique with that condition on the first job in line whose role was to create the rest of the workflow, that would be completed rather quickly (after it created the rest of the jobs) and then you could create the workflow again and have another set of all the other job running? I guess you could not have a state condition on unique and use another way of allowing it to be retried/rerun manually.

Maybe workflows need their own table or to be a job in of themselves. Now that I think about the above more, maybe this is what you’re actually suggesting. I added a job at the end just to log that the workflow completed.

I feel like your use-case might be a bit different than mine and I wouldn’t want to derail the thread by turning it into a discussion about the different ways one can approach architecting workflows (although I’d be happy to take part in one in a different topic once I’m back at the computer).

I imagine Oban is flexible and powerful enough that there’s probably multiple good ways of obtaining the result you’re after and a workflow might look very different depending on what properties you need from it

What I’m really after is the understanding of what guarantees I can depend on in the context of a dependency relationship between two jobs in a workflow - the one I had so far wasn’t entirely accurate. Once I have that I’m sure I’ll be fine and dandy with whatever I come up with :slight_smile: