I think I found the commit. https://github.com/sorentwo/oban/commit/876c4eaa621e6ca42acc05bc1489c82a7d61fcb3
Absolutely, thanks for putting the show out there.
I had been weighing how to properly handle the original issue (application crashing while deploying migrations because the table wasn’t available). The discussion around what is expected failure and what can be considered a “responsive” system came at the perfect time.
That’s the one. The change itself was rather minor and focused on a specific set of expected failures: if the `oban_jobs` table doesn’t exist or has structural inconsistencies, it shouldn’t crash the rest of the application.
That is an excellent and simple solution. I especially like that you logged the information to make sure that your system is observable. You also had a test which, of course, makes me smile.
I’m wondering about using Oban to organize jobs that run ffmpeg.
It looks like the dampener feature you mentioned would work well with ffmpeg, since it goes off memory/CPU usage instead of delegating to the BEAM - did I understand that correctly? If so, awesome =)
My main question is around multiple apps accessing jobs… I’d like the main app to queue a job, but I want a separate app to run the job.
So in the main app:
`Oban.Job.new(queue: :default, worker: :"SeparateApp.Worker")`
Where `SeparateApp.Worker` doesn’t actually exist in the main app… Then I actually define `SeparateApp.Worker` in the separate app.
Is that a bad way of doing it? I was just thinking that ffmpeg has different resource needs and scales differently.
Alternatively, I could both queue the job and run the job in the separate app via api, and I believe still get job visibility (when your UI comes out) in the main app (providing it uses the same db). So it would be a separate elixir/phoenix app running in docker exposing its own api.
I had also considered just using aws here. So aws s3 adds a job to aws batch which calls a bash script in a docker container with ffmpeg. I’m guessing this would be more efficient computing wise, but it would lose the visibility and portability Oban provides.
Curious on any thoughts!
That feature isn’t quite baked yet, and it definitely won’t be ready for a while. The version I had been envisioning would track reductions/memory usage from executing processes to automatically scale queues up and down based on workload. Tying that to OS processes would be significantly more difficult, especially for something like FFMpeg, which is multi-threaded itself and could use all available cores even from a single job.
You can achieve this with Oban quite easily. Remember, only the exact queues passed to the Oban supervisor will be started, and there isn’t any global configuration. You can use `SeparateApp.Worker.new` to enqueue the job from your main app, and so long as the main app isn’t running that worker’s queue the job will be ignored.
Here’s a more concrete example. First, define the worker with a queue other than `:default`:
```elixir
defmodule SeparateApp.Worker do
  use Oban.Worker, queue: :video_processing

  @impl true
  def perform(args) do
    # process with ffmpeg
  end
end
```
Then, configure your apps to start only the queues they should be processing:
```elixir
# In the supervisor of your primary app
{Oban, repo: Repo, queues: [default: 20]}

# In the supervisor of your secondary app
{Oban, repo: Repo, queues: [video_processing: 5]}
```
Within your primary application code you can conveniently call `SeparateApp.Worker.new(%{})`. You are welcome to use the `Oban.Job.new/2` variant you listed above, but you don’t need to.
IMO that is a great way of doing it. We do all of our media processing in a separate container for this exact reason. Resource usage is unpredictable and the BEAM can get so starved for CPU that it can’t cope the way it usually does.
Right, you would lose all of the execution guarantees and retry behavior. You can definitely make this work within a single umbrella/poncho, so long as you start apps in different containers.
Thanks for the great project @sorentwo ! I’m working on using it to handle a long-running port process in a separate app. However, I have a requirement to report back percentage progress while it is running, so I’m using a GenServer to manage feedback from the port. What’s a good way to prevent the job from being marked complete until my GenServer has finished its work?
Anything that blocks the `perform/1` function will prevent the job from being marked complete. There isn’t a limit on how long a job can execute (since it isn’t executed within a transaction). Without knowing more about your use case or seeing the code I can only provide some rough suggestions.
- Make a `call` to your GenServer process (we need a call so that we get the `from` tuple).
- Within your `handle_call` function grab the `from` for the caller (which will be your job process) and start a Task to track progress.
- Respond from `handle_call` immediately with something like `{:reply, :ok, new_state}`; don’t `await` the task.
- Using a loop within the task, start sending progress messages to the caller. You can loop with a delay by using `Process.sleep/1`. The original caller will need to use a `receive` loop to block and wait for progress messages.
- When the task is complete, send the caller a message to let it know the work is finished.
- Be sure to set an `after` timeout in your receive loop to prevent blocking forever and leaving a zombie job.
The worker portion would look roughly like this (warning, untested code):
```elixir
defmodule MyApp.WaitingWorker do
  use Oban.Worker

  @impl true
  def perform(%{id: id}) do
    :ok = MyApp.Business.work(id)

    receive_loop(id)
  end

  defp receive_loop(id) do
    receive do
      {:progress, percent} ->
        report_progress(id, percent)
        receive_loop(id)

      :complete ->
        :ok
    after
      60_000 ->
        raise RuntimeError, "no progress for #{id} after 60s"
    end
  end
end
```
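To complete the picture, the GenServer side might look roughly like the following sketch (equally untested; `MyApp.Business` is the name used in the worker above, and the 10% increments and one-second delay are purely illustrative):

```elixir
defmodule MyApp.Business do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Called from the job's perform/1. Using a call (rather than a cast)
  # gives us the `from` tuple, from which we can extract the job's pid.
  def work(id), do: GenServer.call(__MODULE__, {:work, id})

  @impl true
  def init(opts), do: {:ok, opts}

  @impl true
  def handle_call({:work, _id}, {caller, _tag}, state) do
    # Start an unlinked task that reports progress back to the job process.
    Task.start(fn -> track_progress(caller, 0) end)

    # Reply immediately; don't await the task.
    {:reply, :ok, state}
  end

  # When progress reaches 100%, tell the job it can finish.
  defp track_progress(caller, percent) when percent >= 100 do
    send(caller, :complete)
  end

  # Otherwise send a progress message, wait, and loop.
  defp track_progress(caller, percent) do
    send(caller, {:progress, percent})
    Process.sleep(1_000)
    track_progress(caller, percent + 10)
  end
end
```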
Hope that helps and doesn’t just cause more confusion
Thanks @sorentwo, that was very helpful! I ended up doing it a little differently on the GenServer side, but passing the caller pid and using the receive loop worked great.
Oban v0.5.0 has been released. This includes some minor features, a number of bug fixes and some documentation improvements.
There are some changes that may cause unexpected retries and a different telemetry payload. See the CHANGELOG excerpt and the linked docs below for more details.
Added
- [Oban.Pruner] Added a `prune_limit` option to constrain the number of rows deleted on each prune iteration. This prevents locking the database when there are a large number of jobs to delete.
Changed
- [Oban.Worker] Treat `{:error, reason}` tuples returned from `perform/1` as a failure. The `:kind` value reported in telemetry events is now differentiated: a rescued exception has the kind `:exception`, and an error tuple has the kind `:error`. Thanks @lucasmazza
Fixed
- [Oban.Testing] Only check `available` and `scheduled` jobs with the `assert|refute_enqueued` testing helpers. Thanks @jc00ke
- [Oban.Queue.Watchman] Catch process exits when attempting graceful shutdown. Any exits triggered within `terminate/2` are caught and ignored. This safely handles situations where the producer exits before the watchman does. Thanks @axelson
- [Oban.Queue.Producer] Use circuit breaker protection for gossip events and call queries directly from the producer process. This prevents pushing all queries through the `Notifier`, and ensures more granular control over gossip errors. Thanks @axelson
Looks like a great update, keep up the good work @sorentwo !
I have another question. I need to alert the end user if the entire job fails after all retries. Do you have a recommendation? I see the `:telemetry.attach("oban-errors", [:oban, :failure], &ErrorReporter.handle_event/4, nil)` example, which could be a possibility, but thought I’d check with you.
That is exactly what I would recommend doing. The `meta` map passed to your handler will have both the `max_attempts` and the `attempt` fields. You can easily check whether retries have been exhausted and send your alert to the user.
The table at the top of the `Oban.Telemetry` docs has a breakdown of all the fields passed to each event. (Though now that I’ve linked to it I see a typo: it is `max_attempts`, plural, not `max_attempt`.)
Thanks for the great work @sorentwo, Oban looks amazing. I’m currently playing with it in a project and everything is running smoothly
I have a question about pruning, if you don’t mind. In the README, the first example suggests `prune: {:maxlen, 100_000}`. Looking at the code, it seems prune is used to delete truncated jobs and outdated jobs, but I’m not sure what those are. Can you explain how a job gets truncated or outdated?
Thanks!
Glad to hear Oban is working well for you!
Any job that won’t be executed (either now or in the future) can be pruned. This may be explained better with some examples:
Never Pruned
- a new job, which has the state `available`
- a scheduled job, which has the state `available` and is scheduled to execute some time in the future
- a job that failed execution and will be retried in the future
Possibly Pruned
- any job that ran successfully; it is in the `completed` state
- any job that has reached the maximum number of retries or has been manually killed; these are in the `discarded` state
It is designed so that you’ll never lose jobs that still have work to do, but you can prevent the table from growing indefinitely.
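Putting the options discussed in this thread together, a configuration sketch (the repo name, queue sizes, and `prune_limit` value are illustrative):

```elixir
# Retain at most 100,000 prunable (completed/discarded) jobs, deleting
# no more than 5,000 rows per prune iteration to avoid long-held locks.
{Oban,
 repo: MyApp.Repo,
 queues: [default: 10],
 prune: {:maxlen, 100_000},
 prune_limit: 5_000}
```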
That makes a lot of sense. Thanks for the detailed answer!
I’ve posted a “recipe” on enforcing unique jobs with Oban. It is the first in a series of recipe posts, and was largely drawn from the discussion in this thread. (This is also linked in the monster “Blog Posts” thread)
Thanks @sorentwo, been using Oban for a few weeks now, very happy with it. In the initial post you teased a UI for visualizing the workers and job queues, is there a timeline or roadmap with respect to that UI?
Would you be interested in vue implementation of the ui with some filtering using absinthe?
Personally, no, I’m trying to avoid heavy JS where possible.
The current LiveView-powered UI works wonderfully and I’m really pleased to be writing Elixir for everything. The remaining blockers are:
- Some ugly bugs (literally)
- A few missing core features
- Install and usage documentation
- The dependency on `phoenix_live_view` means I can’t release it on Hex
So you need someone to write docs and create a tutorial on how to use your package?