Oban — Reliable and Observable Job Processing

Oban looks great! There are 2 features that I miss in Exq and I’m not sure if they can be achieved in Oban:

1.- Avoiding duplicate jobs. That is, being able to check if a job has already been enqueued with the same arguments so I can avoid enqueueing it twice.

2.- Tagging jobs so they can be found quickly. My main use would be tagging the related jobs to some entity - if that is destroyed then I want to kill all the pending jobs for it easily.

or “named” jobs eg “send_email_for_bill_id:777” which of course should balk at duplicates
(but haven’t thoroughly checked Oban out yet…)

That can be useful as well, but of course the API needs to be flexible - in many cases it’s totally fine to repeat past jobs with the same arguments, in other cases it may be fine to repeat a job that has already finished, in cases like sending emails for bills it’s probably not… In my case I only want to check that a job is not already waiting at the queue.

This can definitely be achieved in Oban. Unlike Exq (or most any of the other libraries) you have complete control over how your jobs are inserted into the database.

First, a quick definition of “unique jobs” because in my experience the meaning can be confusing. Usually the uniqueness only applies to jobs that are in the queue. That means you can prevent putting two jobs with the same arguments in the queue at the same time, but it doesn’t prevent multiple jobs with the same arguments within a window of time. You mentioned this difference below, but I wanted to call it out for anybody that may not be familiar with the feature.

Here are some ways I can think of to implement unique jobs with different types of guarantees.

1. Partial Index

Add an index for the worker that you want to enforce uniqueness for:

create index(
  :oban_jobs,
  [:worker, :args],
  unique: true,
  where: "worker = 'MyApp.Worker' and state in ('available', 'scheduled')"
)

Then pass on_conflict: :nothing as an option to Repo.insert/2, which ensures you don’t insert duplicate jobs.

2. Insert Helper

Create a helper inside your Repo that will check for existing jobs before trying to insert:

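# Assumes `import Ecto.Query` in the Repo module
def insert_unique(changeset, opts \\ []) do
  worker = Ecto.Changeset.get_change(changeset, :worker)
  args = Ecto.Changeset.get_change(changeset, :args)

  # Look for a queued job with the same worker and args
  query =
    Oban.Job
    |> where([j], j.worker == ^worker and j.args == ^args)
    |> where([j], j.state in ["available", "scheduled"])

  if exists?(query) do
    {:ignored, changeset}
  else
    insert(changeset, opts)
  end
end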

That function is a bit rough, but you get the idea.

Neither of these is as convenient as declaring it in the worker, i.e. use Oban.Worker, unique_for: :timer.minutes(1). I’ll think about this some more!

Honestly, I hadn’t considered first class tags before. This can be accomplished by providing additional fields to args.

Here we are adding an additional tag value to the args:

%{id: record.id, tag: "dependent"}
|> Oban.Job.new()
|> MyApp.Repo.insert()

Later, if the record is deleted we can also delete any dependent jobs within the same transaction:

# Assumes `import Ecto.Query`
dependents =
  Oban.Job
  |> where([j], j.state in ["available", "scheduled"])
  |> where(
    [j],
    fragment("?->>'tag' = 'dependent' and ?->>'id' = ?", j.args, j.args, ^to_string(record.id))
  )

Ecto.Multi.new()
|> Ecto.Multi.delete(:delete, record)
|> Ecto.Multi.delete_all(:jobs, dependents)
|> MyApp.Repo.transaction()

I hope those ideas are somewhat helpful. Thanks for checking out Oban :slight_smile:

Oban v0.4.0 has been released. As with the previous release, there are a bunch of fixes and added features that came from this thread and issues on GitHub. Thanks again :purple_heart:

Note that there are some breaking changes included in this release, but they are all minor changes that only take a find-and-replace to fix.

Directly from the CHANGELOG:

Added

  • [Oban] Add Oban.drain_queue/1 to help with integration testing. Draining a queue synchronously executes all available jobs in the queue from within the calling process. This avoids any sandbox based database connection issues and prevents race conditions from asynchronous processing. Thanks to @josevalim

  • [Oban.Worker] Add backoff/1 callback and switch to exponential backoff with a base value as the default. This allows custom backoff timing for individual workers.

  • [Oban.Telemetry] Added a new module to wrap a default handler for structured JSON logging. The log handler is attached by calling Oban.Telemetry.attach_default_logger/0 somewhere in your application code. (Inspired by a similar feature in Redix)

  • [Oban.Queue.Producer] Guard against Postgrex errors in all producer queries using a circuit breaker. Failing queries will no longer crash the producer. Instead, the failure will be logged as an error and it will trip the producer’s circuit breaker. All subsequent queries will be skipped until the breaker is enabled again approximately a minute later.

    This feature simplifies the deployment process by allowing the application to boot and stay up while Oban migrations are made. After migrations have finished each queue producer will resume making queries. Thanks to @philss. Side note, the solution was inspired by a discussion between @keathley and @adkron on the most recent Elixir Outlaws.
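For anybody curious what the circuit breaker idea looks like in isolation, here is a minimal sketch. It is not Oban's actual implementation: the BreakerSketch module, its struct fields, the return shapes, and the 60 second reset are all assumptions made for illustration, and timestamps are passed in by hand so the behaviour is easy to follow.

```elixir
defmodule BreakerSketch do
  # Illustrative only: a tripped breaker skips work until `reset_after`
  # milliseconds have passed since the failure.
  defstruct status: :enabled, tripped_at: nil, reset_after: 60_000

  # Runs `fun` unless the breaker is tripped. `now` is a millisecond
  # timestamp supplied by the caller.
  def call(%__MODULE__{} = breaker, now, fun) when is_function(fun, 0) do
    breaker = maybe_reset(breaker, now)

    case breaker.status do
      :disabled ->
        {:skipped, breaker}

      :enabled ->
        try do
          {{:ok, fun.()}, breaker}
        rescue
          error ->
            # Stand-in for logging the failure as an error
            IO.puts(:stderr, "query failed: " <> Exception.message(error))
            {{:error, error}, %{breaker | status: :disabled, tripped_at: now}}
        end
    end
  end

  # Re-enable the breaker once enough time has passed
  defp maybe_reset(%{status: :disabled, tripped_at: at, reset_after: ms} = breaker, now)
       when now - at >= ms,
       do: %{breaker | status: :enabled, tripped_at: nil}

  defp maybe_reset(breaker, _now), do: breaker
end
```

The point is that a failing query returns an error and flips the state instead of raising, so the calling process stays alive and later calls are skipped until the reset window has elapsed.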

Changed

  • [Oban] Telemetry events now report timing as %{duration: duration} instead of %{timing: timing}. This aligns with the telemetry standard of using duration for the time to execute something.

  • [Oban] Telemetry events are now broken into success and failure at the event level, rather than being labeled in the metadata. The full event names are now [:oban, :success] and [:oban, :failure].

  • [Oban.Job] Rename scheduled_in to schedule_in for readability and consistency. Both the Oban docs and README showed schedule_in, which reads more clearly than scheduled_in. Thanks @lukerollans

  • [Oban.Pruner] Pruning no longer happens immediately on startup and may be configured through the :prune_interval option. The default prune interval is still one minute. Again, thanks @philss
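To make the telemetry changes a little more concrete, here is a rough sketch of a handler written against the new event names and the duration measurement. The TelemetrySketch module and its format/3 helper are made up for this example, and I am assuming the metadata carries a worker field.

```elixir
defmodule TelemetrySketch do
  # Four-argument handler shape used by :telemetry handlers.
  # Success and failure are now separate events rather than metadata labels.
  def handle_event([:oban, outcome] = event, measurements, meta, _config)
      when outcome in [:success, :failure] do
    IO.puts(format(event, measurements, meta))
  end

  # Builds a log line from the event name and the `duration` measurement.
  # The `worker` key in the metadata is an assumption for this sketch.
  def format([:oban, outcome], %{duration: duration}, meta) do
    "oban #{outcome} worker=#{meta[:worker]} duration=#{duration}"
  end
end
```

A handler written for the old payload would have matched on %{timing: timing} and read the outcome from the metadata, so both of those spots need updating.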

Fixed

  • [Oban.Migrations] Make partial migrations more resilient by guarding against missing versions and using idempotent statements.

v0.4.0 Docs
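As a rough illustration of "exponential backoff with a base value", the calculation could look something like the sketch below. The BackoffSketch module, the power of two, and the base of 15 seconds are assumptions for the example, not necessarily the exact formula Oban uses.

```elixir
defmodule BackoffSketch do
  # Seconds to wait before the next retry: grows exponentially with the
  # attempt number, plus a fixed base. Both constants are assumptions.
  def backoff(attempt, base \\ 15) when is_integer(attempt) and attempt > 0 do
    trunc(:math.pow(2, attempt)) + base
  end
end
```

With these numbers the first retry waits 17 seconds and the tenth waits 1039. A worker that wants different timing would return its own value from the backoff/1 callback instead.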

Thanks for putting this out there. It is nice to know that our talks have an impact outside those of us that sit on the show. I’m delighted that this talk is one that people are seizing on. The idea came from this episode: Elixir Outlaws Episode 42: Carriage Return Line Feed?

Do you have a link to the commits that actually implemented the change? I’d love to take a look.

I think I found the commit. https://github.com/sorentwo/oban/commit/876c4eaa621e6ca42acc05bc1489c82a7d61fcb3

Absolutely, thanks for putting the show out there.

I had been weighing how to properly handle the original issue (application crashing while deploying migrations because the table wasn’t available). The discussion around what is expected failure and what can be considered a “responsive” system came at the perfect time.

That’s the one :+1:. The change itself was rather minor and focused on a specific set of expected failures—if the oban_jobs table doesn’t exist or has structural inconsistencies it shouldn’t crash the rest of the application.

That is an excellent and simple solution. I especially like that you logged the information to make sure that your system is observable. You also had a test which, of course, makes me smile.

I’m wondering about using oban to organize jobs that use ffmpeg.

It looks like the dampener feature you mentioned would work well with ffmpeg since it goes off mem/cpu instead of delegating to the beam - did I understand that correctly? If so, awesome =)

My main question is around more than one app accessing jobs… I’d like the main app to queue a job, but I want a separate app to run the job.

So in the main app:

Oban.Job.new(%{}, queue: :default, worker: :"SeparateApp.Worker")

Where SeparateApp.Worker doesn’t actually exist in the main app… Then I actually define the SeparateApp.Worker in the separate app.

Is that a bad way of doing it? I was just thinking ffmpeg has different resource needs and scales differently.

Alternatively, I could both queue the job and run the job in the separate app via api, and I believe still get job visibility (when your UI comes out) in the main app (providing it uses the same db). So it would be a separate elixir/phoenix app running in docker exposing its own api.

I had also considered just using aws here. So aws s3 adds a job to aws batch which calls a bash script in a docker container with ffmpeg. I’m guessing this would be more efficient computing wise, but it would lose the visibility and portability Oban provides.

Curious on any thoughts!

That feature isn’t quite baked yet, and it definitely won’t be ready for a while. The version I had been envisioning would track reductions/memory usage from executing processes to automatically scale queues up and down based on workload. Tying that to OS processes would be significantly more difficult, especially for something like FFmpeg, which is multi-threaded itself and could use all available cores even from a single job.

You can achieve this with Oban quite easily. Remember, only the exact queues passed to the Oban supervisor will be started and there isn’t any global configuration. You can use SeparateApp.Worker.new to enqueue the job from your main app and so long as the main app isn’t running that worker’s queue it will be ignored.

Here’s a more concrete example. First, define the worker with a queue other than :default:

defmodule SeparateApp.Worker do
  use Oban.Worker, queue: :video_processing

  @impl true
  def perform(args) do
    # process with ffmpeg
  end
end

Then, configure your apps to start only the queues they should be processing:

# In the supervisor of your primary app
{Oban, repo: Repo, queues: [default: 20]}

# In the supervisor of your secondary app
{Oban, repo: Repo, queues: [video_processing: 5]}

Within your primary application code you can conveniently call SeparateApp.Worker.new(%{}). You are welcome to use the Oban.Job.new/2 variant you listed above, but you don’t need to.

IMO that is a great way of doing it. We do all of our media processing in a separate container for this exact reason. Resource usage is unpredictable and the BEAM can get so starved for CPU that it can’t cope the way it usually does.

Right, you would lose all of the execution guarantees and retry behavior. You can definitely make this work within a single umbrella/poncho, so long as you start apps in different containers.

Thanks for the great project @sorentwo ! I’m working on using it to handle a long running port process in a separate app. However, I have a requirement to report back percentage progress while it is running, so I’m using a GenServer to manage feedback from the port. What’s a good way to prevent the job from being marked complete until my GenServer has finished its work?

Anything that blocks the perform/1 function will prevent the job from being marked complete. There isn’t a limit on how long a job can execute (since it isn’t executed within a transaction). Without knowing more about your use case or seeing the code I can only provide some rough suggestions.

  • Make a call to your GenServer process (we need a call so that we get the from tuple).
  • Within your handle_call function grab the from for the caller (which will be your job process) and start a Task to track progress.
  • Respond from handle_call immediately with something like {:reply, :ok, newstate}, don’t await the task.
  • Using a loop within the task start sending progress messages to the caller. You can loop with a delay by using Process.sleep/1. The original caller will need to use a receive loop to block and wait for progress messages.
  • When the task is complete send the caller a message to let it know work is finished.
  • Be sure to set an after timeout in your receive loop to prevent blocking forever and leaving a zombie job.
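The GenServer side of those steps might be sketched like this. Everything here is hypothetical: the ProgressServer name, the work/2 function, and the fake "steps" loop stand in for whatever actually drives your port.

```elixir
defmodule ProgressServer do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # A call (rather than a cast) so that handle_call receives the `from` tuple
  def work(server, steps), do: GenServer.call(server, {:work, steps})

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:work, steps}, {caller, _tag}, state) do
    # Start a task to report progress to the calling (job) process
    Task.start(fn ->
      for step <- 1..steps do
        # Stand-in for a slice of real work
        Process.sleep(10)
        send(caller, {:progress, div(step * 100, steps)})
      end

      # Let the caller know the work is finished
      send(caller, :complete)
    end)

    # Reply immediately, without awaiting the task
    {:reply, :ok, state}
  end
end
```

The caller gets :ok back right away and then has to sit in a receive loop until :complete arrives, which is what keeps the job from being marked complete early.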

The worker portion would look roughly like this (warning, untested code):

defmodule MyApp.WaitingWorker do
  use Oban.Worker

  @impl true
  # Args are stored as JSON, so the keys arrive as strings
  def perform(%{"id" => id}) do
    :ok = MyApp.Business.work(id)

    receive_loop(id)
  end

  defp receive_loop(id) do
    receive do
      {:progress, percent} ->
        report_progress(id, percent)
        receive_loop(id)

      :complete ->
        :ok

    after
      60_000 ->
        raise RuntimeError, "no progress for #{id} after 60s"
    end
  end
end

Hope that helps and doesn’t just cause more confusion :smile:

Thanks @sorentwo, that was very helpful! I ended up doing it a little different on the GenServer side, but passing the caller pid and using the receive loop worked great.

Oban v0.5.0 has been released. This includes some minor features, a number of bug fixes and some documentation improvements.

There are some changes that may cause unexpected retries and a different telemetry payload. See the CHANGELOG excerpt and the linked docs below for more details.

Added

  • [Oban.Pruner] Added prune_limit option to constrain the number of rows deleted on each prune iteration. This prevents locking the database when there are a large number of jobs to delete.

Changed

  • [Oban.Worker] Treat {:error, reason} tuples returned from perform/1 as a failure. The :kind value reported in telemetry events is now differentiated, where a rescued exception has the kind :exception, and an error tuple has the kind :error. Thanks @lucasmazza

Fixed

  • [Oban.Testing] Only check available and scheduled jobs with the assert|refute_enqueued testing helpers. Thanks @jc00ke

  • [Oban.Queue.Watchman] Catch process exits when attempting graceful shutdown. Any exits triggered within terminate/2 are caught and ignored. This safely handles situations where the producer exits before the watchman does. Thanks @axelson

  • [Oban.Queue.Producer] Use circuit breaker protection for gossip events and call queries directly from the producer process. This prevents pushing all queries through the Notifier, and ensures more granular control over gossip errors. Thanks @axelson

v0.5.0 Docs
Telemetry Changes
Error Tuple Changes
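To illustrate the error tuple change, here is a small sketch of how the result of a perform-style function could be classified into the two kinds mentioned above. The ResultSketch module and its return shapes are invented for the example and are not how Oban's executor is actually written.

```elixir
defmodule ResultSketch do
  # Classifies the outcome of a zero-arity function standing in for perform/1.
  # An {:error, reason} return is a failure of kind :error, a raised
  # exception is a failure of kind :exception, anything else is a success.
  def classify(fun) when is_function(fun, 0) do
    case fun.() do
      {:error, reason} -> {:failure, :error, reason}
      _other -> :success
    end
  rescue
    exception -> {:failure, :exception, exception}
  end
end
```

The practical upshot is that a worker which used to return {:error, reason} and be treated as successful will now be retried, which is where the unexpected retries can come from.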

Looks like a great update, keep up the good work @sorentwo !

I have another question. I need to alert the end user if the entire job fails after all retries. Do you have a recommendation? I see the :telemetry.attach("oban-errors", [:oban, :failure], &ErrorReporter.handle_event/4, nil) example, which could be a possibility, but thought I’d check with you.

That is exactly what I would recommend doing. The meta map passed to your handler will have both max_attempts and the attempt field. You can easily check if retries have been exhausted and send your alert to the user.

The table at the top of Oban.Telemetry docs has a breakdown of all the fields passed to each event. (Though now that I linked to it I see a typo, it is max_attempts plural, not max_attempt)
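A handler along those lines might look roughly like this. Treat it as a sketch: passing a pid through the handler config (instead of nil) and sending it a message is just a stand-in for your real alerting, and I am assuming the meta map also carries the job id.

```elixir
defmodule ErrorReporter do
  # Handler shape expected by :telemetry.attach/4. The `notify` pid in the
  # config is an assumption for this sketch, a placeholder for real alerting.
  def handle_event([:oban, :failure], _measurements, meta, %{notify: pid}) do
    if exhausted?(meta), do: send(pid, {:job_exhausted, Map.get(meta, :id)})
    :ok
  end

  # Retries are exhausted once the current attempt reaches max_attempts
  def exhausted?(%{attempt: attempt, max_attempts: max_attempts}) do
    attempt >= max_attempts
  end
end
```

Failures on earlier attempts pass through silently, so the user only hears about a job once it has no retries left.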

Ok, sounds good. Thanks again for the help @sorentwo !

Thanks for the great work @sorentwo, Oban looks amazing. I’m currently playing with it in a project and everything is running smoothly :smile::green_heart:

I have a question about pruning, if you don’t mind. In the Readme, the first example suggests prune: {:maxlen, 100_000}. Looking at the code, it seems prune is used to delete truncated jobs and outdated jobs, but I’m not sure what those are. Can you explain how a job gets truncated or outdated?

Thanks!

Glad to hear Oban is working well for you!

Any job that won’t be executed (either now or in the future) can be pruned. This may be explained better with some examples:

Never Pruned

  • a new job, which has the state available
  • a scheduled job, which has the state scheduled and is set to execute some time in the future
  • a job that failed execution and will be retried in the future

Possibly Pruned

  • any job that ran successfully and is now in the completed state
  • any job that has reached the maximum number of retries or has been manually killed; these are in the discarded state

It is designed so that you’ll never lose jobs that still have work to do, but you can prevent the table from growing indefinitely.
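Put as code, the rule boils down to a check on the job's state. This is only a sketch of the idea with a made-up PruneSketch module, not the pruner's real query:

```elixir
defmodule PruneSketch do
  # Only jobs that will never run again are candidates for pruning
  @prunable ~w(completed discarded)

  def prunable?(state) when is_binary(state), do: state in @prunable
end
```

So "completed" and "discarded" jobs may be removed, while anything in "available", "scheduled", or "retryable" (a job waiting for its next retry) is left alone.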