Can Oban process jobs in the order they are enqueued?

We have events (structs) which we wish to use to update a database table (read model) asynchronously.

The event need to be processed in the order they are published.

Currently we are using Oban, the event is received and a job created on a queue which has a concurrency of 1.

This ensures only one job is picked up at once, thus the jobs are processed in the order they are enqueued.

The one problem we have found is if a job raises and needs to retried then Oban will pull another job off the queue, thus breaking the order. So if the database was to go down or we introduced a bug then we effectively get the events processed in a random order due to the retries.

Is there anything in Oban (including the Pro version) that could help with this?

If not I will explore some other options, to keep Oban I might look at introducing a distributed lock, e.g. using CacheEx, or a persistent queue which runs inside the BEAM, if such a thing exists.

I’d be interested to know if anyone has tackled something similar.

Many thanks!

I don’t think Oban is a good fit for your use-case. I would just go with a custom solution that involves tasks + a custom task supervisor. The supervisor handling the order of those tasks, their re-run if crashed and persistence if you need it.

This is precisely what chained jobs are designed for!

Chains link jobs together to ensure they run in a strict sequential order. Downstream jobs in the chain won’t execute until the upstream job is completed, cancelled, or discarded. Behaviour in the event of cancellation or discards is customizable to allow for uninterrupted processing or holding for outside intervention.

Chains can also be partitioned, so jobs from the same worker, or with the same particular args are chained. That allows you to run a queue with higher concurrency while maintaining strict sequential order.

4 Likes

Excellent, the partitioning also solves another fundamental problem for us!

This is a great excuse for me to ask for us to purchase a Pro license to support the project. Oban is a really great bit of kit, thanks.

At the moment we have a single Worker which processes all events for our async event handlers which write to the database tables, as such:

defmodule EventHandler.Async do
  defmacro __using__(opts \\ []) do
    quote bind_quoted: [opts: opts] do
     
    @opts opts
    use Oban.Worker, Keyword.merge([queue: :default, tags: [to_string(__MODULE__)]], @opts)

    def on_event(event) do
      event
      |> serialize()
      |> new()
      |> Oban.insert()

       :ok
    end

    @impl Oban.Worker
    def perform(%Oban.Job{args: event_attributes}) do
      event_attributes
      |> deserialize() 
      |> call()
       
      :ok
    end
  end
end

defmodule MyDenormalizer do
  use EventHandler.Async, queue: "my_denormalizer"

  def call(event) do
    # write to database
  end
end

Since we use a single worker, EventHandler.Async, for everything, I think what we might need to do is pass some additional arg’s in to the job, the event handler name (i.e. MyDenormalizer), which we would then chain on since we want the events to be executed in order per denormalizer.

use Oban.Pro.Worker, queue: :my_denormalizer, chain: [by: [args: :denormalizer_name]

And then to get better performance since per denormalizer we can partition on the event entity id (which relates to a single row in the database table):

use Oban.Pro.Worker, queue: :my_denormalizer, chain: [by: [args: [:denormalizer_name, :entity_id]]

Is it possible to get a trial of the Pro version for us to work out a proof of concept? No worries, if not.

1 Like

That’s great to hear!

You’d definitely want to pass some additional args in to partition by. That could be a good place to make use of enum with structured jobs, to ensure it’s partitioning by a known value.

We don’t offer free trials due to abuse in the past. However, there is a 30 day refund if you work out a proof of concept and it doesn’t pan out.

5 Likes

I’ve submitted a proposal, likely we will get approval for Oban Pro since the cost of building and maintaining an (inferior) feature set ourselves significantly outweighs the license cost, plus we are supporting an open source project we already use.

I suspect we’ll go for a single month first and try a PoC before committing to a year. Cheers, Kris.

4 Likes

Could not agree more.

1 Like

Just throwing it out there but I handle basically this scenario with something similar to chained jobs except I use a self-inserting job to control the concurrency.

So I have some queue of however big you want it to be, and I start a job for a worker with arg partition_id: 123. The job is defined unique on the partition ID just to be safe. The job processes some work, and then it inserts itself with some indicator that it should try the next batch.

For example if you have a database with a bunch of things to process, the first job can start at row ID 0, the next at row ID 10, etc.

This approach has worked well for me so far, but if you need to store the actual data in the job args it may not work as well as it relies on each instance of the job being able to figure out what data to work on while storing it externally.

The advantages are as you say, it covers only one job running for this arbitrary partition key at a time, as the next job for that partition won’t even run unless the parent (one that inserts it) actually finishes processing. I am not so sure but there’s probably something you can hook into to figure out if a job hits max retry that you should queue up the next one for that partition, but have not done this yet.

1 Like