Oban.Pro.Worker after_process callback running before the job is completed?

I was trying to do a trick with Oban Pro: I use uniqueness constraints so that only one job of a given “type” runs at a time, and in some cases the job’s after_process callback inserts a subsequent job of the same “type”.

However, it looks like the job inserted from the callback hits a uniqueness conflict with the freshly completed job that triggered the callback.

Here’s the (simplified) code:

  def after_process(
        :complete,
        %Oban.Job{args: %mod{}},
        _result
      ) do
    opts = [
      unique: [
        period: 30, # very short for experiments
        fields: [:worker, :queue],
        states: [:available, :scheduled, :executing, :retryable]
      ]
    ]

    job = mod.new(%{some: "args"}, opts)
    {:ok, job} = Oban.insert(job)
    IO.inspect(job)

    # this match raises when the insert hit a uniqueness conflict
    %{conflict?: false} = job

    :ok
  end

When I run it in context, the logs show that the uniqueness conflict was triggered:

%Oban.Job{
  __meta__: #Ecto.Schema.Metadata<:built, "oban_jobs">,
  # many lines cut
  conflict?: true,
  replace: nil,
  unique: %{
    timestamp: :inserted_at,
    keys: [],
    period: 30,
    fields: [:worker, :queue],
    states: [:available, :scheduled, :executing, :retryable]
  },
  unsaved_error: nil
}

[error] [Oban.Pro.Worker] hook error: ** (MatchError) no match of right hand side value: %Oban.Job{__meta__: #Ecto.Schema.Metadata<:built, "oban_jobs">, id: 4....

When I remove :executing from the states list, the job does get inserted without errors.

Do I understand correctly that the hooks aren’t guaranteed to run after Oban sees the job as :complete?

Does that mean the “correct” way to do what I’m after is to dynamically append to the workflow that the “current” job was part of, from within the job itself?

You’re correct: there’s no guarantee that the job is marked completed when the hook runs. That’s because acking is async, and there’s a 0-5ms delay before the job’s state is updated.

An easy way around this is to override the unique options when you’re inserting the next job from after_process so they don’t include the :executing state:

new(args, unique: [states: [:available, :scheduled, :retryable]])

Or you can disable uniqueness entirely when inserting with unique: nil.
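For reference, here’s a minimal sketch of the callback with that override applied. It assumes the same hypothetical worker/args shape as the simplified code above; only the unique states change, dropping :executing so the job that triggered the hook can’t count as a conflict while it’s still being acked:

  def after_process(:complete, %Oban.Job{args: %mod{}}, _result) do
    opts = [
      unique: [
        period: 30,
        fields: [:worker, :queue],
        # :executing intentionally left out
        states: [:available, :scheduled, :retryable]
      ]
    ]

    {:ok, job} = Oban.insert(mod.new(%{some: "args"}, opts))
    false = job.conflict?

    :ok
  end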