At-Most-Once Oban Job Execution, with Possible Retraction of Scheduled Jobs and Follow-on Job Scheduling

Hi Oban experts, I’m wondering how to properly use Oban jobs (scheduled in the future) to run, or not run, work under the following scenario and constraints:

Context

I have a CQRS/ES Commanded aggregate whose lifecycle can emit events when some threshold is crossed:

  • Each aggregate stream is identified by a business-domain UUID.
  • Going over a threshold will emit an AlertAsserted (upward) event.
  • Going back under the threshold will emit an AlertRetracted (downward) event.

I am thinking of having a Commanded event handler subscribed to the event store consume both of the above events and react as necessary to manage Oban jobs toward my goal.

I have Oban Pro, so IF paid features are required for my goal, then no problem.

Goal

I want a piece of work to execute 24 hours AFTER an AlertAsserted event, and only if we are still asserted at that time (no AlertRetracted event produced in between). The work should not execute at all if we are no longer asserted. However, the work may be rescheduled on subsequent AlertAsserted events (after retractions).

Note that I am OK with eventual consistency when an AlertRetracted event fires just before (wall clock) the related job executes: the job may already be picked up by Oban workers while the event handler is consuming the event and attempting to process a retraction (removing the scheduled job). The fact that the job has already begun execution is not a problem for me; close enough.

For any given aggregate instance (by UUID):

  1. Initial Scheduling of Future Work: I would like a process (function) to execute 24 hours AFTER the AlertAsserted event fired (the event timestamp); i.e. I am thinking an Oban job.
  2. Retracting Request of PENDING Future Work: But IF that aggregate instance emits an AlertRetracted BEFORE the process executes (while still scheduled), then I’d like the process not to run at all.
    1. Is that a job cancelation?
    2. Is that a job deletion?
  3. AT MOST ONCE Execution: I would like the future work to execute at most once, whether to completion or cancelation.
    1. This is where I’d think the job will be unique by UUID.
    2. IF the aggregate instance crosses the threshold (downward) and emits AlertRetracted, but the job has already been picked up for execution (any state other than scheduled), THEN I just want the job to finish to a completed state.
    3. Additionally, IF the aggregate then crosses the threshold (upward) and emits AlertAsserted again, no additional work should be scheduled, because the job unique by UUID has already executed (or is in the process of executing).
  4. RESCHEDULING Execution: In the case where there is an AlertAsserted after an AlertRetracted AND no job was actually executed, I’d like the job to be scheduled (rescheduled) to run 24 hours after this most recent AlertAsserted event.

Ideas / Issues / Questions

I am assuming I should use unique jobs (uniqueness across all job states, keys: [:uuid], period: :infinity) to make sure that we would only ever have one job queued and run per UUID.
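As a sketch of that uniqueness configuration (the worker module and queue names here are my own invention, and I’m using Oban.Job.states/0 to cover every state):

```elixir
defmodule MyApp.AlertWorker do
  use Oban.Worker,
    queue: :alerts,
    unique: [
      # Never expire uniqueness, and consider every job state, so a
      # completed (or executing) job blocks any future insert for the
      # same UUID.
      period: :infinity,
      states: Oban.Job.states(),
      keys: [:uuid]
    ]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"uuid" => _uuid}}) do
    # ... the actual piece of work for this aggregate instance
    :ok
  end
end
```

With this, a second `MyApp.AlertWorker.new(%{uuid: uuid}) |> Oban.insert()` for the same UUID returns the existing job rather than inserting a new one.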

And since :cancelled is a final state, that job can’t be transitioned back into scheduled for a future run… So I understand that deleting only scheduled jobs may be the way to handle retractions without clobbering finished job runs.

I don’t see anything in the Oban docs to delete “only if scheduled”, so would it be some kind of custom code like the following?

# Check-then-delete inside a transaction, with a row lock so the
# state check and the delete are atomic.
import Ecto.Query

def delete_if_scheduled(job_id) do
  Repo.transaction(fn ->
    locked =
      Oban.Job
      |> where([j], j.id == ^job_id)
      |> lock("FOR UPDATE")
      |> Repo.one()

    case locked do
      %Oban.Job{state: "scheduled"} = job ->
        {:ok, _} = Repo.delete(job)
        {:deleted, job}

      %Oban.Job{} = job ->
        {:not_deleted, job.state}

      nil ->
        {:error, :not_found}
    end
  end)
end

The above should leave running/completed/cancelled jobs alone… So if the retraction “fails” (i.e. has no effect), the job runs… And then a future assert event would do a job insert unique across all states with keys: [:uuid], with no replacement on conflict.
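An alternative sketch that avoids the explicit transaction and row lock is a conditional delete in a single statement: with the state filter in the WHERE clause, the delete is atomic, and a job that has already left the scheduled state simply isn’t matched (Repo is assumed to be your app’s Ecto repo):

```elixir
import Ecto.Query

def delete_if_scheduled(job_id) do
  # delete_all only removes rows that still match both conditions,
  # so a job that is executing/completed/cancelled is left untouched.
  {count, _} =
    Oban.Job
    |> where([j], j.id == ^job_id and j.state == "scheduled")
    |> Repo.delete_all()

  if count == 1, do: :deleted, else: :not_deleted
end
```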

Is my train of thought the right way to approach the problem using Oban? Or does anyone have suggestions or solutions for my constraints/goals?

Is there a better way to think about this?

Thanks!


Additional Context: The aggregate lifecycle only has a month of real time before it is retired; no more alert events after that.

Follow-on idea as a solution I will explore:

Two jobs, in two steps:

  1. Delayed Trigger Job: enqueued unique by state :scheduled and args :uuid (the default unique fields), scheduled to run in 24 hours; no replacement.
  2. Work Job: enqueued to run immediately, unique by state :all, args :uuid, and period :infinity; no replacement.

On AlertAsserted, enqueue the Delayed Trigger Job.

On AlertRetracted, find and cancel any existing scheduled Delayed Trigger Job.
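For the retraction side, one way to find and cancel only still-scheduled trigger jobs is Oban.cancel_all_jobs/1 with an Ecto query (a sketch; the worker module name and the "uuid" args key are my assumptions):

```elixir
import Ecto.Query

def cancel_pending_trigger(uuid) do
  # Only jobs still in the scheduled state for this worker and UUID
  # are matched; executing or finished jobs are left alone.
  Oban.Job
  |> where([j], j.state == "scheduled")
  |> where([j], j.worker == "MyApp.DelayedTriggerWorker")
  |> where([j], fragment("?->>'uuid' = ?", j.args, ^uuid))
  |> Oban.cancel_all_jobs()
end
```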

My Event Handler will process the Event Stream sequentially in order.

We’ll just get a bunch of cancelled Delayed Trigger Jobs possibly living in the Oban jobs history if the alerts flip-flop a lot; these can get GC’d after a time and don’t have to live in history indefinitely.

If the Delayed Trigger Job does run, then the Work Job is enqueued…

Because of the Work Job’s unique-state configuration, it will only ever be enqueued once and run once. It can also be GC’d after two months (from time of completion) to give plenty of time to guarantee that the related aggregate’s life will have been retired.

Subsequent Delayed Trigger Jobs MAY be enqueued, but even if they run, the Work Job insert will dedupe against the already-run Work Job.
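The two-job idea above might be sketched as two workers along these lines (module names, the queue, and the 24-hour schedule are my assumptions, and the actual work is elided):

```elixir
defmodule MyApp.DelayedTriggerWorker do
  # Unique only while :scheduled, so a fresh trigger can be enqueued
  # after an earlier one was cancelled on retraction.
  use Oban.Worker,
    queue: :alerts,
    unique: [states: [:scheduled], keys: [:uuid]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"uuid" => uuid}}) do
    # The trigger's only job: enqueue the real work immediately.
    case Oban.insert(MyApp.WorkWorker.new(%{uuid: uuid})) do
      {:ok, _job} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end

defmodule MyApp.WorkWorker do
  # Unique across every state, forever, so the work runs at most once
  # per UUID no matter how many triggers fire.
  use Oban.Worker,
    queue: :alerts,
    unique: [period: :infinity, states: Oban.Job.states(), keys: [:uuid]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"uuid" => _uuid}}) do
    # ... the actual side-effect work
    :ok
  end
end

# On AlertAsserted, the event handler would enqueue the trigger:
#
#   %{uuid: uuid}
#   |> MyApp.DelayedTriggerWorker.new(schedule_in: 24 * 60 * 60)
#   |> Oban.insert()
```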

On my phone so this will be brief.

Have you thought about using a process manager to solve this problem? The problem you’re describing seems to have a bit of business logic that you can solve in the process manager.


Have you explored that solution yet? Something that may simplify your original plan is to use Job Deadlines to have it cancel automatically after 24h.

Thanks @tcoopman and @sorentwo for the pointers.

What I ended up doing, given the “bit of business logic” realization from @tcoopman, was to model the two concerns as two aggregates, a process manager, two event subscribers, and two Oban job workers.

My two aggregates each track their own business logic/policy concern:

  • MyMetric aggregate with stream identity metric-{uuid} is just about determining when we’ve crossed alert threshold states.
    • Events: AlertAsserted, AlertRetracted
  • MyNotification aggregate with stream identity notification-{uuid} is just about the delayed publishing of notifications.
    • Events: NotificationScheduled, NotificationCanceled, and NotificationPublished
    • Commands: Schedule, Cancel, Publish

And the processes to solve are:

  • Connecting Metric alerts to Notifications (scheduling and cancelations)
    • via Process Manager
  • The real-clock timer delay, which is kept external to the deterministic Commanded PM and handler logic.
    • via event handlers that produce Oban jobs (scheduled_at), to keep non-deterministic timing outside the Commanded parts.

So the choreographed flow looks like this:

  1. MyProcessManager (a Commanded Process Manager behaviour) subscribes to the AlertAsserted and AlertRetracted events and simply issues the corresponding Schedule and Cancel notification commands.
    1. It reuses the shared uuid between the Metric and Notification aggregates to keep track of things, and we only need a single MyProcessManager “process” here.
    2. To solve the replay/crash issue (a PM command may be issued more than once if the commands succeed but the PM crashes before its updated state is saved), a sequence_number is added to the Notification aggregate, commands, and events. It is set from the stream_version in the event metadata that the PM receives in handle/3 (state, event, metadata).
  2. MyNotification aggregate:
    1. State contains status, alert_number, and last_sequence_number; given each notification event, we can properly set these values; we expect monotonically increasing sequence numbers.
    2. on “Schedule” command, emit a Scheduled event if the status is initial or canceled
      1. the Scheduled event contains uuid, sequence_number, and alert_number.
      2. The alert_number IS the sequence_number here; it is the key to distinguishing which timer to watch for, and it separates alert firing from the event/command relay tracking.
      3. this no-ops if the notification is still scheduled or has already been published
    3. on “Cancel” command, emit a Canceled event if the status is scheduled AND the command’s sequence_number is greater than last_sequence_number
      1. the Canceled event contains the uuid and sequence_number
      2. this no-ops for the initial, canceled, and published statuses.
    4. on “Publish” command, emit a Published event if the status is scheduled and the command’s alert_number matches the aggregate state.
      1. Just to tidy things up, I set the sequence_number in the event to a terminal “max int”… though the Schedule and Cancel commands would still no-op anyway because of the “published” aggregate status.
    5. On Scheduled events, MyScheduledNotificationsMonitor inserts an Oban job with scheduled_at 24 hours after the Scheduled event’s created_at, and with a publish_time argument set to the same value as scheduled_at. This job solely dispatches a Publish command with the given alert_number and publish_time; unique by {uuid, alert_number}, period :infinity.
    6. On Published, MyPublishedNotificationsMonitor inserts the Oban job that performs the actual delivery (the side-effect work) of the notification; unique by {uuid}, period :infinity.
      1. The Oban job has scheduled_at set to publish_time and a deadline 24 hours after that, so if a flurry of very late publishing jobs gets enqueued because Oban workers weren’t running for a period, the actual delivery work won’t run, as it is no longer consequential.
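Step 5 above, the monitor that turns a Scheduled event into a timer job, might look roughly like this (the module, application, event, and worker names are my own; Commanded passes event metadata, including :created_at, as the second argument to handle/2):

```elixir
defmodule MyApp.MyScheduledNotificationsMonitor do
  use Commanded.Event.Handler,
    application: MyApp.CommandedApp,
    name: __MODULE__

  alias MyApp.Notifications.Events.Scheduled

  def handle(%Scheduled{uuid: uuid, alert_number: alert_number}, metadata) do
    # Timer fires 24 hours after the event was recorded, not after
    # the handler happened to see it.
    publish_time = DateTime.add(metadata.created_at, 24 * 60 * 60, :second)

    %{uuid: uuid, alert_number: alert_number, publish_time: publish_time}
    |> MyApp.PublishDispatchWorker.new(scheduled_at: publish_time)
    |> Oban.insert()
    |> case do
      {:ok, _job} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end
```

The PublishDispatchWorker (not shown) would then dispatch the Publish command with the alert_number and publish_time from its args.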

Timing:

  • Cancel before Publish command: the notification is canceled, and the subsequent Publish command will no-op.
  • Publish before Cancel command: the notification is published, and the subsequent Cancel command will no-op.

Consequences:

  • A series of alert assertions and retractions will result in the proper notification state, ready to fire or not, along with a series of Oban timer jobs (one per alert_number, due to uniqueness). They will all execute, but only the job matching the final alert_number can succeed in publishing (or, if everything was canceled, nothing publishes).
  • The notification is published at most once, and will only ever have at most one delivery Oban job, conditioned also on the success of the delivery.

Thanks for the pointers to help me find this solution.
