sheharyarn
Background Job Processing using GenServer / GenStage?
I have a Phoenix application that, on a specific user action, downloads large (10MB+) files from another server, manipulates them, and then sends them to the user. I want to offload these jobs to a separate background job queue instead of blocking the current operation, and just notify the user when the files are available to download.
```elixir
defmodule Asset do
  # Ecto Model and other stuff...

  def download(asset) do
    # Long running HTTPoison task that downloads the file,
    # manipulates it, and stores to the disk so the user can
    # download it later
  end
end
```
Instead of going with an external job processing library or relying on external programs such as Redis, I want to do this myself while staying in OTP land (mostly as a learning experience). I eventually plan on integrating (A)mnesia to persist job state, but for now I just want to process the jobs.
I’m hoping these tasks can be offloaded in a simple manner, something like this:
```elixir
download_job = fn -> Asset.download(some_asset) end
on_complete = fn -> User.notify(user, "SomeAsset is available now") end

# Add a new job to the worker along with an anonymous function
# that gets called when the job is completed
BackgroundWorker.add_job(download_job, on_complete)
```
I believe GenServer / GenStage is the way to go here, but I don't have enough experience with them to even get started. I would really appreciate some direction (code-wise) on how to implement a very basic job worker in Elixir (following proper patterns). I'm also reading other GenServer / GenStage examples, but I need some guidance to get started.
Would appreciate any pointers I could get. Thanks in advance!
sasajuric
Given this, I'd recommend you start with a GenServer solution, because it could be a bit simpler, and it's a good chance to practice some basic OTP.
You could start very simple and implement a queue that runs one job at a time. For that, I'd likely use a scheduler GenServer and a Task.Supervisor. The scheduler process receives a request and, if no job is running, starts a task under the task supervisor. The scheduler also sets up a monitor on the started task, so it knows when the task finishes (or crashes). When that happens, the scheduler runs the next job if one is available; otherwise it just clears the monitor reference from its state, to indicate that no job is running. If a request arrives while a job is running (which you can tell from the non-nil monitor reference in your state), you just store the request in the internal state and wait until the running job finishes.
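A minimal sketch of that one-job-at-a-time scheduler, assuming jobs are zero-arity functions and that a Task.Supervisor is registered under the hypothetical name JobScheduler.TaskSupervisor (JobScheduler itself is also just an illustrative name). It starts tasks with Task.Supervisor.start_child/2, which does not link the task to the scheduler, so a crashing job simply shows up as a :DOWN message:

```elixir
defmodule JobScheduler do
  # Hypothetical scheduler: runs queued zero-arity jobs one at a time under a
  # Task.Supervisor registered as JobScheduler.TaskSupervisor (assumed name).
  use GenServer

  @task_sup JobScheduler.TaskSupervisor

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  # Client API: enqueue a job and return :ok immediately.
  def add_job(job), do: GenServer.cast(__MODULE__, {:add_job, job})

  @impl true
  def init(:ok) do
    # running: monitor ref of the current job, or nil when idle.
    {:ok, %{running: nil, queue: :queue.new()}}
  end

  @impl true
  def handle_cast({:add_job, job}, %{running: nil} = state) do
    # Idle: start the job right away.
    {:noreply, %{state | running: start_job(job)}}
  end

  def handle_cast({:add_job, job}, state) do
    # Busy: park the job until the current one finishes.
    {:noreply, %{state | queue: :queue.in(job, state.queue)}}
  end

  @impl true
  def handle_info({:DOWN, mref, :process, _pid, _reason}, %{running: mref} = state) do
    # The current job ended (normally or by crashing): run the next one, if any.
    case :queue.out(state.queue) do
      {{:value, job}, rest} -> {:noreply, %{state | running: start_job(job), queue: rest}}
      {:empty, _queue} -> {:noreply, %{state | running: nil}}
    end
  end

  # Ignore any other message (e.g. a :DOWN for a monitor we don't track).
  def handle_info(_msg, state), do: {:noreply, state}

  defp start_job(job) do
    # start_child does not link the task to the scheduler, so a crash
    # only produces a :DOWN message instead of killing the scheduler.
    {:ok, pid} = Task.Supervisor.start_child(@task_sup, job)
    Process.monitor(pid)
  end
end
```

With the functions from the question, usage could look like `JobScheduler.add_job(fn -> Asset.download(some_asset); User.notify(user, "SomeAsset is available now") end)`, i.e. the completion callback just runs at the end of the job (so in this sketch it only runs if the download succeeds). The :DOWN reason is ignored on purpose, so a crashed job doesn't stall the queue.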
I'd likely use the :queue module to store the collection of pending jobs. Of the entire API, I think all you need is :queue.new, :queue.in, and :queue.out.
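A quick look at those three :queue functions in isolation, just to show the FIFO behaviour the scheduler relies on (the job atoms are placeholders):

```elixir
# :queue is a FIFO: in/2 adds to the rear, out/1 takes from the front.
q = :queue.new()
q = :queue.in(:job_a, q)
q = :queue.in(:job_b, q)

{{:value, first}, q} = :queue.out(q)
IO.inspect(first)              # :job_a
{{:value, second}, q} = :queue.out(q)
IO.inspect(second)             # :job_b

# An empty queue returns :empty instead of {:value, item}.
{:empty, _q} = :queue.out(q)
```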
You probably want to make sure that the scheduler and the task supervisor live and die as a unit. A crash of the scheduler should take down the task supervisor, because otherwise a restarted scheduler could wrongly think that no job is running. Also, make sure that the scheduler starts after the task supervisor; otherwise it might receive a request before the task supervisor is started, and then it won't be able to start the corresponding task.
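One way to express both requirements is a small supervisor using the :one_for_all strategy, with the task supervisor listed before the scheduler (children start in list order). A sketch, where JobQueue.Supervisor, JobQueue.TaskSupervisor, and the stub JobQueue.Scheduler are hypothetical names. Note that :rest_for_one would also respect the start order, but it would not take the task supervisor down when the scheduler crashes:

```elixir
defmodule JobQueue.Scheduler do
  # Hypothetical stand-in for the scheduler GenServer; only its position in
  # the supervision tree matters for this example.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, %{running: nil, queue: :queue.new()}}
end

defmodule JobQueue.Supervisor do
  use Supervisor

  def start_link(opts \\ []), do: Supervisor.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok) do
    children = [
      # Listed first, so it is running before the scheduler accepts requests.
      {Task.Supervisor, name: JobQueue.TaskSupervisor},
      JobQueue.Scheduler
    ]

    # :one_for_all: if either child crashes, both are restarted together, so a
    # fresh scheduler never starts next to tasks left over from its predecessor.
    Supervisor.init(children, strategy: :one_for_all)
  end
end
```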
Once you have it working for one job at a time, it should be fairly straightforward to expand it to at most N simultaneous jobs.
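For N simultaneous jobs, the single monitor reference can become a set of references plus a limit. A sketch, assuming zero-arity job functions and a Task.Supervisor registered as PoolScheduler.TaskSupervisor (PoolScheduler and its :max_jobs option are hypothetical names):

```elixir
defmodule PoolScheduler do
  # Hypothetical scheduler that runs at most :max_jobs jobs concurrently.
  use GenServer

  @task_sup PoolScheduler.TaskSupervisor

  def start_link(opts) do
    GenServer.start_link(__MODULE__, Keyword.fetch!(opts, :max_jobs), name: __MODULE__)
  end

  def add_job(job), do: GenServer.cast(__MODULE__, {:add_job, job})

  @impl true
  def init(max_jobs) do
    # running: monitor refs of the jobs currently executing.
    {:ok, %{max_jobs: max_jobs, running: MapSet.new(), queue: :queue.new()}}
  end

  @impl true
  def handle_cast({:add_job, job}, state) do
    if MapSet.size(state.running) < state.max_jobs do
      {:noreply, start_job(job, state)}
    else
      {:noreply, %{state | queue: :queue.in(job, state.queue)}}
    end
  end

  @impl true
  def handle_info({:DOWN, mref, :process, _pid, _reason}, state) do
    if MapSet.member?(state.running, mref) do
      state = %{state | running: MapSet.delete(state.running, mref)}

      # A slot just freed up: start the next queued job, if any.
      case :queue.out(state.queue) do
        {{:value, job}, rest} -> {:noreply, start_job(job, %{state | queue: rest})}
        {:empty, _queue} -> {:noreply, state}
      end
    else
      {:noreply, state}
    end
  end

  def handle_info(_msg, state), do: {:noreply, state}

  defp start_job(job, state) do
    {:ok, pid} = Task.Supervisor.start_child(@task_sup, job)
    %{state | running: MapSet.put(state.running, Process.monitor(pid))}
  end
end
```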
Finally, you could try to do the same thing with GenStage, and compare both solutions. A retrospective blog post would be a very interesting read.
Best of luck!
sasajuric
As I originally mentioned, the road I'd likely take is to use a process monitor. The assumption here is that the job manager is not interested in the result of the task, only in whether the task finished (either successfully or through a crash). In that case, using Task.async (or async_nolink) makes less sense, since we don't need the extra message the task sends to its creator.
Therefore, I'd just do a {:ok, pid} = Task.start_link(...), followed by mref = Process.monitor(pid), and store mref somewhere in the state. Then, in the handle_info clause for the :DOWN message, I'd specifically check whether the mref included in the message corresponds to one of the mrefs I keep in the state. If it does, I can conclude that the corresponding task has finished, remove that mref from the state, and start the next task from the queue (if one exists).
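Both points can be checked outside a GenServer with a small experiment: a monitor delivers a :DOWN message whether the task returns normally or crashes, and pinning the monitor ref picks out the right message. This sketch swaps Task.start_link for Task.Supervisor.start_child, because a task started with Task.start_link is linked to its caller, so a crash would take the caller down too (unless it traps exits). The run_and_wait helper is hypothetical:

```elixir
{:ok, sup} = Task.Supervisor.start_link()

run_and_wait = fn job ->
  # Unlike Task.start_link/1, start_child does not link the task to the
  # caller, so a crashing job can't take the caller down with it.
  {:ok, pid} =
    Task.Supervisor.start_child(sup, fn ->
      # Hold off until the monitor exists; otherwise a very fast job could
      # exit first and the :DOWN reason would be :noproc.
      receive do
        :go -> job.()
      end
    end)

  mref = Process.monitor(pid)
  send(pid, :go)

  receive do
    # Pinning ^mref skips :DOWN messages that belong to other monitors.
    {:DOWN, ^mref, :process, ^pid, reason} -> reason
  end
end

ok_reason = run_and_wait.(fn -> :ok end)
crash_reason = run_and_wait.(fn -> exit(:boom) end)
IO.inspect(ok_reason)      # :normal
IO.inspect(crash_reason)   # :boom
```

Either way a :DOWN arrives, which is all the scheduler needs in order to move on to the next job.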
dom
Try: Task.Supervisor (Elixir v1.20.2 documentation)
And check what you receive in handle_info.
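A sketch of what that looks like with Task.Supervisor.async_nolink/2, run in a plain process with receive standing in for handle_info: the task first sends a {ref, result} reply and then its monitor fires a :DOWN message carrying the same ref. That reply is the extra message Saša says the scheduler doesn't need:

```elixir
{:ok, sup} = Task.Supervisor.start_link()
task = Task.Supervisor.async_nolink(sup, fn -> 1 + 1 end)
ref = task.ref

# First message: the task's reply, tagged with the task's ref.
reply =
  receive do
    {^ref, result} -> result
  after
    1_000 -> :no_reply
  end

# Second message: the monitor's :DOWN, tagged with the same ref.
down =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    1_000 -> :no_down
  end

IO.inspect({reply, down})   # {2, :normal}
```

In a GenServer, both messages land in handle_info; if you only care about the reply, the Task.Supervisor docs suggest Process.demonitor(ref, [:flush]) after receiving it.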
(Edit: I would also suggest moving the is_function check to the client code (add_job), rather than the server code. It makes more sense to crash the client than the server if a non-function is passed, since the bug is in the client.)
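A minimal sketch of that suggestion, using a hypothetical GuardedScheduler that only collects jobs, to keep the focus on the guard. The guard is narrowed to is_function(job, 0), assuming jobs are zero-arity functions:

```elixir
defmodule GuardedScheduler do
  # Hypothetical minimal server, just to show where the is_function check goes.
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  # The guard runs in the caller's process: passing a non-function raises
  # FunctionClauseError there, and the server never sees the bad request.
  def add_job(job) when is_function(job, 0) do
    GenServer.cast(__MODULE__, {:add_job, job})
  end

  @impl true
  def init(:ok), do: {:ok, []}

  @impl true
  def handle_cast({:add_job, job}, jobs), do: {:noreply, [job | jobs]}
end
```

Calling GuardedScheduler.add_job(:oops) then crashes the caller with a FunctionClauseError, while the server keeps running.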