At-Most-Once Oban Job Execution, with possible scheduled job retraction and possible follow-on job scheduling

Hi Oban experts, I’m wondering about how to properly use Oban jobs (scheduled in the future) to run (or not) jobs in the following scenario/constraints:

Context

I have a CQRS/ES Commanded aggregate whose lifecycle can emit events when some threshold is crossed:

  • Each aggregate stream is id’d with a business valid UUID.
  • Going over a threshold will emit an AlertAsserted (upward) event.
  • Going back under the threshold will emit an AlertRetracted (downward) event.

I am thinking to have a Commanded Event Handler subscribed to the Event Store to consume both above events, and react as necessary to manage Oban jobs for my goal.

I have Oban Pro, so IF paid features are required for my goal, then no problem.

Goal

I want a piece of work to execute 24 hours AFTER an AlertAsserted event, if-and-only-if we are still AlertAsserted at that time (no AlertRetracted event produced in between). The work should not execute at all if we are no longer AlertAsserted. However, the work can be rescheduled on subsequent AlertAsserted events (after retractions).

Note that I am OK with eventual consistency in the following race: an AlertRetracted event fires right before (wall clock) the related job is due, but the job is picked up by Oban workers while the Event Handler is still consuming the event and attempting the retraction (removing the scheduled job). The fact that the job has already begun execution is not a problem for me; close enough.

For any given aggregate instance (by UUID):

  1. Initial Scheduling of Future Work: I would like a process (function) to execute 24 hours AFTER the AlertAsserted event fired (event timestamp), i.e. I am thinking an Oban job.
  2. Retracting Request of PENDING Future Work: But IF that aggregate instance emitted an AlertRetracted, BEFORE the process executes (while still scheduled), then I’d like the process to not run at all.
    1. Is that a job cancelation?
    2. Is that a job deletion?
  3. AT MOST ONCE Execution: I would like the future work to execute at most once, ending in either completion or cancellation.
    1. This is where I’d think the job will be unique by UUID.
    2. IF the aggregate instance crosses the threshold (downward) and emits AlertRetracted, but the job was already picked up for execution (any state other than scheduled), THEN I just want the job to run to the completed state.
    3. Additionally, IF then the aggregate crosses the threshold (upward) and emits an AlertAsserted again, no additional work will be scheduled because the Job Unique by UUID has already been executed (or in process of being executed).
  4. RESCHEDULING Execution: In the case where there is an AlertAsserted after AlertRetracted AND no job actually was executed, then I’d like the job to be scheduled (rescheduled) for running 24 hours after this most recent AlertAsserted event.
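
The four rules above can be written down as a small pure decision table before touching Oban at all. This is only a sketch: the module name, the event atoms, and the simplified job states (`:none`, `:scheduled`, `:executing`, `:completed`) are hypothetical names chosen for illustration, not anything Oban or Commanded defines.

```elixir
defmodule AlertJobPolicy do
  @moduledoc """
  Pure decision table for the rules above. It touches no database;
  the caller looks up the current job state and applies the action.
  """

  @type event :: :alert_asserted | :alert_retracted
  @type job_state :: :none | :scheduled | :executing | :completed
  @type action :: :schedule | :delete | :noop

  @spec decide(event, job_state) :: action
  # Rules 1 and 4: no job exists (never created, or deleted by a
  # retraction), so schedule one 24 hours after this assertion.
  def decide(:alert_asserted, :none), do: :schedule

  # Rule 3.3: a job already exists in some state, so never add another.
  def decide(:alert_asserted, _existing), do: :noop

  # Rule 2: a retraction removes a job that is still waiting to run.
  def decide(:alert_retracted, :scheduled), do: :delete

  # Rule 3.2: a job that was picked up (or finished) is left alone.
  def decide(:alert_retracted, _other), do: :noop
end
```

For example, `AlertJobPolicy.decide(:alert_retracted, :executing)` returns `:noop`, which is the "close enough" race described above.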

Ideas / Issues / Questions

I am assuming I should use unique jobs (state=:all keys=uuid period=:infinity) to make sure that we only ever have one job queued and run per aggregate.
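
As a rough sketch of what that uniqueness setup could look like on a worker: the module name, queue name, and `enqueue/2` helper below are hypothetical, and the states are listed explicitly so the example does not depend on which shorthand a given Oban version accepts.

```elixir
defmodule MyApp.AlertFollowUpWorker do
  # Hypothetical worker; the queue name is an assumption.
  use Oban.Worker,
    queue: :alerts,
    unique: [
      period: :infinity,
      # Compare only the "uuid" key inside args.
      keys: [:uuid],
      # Every job state, so a finished job still blocks a new insert.
      states: [
        :available,
        :scheduled,
        :executing,
        :retryable,
        :completed,
        :discarded,
        :cancelled
      ]
    ]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"uuid" => _uuid}}) do
    # The actual follow-up work would go here.
    :ok
  end

  # Schedule the job 24 hours after the event timestamp.
  def enqueue(uuid, %DateTime{} = asserted_at) do
    %{uuid: uuid}
    |> new(scheduled_at: DateTime.add(asserted_at, 24 * 60 * 60, :second))
    |> Oban.insert()
  end
end
```

On a uniqueness conflict `Oban.insert/1` still returns `{:ok, job}`, with the existing job marked `conflict?: true`, so the event handler can treat a repeated assert as a no-op.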

And since state=:cancelled is a final state, that job can’t be transitioned back into scheduled for a future run. So my understanding is that deleting only scheduled jobs may be the way to handle retractions, while not clobbering finished job runs.

I don’t see Oban docs to delete “only if scheduled”, so would it be some kind of custom code like the following?

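# Sketch: assumes Repo is the Ecto repo that Oban uses and Job is Oban.Job.
import Ecto.Query

def delete_if_scheduled(job_id) do
  # Repo.get/3 has no :lock option, so lock the row through a query instead.
  query = from(j in Job, where: j.id == ^job_id, lock: "FOR UPDATE")

  # Repo.transaction/1 wraps the result, e.g. {:ok, {:deleted, job}}.
  Repo.transaction(fn ->
    case Repo.one(query) do
      %Job{state: "scheduled"} = job ->
        {:ok, _} = Repo.delete(job)
        {:deleted, job}

      # Executing, completed, cancelled, etc. are left untouched.
      %Job{} = job ->
        {:not_deleted, job.state}

      nil ->
        {:error, :not_found}
    end
  end)
end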

The above should leave running/completed/cancelled jobs alone. So if the retraction “fails” (meaning it has no effect), the job is run. A future assert event would then attempt a job insert that is unique by state=:all, keys=UUID, with no replace on conflict, so no second job is created.
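
Since the event handler knows the aggregate UUID rather than the Oban job id, one possible variant is a single conditional delete keyed on the UUID in `args`. This is a sketch under assumptions: `MyApp.Repo`, the module name, and the worker string are hypothetical, and it assumes Postgres, where `args` is a `jsonb` column.

```elixir
defmodule MyApp.AlertJobs do
  import Ecto.Query

  # Hypothetical worker name; adjust to the real worker module.
  @worker "MyApp.AlertFollowUpWorker"

  # Deletes the job for this aggregate only while it is still scheduled.
  # The state check and the delete happen in one SQL statement, so a job
  # that has already been picked up is never removed.
  def retract(uuid) do
    query =
      from(j in Oban.Job,
        where: j.state == "scheduled",
        where: j.worker == ^@worker,
        where: fragment("?->>'uuid' = ?", j.args, ^uuid)
      )

    case MyApp.Repo.delete_all(query) do
      {0, _} -> :noop
      {_count, _} -> :deleted
    end
  end
end
```

A `:noop` result covers both "no job was ever scheduled" and "the job already started or finished", which matches the at-most-once goal.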

Is my train of thought the right way to approach the problem using Oban? Or does anyone have other suggestions or solutions for my constraints/goals?

Is there a better way to think about this?

Thanks!

Additional Context: Aggregate lifecycle only has a month of real time before it is retired, no more alert events.

Follow-on idea as a solution I will explore:

Two jobs, in two steps:

  1. Delayed Trigger Job: Enqueued Unique by state :scheduled and args :uuid (default unique fields), scheduled to run in 24 hours; no replacement.
  2. Work Job: Enqueued to run immediately, unique by state :all and args :uuid and period :infinity; no replacement.

On AlertAsserted, enqueue the Delayed Trigger Job.

On AlertRetracted, find and cancel any existing scheduled Delayed Trigger Job.

My Event Handler will process the Event Stream sequentially in order.

We’ll possibly get a bunch of canceled Delayed Trigger Jobs living in the Oban jobs history if the alerts flip-flop a lot. These can get GC’d after a time; they don’t have to live in history indefinitely.

If the Delayed Trigger Job is run, then the Work Job is enqueued.

Because of the Work Job’s unique state configuration, it will only ever be enqueued once and run once. It can also get GC’d after two months (from time of completion), which leaves plenty of time to guarantee that the related aggregate has been retired.

Subsequent Delayed Trigger Jobs MAY be enqueued, but even if they run, the actual Work Job will dedup to the already run Work Job.
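
A minimal sketch of this two-job design, assuming the default Oban instance and hypothetical module and queue names throughout. The narrow `states: [:scheduled]` list follows the description above; it is worth checking how your Oban version treats partial state lists for uniqueness.

```elixir
defmodule MyApp.DelayedTriggerWorker do
  # Step 1: unique only among scheduled jobs for the same uuid.
  use Oban.Worker,
    queue: :alerts,
    unique: [period: :infinity, keys: [:uuid], states: [:scheduled]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"uuid" => uuid}}) do
    # A uniqueness conflict still returns {:ok, job}, so repeated
    # triggers dedup to the Work Job that already exists.
    {:ok, _job} = %{uuid: uuid} |> MyApp.WorkWorker.new() |> Oban.insert()
    :ok
  end
end

defmodule MyApp.WorkWorker do
  # Step 2: unique across every state, forever, per uuid.
  use Oban.Worker,
    queue: :alerts,
    unique: [
      period: :infinity,
      keys: [:uuid],
      states: [:available, :scheduled, :executing, :retryable,
               :completed, :discarded, :cancelled]
    ]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"uuid" => _uuid}}) do
    # The real work would go here.
    :ok
  end
end

defmodule MyApp.AlertTriggers do
  import Ecto.Query

  # Called by the event handler on AlertAsserted.
  def on_asserted(uuid, %DateTime{} = asserted_at) do
    %{uuid: uuid}
    |> MyApp.DelayedTriggerWorker.new(
      scheduled_at: DateTime.add(asserted_at, 24 * 60 * 60, :second)
    )
    |> Oban.insert()
  end

  # Called by the event handler on AlertRetracted; returns {:ok, count}.
  def on_retracted(uuid) do
    Oban.Job
    |> where([j], j.worker == "MyApp.DelayedTriggerWorker")
    |> where([j], j.state == "scheduled")
    |> where([j], fragment("?->>'uuid' = ?", j.args, ^uuid))
    |> Oban.cancel_all_jobs()
  end
end
```

The trade-off against deleting scheduled jobs is that cancelled triggers stay in the jobs table until pruned, but no custom delete logic is needed.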

On my phone so this will be brief.

Have you thought about using a process manager to solve this problem? The problem you’re describing seems to have a bit of business logic that you can solve in the process manager.

Have you explored that solution yet? Something that may simplify your original plan is to use Job Deadlines to have it cancel automatically after 24h.