Background Job Processing using GenServer / GenStage?

I have a Phoenix application that downloads (10MB+) files from another server on a specific user action, manipulates them, and then sends them to the user. I want to offload these jobs into a separate Background Job Queue, instead of blocking the current operation and just notify the user when the files are available to download.

defmodule Asset do
  # Ecto Model and other stuff...
  
  def download(asset) do
    # Long running HTTPoison task that downloads the file,
    # manipulates it, and stores to the disk so the user can
    # download it later
  end
end

Instead of going with an external job processing library or relying on any external programs such as Redis, I want to do this myself while staying in OTP land (mostly as a learning experience). I eventually plan on integrating (A)mnesia to persist job state, but for now I just want to handle them.

I’m hoping these tasks can be offloaded in a simple manner, something like this:

download_job = fn -> Asset.download(some_asset) end
on_complete  = fn -> User.notify(user, "SomeAsset is available now") end

# Add a new job to the worker along with an anonymous function
# that gets called when the job is completed
BackgroundWorker.add_job(download_job, on_completed)

I believe GenServer / GenStage is the way to go here, but I don’t have any experience with them to even get started. I would really appreciate some direction (code-wise) on how to implement a very basic Job Worker in Elixir (following proper patterns). I’m also reading up on other GenServer / GenStage examples, but I need some guidance so I can get started.

Would appreciate any pointers I could get. Thanks in advance! :smile:

1 Like

In a case like this, where all background processes are independent from another, you can make a very clear solution by using Tasks. documentation. In this case, use Task.start as you are do not need to wait for the return value.

Having a single GenServer handle a whole list of jobs is useful only if you want to restrict the amount of concurrent jobs (i. e. downloads in this example) that you want to happen.

@Qqwy Yes, that’s one of the main reasons I feel GenServer/Stage is the proper choice. I don’t want more than 2, 3 concurrent download operations. Another reason is setting a timeout on the download operation.

A GenServer-implementing approach would be something like the following:

defmodule BackgroundWorker do
  use GenServer

  def start_link() do
    {:ok, pid} = GenServer.start_link(__MODULE__, nil)
    pid
  end

  # Outward-facing API
  def add_job(pid, job_function) do
    # async message passing.
    GenServer.cast(pid, {:add_job, job_function})
  end

  # Internal Callback
  def handle_cast({:add_job, job_function}, _state) when is_function(job_function) do
    # The BackgroundWorker GenServer uses the message mailbox as queue.
    function.()
    {:noreply, nil}
  end
end

This simple implementation (note: untested, maybe there are typos) handles exactly one job at a time, but can of course be extended to handle multiple ones by adding another layer of indirection.

1 Like

Given this, I’d recommend you start with a GenServer solution, because it could be a bit simpler, and it’s a good chance to practice some basic OTP a bit :slight_smile:

You could start very simple and implement a queue which runs one job at a time. For that, I’d likely use a scheduler GenServer and a Task.Supervisor. The scheduler process receives a request, and if no job is running it starts a task under the task supervisor. The scheduler also sets up a monitor to the started task, so it can know when the task finishes (or crashes). Once that happens, the scheduler can run the next job, if such is available, or otherwise just clear the monitor from its state, to indicate that no job is running. If a request arrives when something is running (which you can see by the fact that you have a non-nil monitor reference in your state), you just store the request into an internal state and wait until the job finishes.

I’d likely use the :queue module to store the collection of pending jobs. Of the entire API I think all you need is :queue.new, :queue.in, and :queue.out.

You probably want to make sure that both the scheduler and the task supervisor live and die as a unit. A crash of the scheduler should take down the supervisor, because otherwise a restarted scheduler could wrongly think that no job is running. Also, make sure that the scheduler starts after the supervisor, because otherwise it might receive a request before the supervisor is started, and then it won’t be able to start the corresponding task in the supervisor…

Once you have it working for one job at a time, it should be fairly straightforward to expand it for max N simultaneous jobs.

Finally, you could try to do the same thing with GenStage, and compare both solutions. A retrospective blog post would be a very interesting read :wink:

Best of luck!

9 Likes

Thank you both @sasajuric and @Qqwy for the help. This is what I’ve come up with so far:

defmodule JobManager.Supervisor do
  use Supervisor
  @name __MODULE__

  def start_link do
    Supervisor.start_link(__MODULE__, :ok, name: @name)
  end

  def init(:ok) do
    children = [
      supervisor(Task.Supervisor, [[name: JobManager.JobSupervisor]]),
      worker(JobManager, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

defmodule JobManager do
  use GenServer
  require Logger

  @name __MODULE__

  def start_link do
    Logger.debug "Starting GenServer: #{@name}"
    GenServer.start_link(__MODULE__, :ok, name: @name)
  end

  def add_job(new_job) do
    Logger.debug "AddJob - Job: #{inspect(new_job)}"
    GenServer.call(@name, {:add_job, new_job})
  end


  # Internal Callbacks

  def init(:ok) do
    Logger.debug "Init (:ok) - Creating empty queue"
    state = {:queue.new, nil} # Queue and Current Job tuple
    {:ok, state}
  end

  def handle_call({:add_job, new_job}, _from, {q, c_job} = state) when is_function(new_job) do
    Logger.debug "HandleCall (:add_job) - state: #{inspect state}"

    state = {q, c_job} = {:queue.in(new_job, q), c_job}
    Logger.debug "HandleCall (:add_job) - pushed job to queue: #{inspect q}"

    state =
      case c_job do
        nil -> process_next_in_queue(state)
        _   -> state
      end

    Logger.debug "HandleCall (:add_job) - new_state: #{inspect state}"
    {:reply, :ok, state}
  end

  def handle_info({:DOWN, _ref, :process, _pid, :normal}, {q, c_job}) do
    Logger.debug("HandleInfo - Task Completed, moving on to next")
    {:noreply, process_next_in_queue({q, nil})}
  end

  def handle_info(msg, state) do
    Logger.debug("HandleInfo - Received Message: #{inspect msg}")
    {:noreply, state}
  end

  def process_next_in_queue({q, nil}) do
    Logger.debug("ProcessNext - Current Job is nil, Processing Queue: #{inspect q}")

    case :queue.out(q) do
      {:empty, q}        -> {q, nil}
      {{:value, job}, q} -> {q, Task.Supervisor.async(JobManager.JobSupervisor, job)}
    end
  end
end

I’ve added JobManager.Supervisor to my Application children, and I can successfully add jobs to the queue (which get processed one after the other) by calling JobManager.add_job(fun), but the issue now is that if one of the jobs crash, the JobManager GenServer does as well which resets the queue. I want to be able to simply move on to the next job (and execute some cleanup code and keep a record of the failed jobs - but that isn’t important now).

I’ve looked into Task.Supervisor.start_child which doesn’t crash the caller, but doesn’t notify the caller about task completion as well. How would I go about doing that? Would also appreciate any pointers and feedback on the code. Thanks!

Try: https://hexdocs.pm/elixir/Task.Supervisor.html#async_nolink/2

And check what you receive in handle_info :slight_smile:

(Edit: I would also suggest moving the is_function check to the client code (add_job), rather than the server code. It makes more sense to crash the client than the server if a non-function is passed, since the bug is in the client.)

2 Likes

I’ll say ‘both’, then… :wink: There’s no one forcing someone to use the client code to send messages to the server, so if it’s important to you to check it (in order to prevent garbage in your state), checking on the server side is where to do it first. Could certainly be helpful in the client code too, of course.

As I originally mentioned, the road I’d likely take is to use process monitor. The assumption here is that the job manager is not interested in the result of the task, but only in whether the task finished (either successfully, or through a crash). In such case, using Task.async (or async_nolink) makes less sense, since we don’t need that extra message from the task to its creator.

Therefore, I’d just do a {:ok, pid} = Task.start_link(...), followed by a mref = Process.monitor(pid), and finally I’d store mref somewhere in the state. Finally, in handle_info clause of a :DOWN message, I’d specifically check whether the mref included in the message corresponds to some of mrefs I keep in the state. If yes, I can conclude that the corresponding task has finished, clean that mref from the state, and start the next taxt from the queue (if such exists).

3 Likes

While I agree that anyone can bypass interface functions and issue a GenServer.call/cast directly, I’d say this is most often not done, and is almost always a hack. The point of interface functions is to hide the specifics of the shape of messages. While interface functions are public and documented part of the module, shape of call/cast messages most often aren’t, so they shouldn’t be used.

Therefore, I think that verifying argument only in the interface function is fine. I don’t think there’s anything wrong with doing it in the server as well, but I personally wouldn’t do that, because I think it adds noise for little to no real benefits.

I also agree with this reasoning:

The bug is indeed in the client, so crashing in the client won’t disturb no other clients nor the server. In contrast, crashing in the server could lead to much wider negative consequences.

2 Likes

Fair enough - I guess in the end it comes down to what part of the system the GenServer sits in, and what it’s doing; definitely crashing the client should have less impact, and having taken care of 99.99% of potential wrong input cases, if you can tolerate a potentially corrupted state with late discovery in that last edge case you’ll be fine :slight_smile:

For most purposes, you can tolerate that.