Oban — Reliable and Observable Job Processing

I think I found the commit. https://github.com/sorentwo/oban/commit/876c4eaa621e6ca42acc05bc1489c82a7d61fcb3

1 Like

Absolutely, thanks for putting the show out there.

I had been weighing how to properly handle the original issue (application crashing while deploying migrations because the table wasn’t available). The discussion around what is expected failure and what can be considered a “responsive” system came at the perfect time.

That’s the one :+1:. The change itself was rather minor and focused on a specific set of expected failures—if the oban_jobs table doesn’t exist or has structural inconsistencies it shouldn’t crash the rest of the application.

2 Likes

That is an excellent and simple solution. I especially like that you logged the information to make sure that your system is observable. You also had a test which, of course, makes me smile.

2 Likes

I’m wondering about using Oban to organize jobs that use ffmpeg.

It looks like the dampener feature you mentioned would work well with ffmpeg, since it keys off memory/CPU instead of delegating to the BEAM - did I understand that correctly? If so, awesome =)

My main question is around > 1 apps accessing jobs… I’d like the main app to queue a job, but I want a separate app to run the job.

So in the main app:

Oban.Job.new(%{}, queue: :default, worker: "SeparateApp.Worker")

Where SeparateApp.Worker doesn’t actually exist in the main app… Then I actually define the SeparateApp.Worker in the separate app.

Is that a bad way of doing it? I was just thinking ffmpeg has different resource needs and scales differently.

Alternatively, I could both queue the job and run the job in the separate app via API, and I believe still get job visibility (when your UI comes out) in the main app (provided it uses the same DB). So it would be a separate Elixir/Phoenix app running in Docker exposing its own API.

I had also considered just using AWS here. So AWS S3 adds a job to AWS Batch, which calls a bash script in a Docker container with ffmpeg. I’m guessing this would be more efficient computing-wise, but it would lose the visibility and portability Oban provides.

Curious on any thoughts!

1 Like

That feature isn’t quite baked yet; it definitely won’t be ready for a while. The version I had been envisioning would track reductions/memory usage from executing processes to automatically scale queues up and down based on workload. Tying that to OS processes would be significantly more difficult, especially for something like FFmpeg, which is multi-threaded itself and could use all available cores even from a single job.

You can achieve this with Oban quite easily. Remember, only the exact queues passed to the Oban supervisor will be started, and there isn’t any global configuration. You can use SeparateApp.Worker.new to enqueue the job from your main app, and so long as the main app isn’t running that worker’s queue, the job will be ignored there.

Here’s a more concrete example. First, define the worker with a queue other than :default:

defmodule SeparateApp.Worker do
  use Oban.Worker, queue: :video_processing

  @impl true
  def perform(_args) do
    # process with ffmpeg
  end
end

Then, configure your apps to start only the queues they should be processing:

# In the supervisor of your primary app
{Oban, repo: Repo, queues: [default: 20]}

# In the supervisor of your secondary app
{Oban, repo: Repo, queues: [video_processing: 5]}

Within your primary application code you can conveniently call SeparateApp.Worker.new(%{}). You are welcome to use the Oban.Job.new/2 variant you listed above, but you don’t need to.
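For reference, here is an untested sketch of both enqueue variants from the primary app (the %{video_id: 123} args and the Repo alias are illustrative):

# Using the worker's generated new/1 (requires the module to be compiled in)
%{video_id: 123}
|> SeparateApp.Worker.new()
|> Repo.insert()

# Or without referencing the worker module at all
%{video_id: 123}
|> Oban.Job.new(queue: :video_processing, worker: "SeparateApp.Worker")
|> Repo.insert()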

IMO that is a great way of doing it. We do all of our media processing in a separate container for this exact reason. Resource usage is unpredictable and the BEAM can get so starved for CPU that it can’t cope the way it usually does.

Right, you would lose all of the execution guarantees and retry behavior. You can definitely make this work within a single umbrella/poncho, so long as you start apps in different containers.

2 Likes

Thanks for the great project @sorentwo! I’m working on using it to handle a long-running port process in a separate app. However, I have a requirement to report back percentage progress while it is running, so I’m using a GenServer to manage feedback from the port. What’s a good way to prevent the job from being marked complete until my GenServer has finished its work?

2 Likes

Anything that blocks the perform/1 function will prevent the job from being marked complete. There isn’t a limit on how long a job can execute (since it isn’t executed within a transaction). Without knowing more about your use case or seeing the code I can only provide some rough suggestions.

  • Make a call to your GenServer process (we need a call so that we get the from tuple).
  • Within your handle_call function grab the from for the caller (which will be your job process) and start a Task to track progress.
  • Respond from handle_call immediately with something like {:reply, :ok, new_state}; don’t await the task.
  • Using a loop within the task, start sending progress messages to the caller. You can loop with a delay by using Process.sleep/1. The original caller will need to use a receive loop to block and wait for progress messages.
  • When the task is complete, send the caller a message to let it know the work is finished.
  • Be sure to set an after timeout in your receive loop to prevent blocking forever and leaving a zombie job. (See the GenServer sketch right after this list.)
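Here is an untested sketch of what that GenServer side could look like (the MyApp.Business name matches the worker below; the port-reading loop is stubbed with fake progress values):

defmodule MyApp.Business do
  use GenServer

  # Client API: the job process calls this and then blocks in its receive loop
  def work(id), do: GenServer.call(__MODULE__, {:work, id})

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, opts}

  @impl true
  def handle_call({:work, id}, {caller, _tag}, state) do
    # Start a task that reports back to the job process, then reply immediately
    Task.start(fn -> run_and_report(id, caller) end)

    {:reply, :ok, state}
  end

  defp run_and_report(_id, caller) do
    # Stub: a real implementation would read progress from the port instead
    for percent <- [25, 50, 75] do
      Process.sleep(1_000)
      send(caller, {:progress, percent})
    end

    send(caller, :complete)
  end
end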

The worker portion would look roughly like this (warning, untested code):

defmodule MyApp.WaitingWorker do
  use Oban.Worker

  @impl true
  # Args round-trip through the database as JSON, so keys arrive as strings
  def perform(%{"id" => id}) do
    :ok = MyApp.Business.work(id)

    receive_loop(id)
  end

  defp receive_loop(id) do
    receive do
      {:progress, percent} ->
        report_progress(id, percent)
        receive_loop(id)

      :complete ->
        :ok

    after
      60_000 ->
        raise RuntimeError, "no progress for #{id} after 60s"
    end
  end
end

Hope that helps and doesn’t just cause more confusion :smile:

4 Likes

Thanks @sorentwo, that was very helpful! I ended up doing it a little differently on the GenServer side, but passing the caller pid and using the receive loop worked great.

1 Like

Oban v0.5.0 has been released. This includes some minor features, a number of bug fixes and some documentation improvements.

There are some changes that may cause unexpected retries and a different telemetry payload. See the CHANGELOG excerpt and the linked docs below for more details.

Added

  • [Oban.Pruner] Added prune_limit option to constrain the number of rows deleted on each prune iteration. This prevents locking the database when there are a large number of jobs to delete.
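A sketch of where the new option goes, assuming the usual supervisor config (the values here are illustrative):

# Prune in chunks of 5k rows per iteration instead of all at once
{Oban, repo: Repo, queues: [default: 10], prune: {:maxlen, 100_000}, prune_limit: 5_000}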

Changed

  • [Oban.Worker] Treat {:error, reason} tuples returned from perform/1 as a failure. The :kind value reported in telemetry events is now differentiated, where a rescued exception has the kind :exception, and an error tuple has the kind :error. Thanks @lucasmazza
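As an illustration, a worker along these lines is now recorded as a failure with the kind :error and retried, where before the tuple did not count as a failure (the module and file-reading logic are made up for this example):

defmodule MyApp.ImportWorker do
  use Oban.Worker

  @impl true
  def perform(%{"path" => path}) do
    case File.read(path) do
      {:ok, contents} ->
        {:ok, byte_size(contents)}

      # As of v0.5.0 this tuple is treated as a failure and the job is retried
      {:error, _reason} = error ->
        error
    end
  end
end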

Fixed

  • [Oban.Testing] Only check available and scheduled jobs with the assert|refute_enqueued testing helpers. Thanks @jc00ke

  • [Oban.Queue.Watchman] Catch process exits when attempting graceful shutdown. Any exits triggered within terminate/2 are caught and ignored. This safely handles situations where the producer exits before the watchman does. Thanks @axelson

  • [Oban.Queue.Producer] Use circuit breaker protection for gossip events and call queries directly from the producer process. This prevents pushing all queries through the Notifier, and ensures more granular control over gossip errors. Thanks @axelson

v0.5.0 Docs
Telemetry Changes
Error Tuple Changes

7 Likes

Looks like a great update, keep up the good work @sorentwo !

I have another question. I need to alert the end user if the entire job fails after all retries. Do you have a recommendation? I see the :telemetry.attach("oban-errors", [:oban, :failure], &ErrorReporter.handle_event/4, nil) example, which could be a possibility, but thought I’d check with you.

1 Like

That is exactly what I would recommend doing. The meta map passed to your handler will have both the max_attempts and attempt fields. You can easily check whether retries have been exhausted and send your alert to the user.
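For example, a handler along these lines could fire the alert only on the final attempt (ErrorReporter matches the attach call above; Alerts.notify_user/1 is a hypothetical function):

defmodule ErrorReporter do
  def handle_event([:oban, :failure], _measurements, meta, _config) do
    # Only alert the user once retries have been exhausted
    if meta.attempt >= meta.max_attempts do
      Alerts.notify_user(meta)
    end
  end
end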

The table at the top of the Oban.Telemetry docs has a breakdown of all the fields passed to each event. (Though now that I linked to it I see a typo: it is max_attempts, plural, not max_attempt.)

3 Likes

Ok, sounds good. Thanks again for the help @sorentwo !

1 Like

Thanks for the great work @sorentwo, Oban looks amazing. I’m currently playing with it in a project and everything is running smoothly :smile::green_heart:

I have a question about pruning, if you don’t mind. In the Readme, the first example suggests prune: {:maxlen, 100_000}. Looking at the code, it seems prune is used to delete truncated jobs and outdated jobs, but I’m not sure what those are. Can you explain how a job gets truncated or outdated?

Thanks!

1 Like

Glad to hear Oban is working well for you!

Any job that won’t be executed (either now or in the future) can be pruned. This may be explained better with some examples:

Never Pruned

  • a new job, which has the state available
  • a scheduled job, which has the state available and is scheduled to execute some time in the future
  • a job that failed execution and will be retried in the future

Possibly Pruned

  • any job that ran successfully; these are in the completed state
  • any job that has reached the maximum number of retries or has been manually killed; these are in the discarded state

It is designed so that you’ll never lose jobs that still have work to do, but you can prevent the table from growing indefinitely.
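For reference, both pruning modes are set on the supervisor config. A sketch, with illustrative values (:maxage is a number of seconds):

# Keep at most 100k prunable (completed or discarded) jobs
{Oban, repo: Repo, queues: [default: 10], prune: {:maxlen, 100_000}}

# Or drop prunable jobs older than one hour
{Oban, repo: Repo, queues: [default: 10], prune: {:maxage, 60 * 60}}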

3 Likes

That makes a lot of sense. Thanks for the detailed answer!

I’ve posted a “recipe” on enforcing unique jobs with Oban. It is the first in a series of recipe posts, and was largely drawn from the discussion in this thread. (This is also linked in the monster “Blog Posts” thread)

Oban Recipes Part 1: Unique Jobs

8 Likes

Thanks @sorentwo, been using Oban for a few weeks now, very happy with it. In the initial post you teased a UI for visualizing the workers and job queues, is there a timeline or roadmap with respect to that UI?

7 Likes

@sorentwo,

Would you be interested in a Vue implementation of the UI, with some filtering, using Absinthe?

Personally, no; I’m trying to avoid heavy JS where possible.

The current LiveView-powered UI works wonderfully and I’m really pleased to be writing Elixir for everything. The remaining blockers are:

  1. Some ugly bugs (literally)
  2. A few missing core features
  3. Install and usage documentation
  4. The dependency on phoenix_live_view means I can’t release it on Hex

8 Likes

So you need someone to write docs and create a tutorial on how to use your package?