sheharyarn

Background Job Processing using GenServer / GenStage?

I have a Phoenix application that, on a specific user action, downloads large (10MB+) files from another server, manipulates them, and then sends them to the user. I want to offload these jobs to a separate background job queue instead of blocking the current operation, and just notify the user when the files are available to download.

defmodule Asset do
  # Ecto Model and other stuff...
  
  def download(asset) do
    # Long running HTTPoison task that downloads the file,
    # manipulates it, and stores to the disk so the user can
    # download it later
  end
end

Instead of going with an external job processing library or relying on any external programs such as Redis, I want to do this myself while staying in OTP land (mostly as a learning experience). I eventually plan on integrating (A)mnesia to persist job state, but for now I just want to handle them.

I’m hoping these tasks can be offloaded in a simple manner, something like this:

download_job = fn -> Asset.download(some_asset) end
on_complete  = fn -> User.notify(user, "SomeAsset is available now") end

# Add a new job to the worker along with an anonymous function
# that gets called when the job is completed
BackgroundWorker.add_job(download_job, on_complete)

I believe GenServer / GenStage is the way to go here, but I don’t have any experience with them to even get started. I would really appreciate some direction (code-wise) on how to implement a very basic Job Worker in Elixir (following proper patterns). I’m also reading up on other GenServer / GenStage examples, but I need some guidance so I can get started.

Would appreciate any pointers I could get. Thanks in advance! :smile:

sasajuric

Author of Elixir In Action

Given this, I’d recommend you start with a GenServer solution, because it could be a bit simpler, and it’s a good chance to practice some basic OTP :slight_smile:

You could start very simple and implement a queue which runs one job at a time. For that, I’d likely use a scheduler GenServer and a Task.Supervisor. The scheduler process receives a request, and if no job is running it starts a task under the task supervisor. The scheduler also sets up a monitor to the started task, so it can know when the task finishes (or crashes). Once that happens, the scheduler can run the next job, if such is available, or otherwise just clear the monitor from its state, to indicate that no job is running. If a request arrives when something is running (which you can see by the fact that you have a non-nil monitor reference in your state), you just store the request into an internal state and wait until the job finishes.

I’d likely use the :queue module to store the collection of pending jobs. Of the entire API I think all you need is :queue.new, :queue.in, and :queue.out.
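For reference, here is a small sketch of those three :queue calls used as a FIFO of pending jobs. The atoms :job_a and :job_b are placeholders for whatever a real job request would be.

```elixir
# FIFO queue of pending jobs, using only :queue.new/0, :queue.in/2 and :queue.out/1.
queue = :queue.new()
queue = :queue.in(:job_a, queue)
queue = :queue.in(:job_b, queue)

# :queue.out/1 returns the oldest item together with the remaining queue.
{{:value, first}, queue} = :queue.out(queue)
{{:value, second}, queue} = :queue.out(queue)

# On an empty queue it returns :empty in place of {:value, item}.
{:empty, _queue} = :queue.out(queue)

IO.inspect({first, second}, label: "dequeued in order")
```

Items come out in the order they went in, which is what a job queue needs.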

You probably want to make sure that both the scheduler and the task supervisor live and die as a unit. A crash of the scheduler should take down the supervisor, because otherwise a restarted scheduler could wrongly think that no job is running. Also, make sure that the scheduler starts after the supervisor, because otherwise it might receive a request before the supervisor is started, and then it won’t be able to start the corresponding task in the supervisor.
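One way to get that "live and die as a unit" behaviour is a parent supervisor with the :one_for_all strategy, listing the task supervisor before the scheduler. This is only a sketch: all module names below are hypothetical, and BackgroundWorker.Scheduler is an empty placeholder standing in for the real scheduler GenServer.

```elixir
defmodule BackgroundWorker.Scheduler do
  # Placeholder for the real scheduler; it only holds its options as state.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, opts}
end

defmodule BackgroundWorker.Supervisor do
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      # Listed first, so it is running before the scheduler can receive a request.
      {Task.Supervisor, name: BackgroundWorker.TaskSupervisor},
      {BackgroundWorker.Scheduler, task_supervisor: BackgroundWorker.TaskSupervisor}
    ]

    # :one_for_all restarts every child when any one of them crashes, so a
    # restarted scheduler never coexists with tasks left over from the old one.
    Supervisor.init(children, strategy: :one_for_all)
  end
end
```

Children are started in list order, which covers the "scheduler starts after the supervisor" requirement.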

Once you have it working for one job at a time, it should be fairly straightforward to expand it for max N simultaneous jobs.
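Putting the pieces above together, a one-job-at-a-time scheduler could look roughly like the sketch below. The module name BackgroundWorker and the add_job/2 function are borrowed from the question. The :task_supervisor option, the state layout, and the choice to call on_complete only when the job returns without crashing are assumptions made for illustration.

```elixir
defmodule BackgroundWorker do
  use GenServer

  # Client API

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def add_job(job, on_complete) do
    GenServer.cast(__MODULE__, {:add_job, {job, on_complete}})
  end

  # Server callbacks

  @impl true
  def init(opts) do
    # Assumption: the name of an already running Task.Supervisor is passed in.
    task_sup = Keyword.fetch!(opts, :task_supervisor)
    {:ok, %{task_sup: task_sup, queue: :queue.new(), running: nil}}
  end

  @impl true
  def handle_cast({:add_job, request}, %{running: nil} = state) do
    # Nothing is running, so start the job right away.
    {:noreply, start_job(request, state)}
  end

  def handle_cast({:add_job, request}, state) do
    # A job is running (non-nil monitor reference), so park the request.
    {:noreply, %{state | queue: :queue.in(request, state.queue)}}
  end

  @impl true
  def handle_info({:DOWN, mref, :process, _pid, _reason}, %{running: mref} = state) do
    # The running task finished or crashed; run the next job, if there is one.
    case :queue.out(state.queue) do
      {{:value, request}, rest} ->
        {:noreply, start_job(request, %{state | queue: rest})}

      {:empty, _queue} ->
        {:noreply, %{state | running: nil}}
    end
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp start_job({job, on_complete}, state) do
    {:ok, pid} =
      Task.Supervisor.start_child(state.task_sup, fn ->
        job.()
        # Only reached if job.() did not crash.
        on_complete.()
      end)

    %{state | running: Process.monitor(pid)}
  end
end
```

With a Task.Supervisor running under some name, this would be started as BackgroundWorker.start_link(task_supervisor: ThatName), after which the BackgroundWorker.add_job(download_job, on_complete) call from the question works as written.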

Finally, you could try to do the same thing with GenStage, and compare both solutions. A retrospective blog post would be a very interesting read :wink:

Best of luck!

sasajuric

As I originally mentioned, the road I’d likely take is to use a process monitor. The assumption here is that the job manager is not interested in the result of the task, but only in whether the task finished (either successfully or through a crash). In such a case, using Task.async (or async_nolink) makes less sense, since we don’t need that extra message from the task to its creator.

Therefore, I’d just do a {:ok, pid} = Task.start_link(...), followed by a mref = Process.monitor(pid), and store mref somewhere in the state. Then, in the handle_info clause for a :DOWN message, I’d specifically check whether the mref included in the message corresponds to one of the mrefs I keep in the state. If yes, I can conclude that the corresponding task has finished, clean that mref from the state, and start the next task from the queue (if one exists).
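To see the shape of that :DOWN message outside of a GenServer, here is a small script-level sketch. The :finish message is only a device to keep the task alive until the monitor is set up. In a GenServer the same tuple would arrive in handle_info/2 instead of a plain receive.

```elixir
# The task blocks until told to finish, so the monitor is in place before it exits.
{:ok, pid} =
  Task.start_link(fn ->
    receive do
      :finish -> :ok
    end
  end)

mref = Process.monitor(pid)
send(pid, :finish)

# Match on the stored mref, so :DOWN messages for other processes are ignored.
reason =
  receive do
    {:DOWN, ^mref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason, label: "task exited with")
```

A task that returns normally reports :normal as the reason. One thing to keep in mind with start_link is that a crashing task also sends an exit signal through the link, so the caller goes down with it unless it traps exits. Starting the task under a Task.Supervisor avoids that, since the task is then linked to the supervisor and not to the caller.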

dom

Try: Task.Supervisor — Elixir v1.20.2

And check what you receive in handle_info :slight_smile:

(Edit: I would also suggest moving the is_function check to the client code (add_job), rather than the server code. It makes more sense to crash the client than the server if a non-function is passed, since the bug is in the client.)
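A minimal sketch of that suggestion, assuming add_job takes two zero-arity functions as in the question. GuardedClient is a hypothetical module name, and the return value stands in for the real hand-off to the server.

```elixir
defmodule GuardedClient do
  # The guard runs in the calling process, so a bad argument raises
  # FunctionClauseError in the client and never reaches the server.
  def add_job(job, on_complete)
      when is_function(job, 0) and is_function(on_complete, 0) do
    # A real implementation would pass the request on here,
    # for example with GenServer.cast/2.
    {:queued, job, on_complete}
  end
end
```

Calling GuardedClient.add_job(:not_a_function, fn -> :ok end) fails immediately in the caller, which points straight at the buggy call site.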
