How to test a GenServer background job that uses handle_info/2?

I have a background job that calls an API and then writes the results to a DB using Ecto.

My code looks like this:

defmodule Shop.Jobs.BookProcessor do
  use GenServer

  def start_link(size) do
    GenServer.start_link(__MODULE__, size)
  end

  def init(size) do
    schedule_work() # Schedule work to be performed at some point
    {:ok, size}
  end

  def handle_info(:work, size) do
    # Call API and process response into DB
    schedule_work()
    {:noreply, size}
  end

  defp schedule_work() do
    Process.send_after(self(), :work, 30 * 60 * 1000)
  end
end

So far I have the following test:

defmodule Shop.Jobs.BookProcessorTest do
  use Shop.DataCase

  setup do
    {:ok, book_processor} = start_supervised Shop.Jobs.BookProcessor
    %{book_processor: book_processor}
  end


  test "handle_info/2 :work", %{book_processor: book_processor} do
    user = Utils.create_user()
    assert :work = send(book_processor, :work)
    assert length(Shop.Account.list_books(user.id)) == 2
  end
end

However, the assertion fails with:

Postgrex.Protocol (#PID<0.405.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.450.0> exited while client #PID<0.453.0> is still running with: shutdown

Presumably this is because the job runs asynchronously and the test process exits before the job is complete.

My question: Is there a way to block the test until the job is complete?

I had a similar problem in the project I’m working on, where a GenServer performs an action periodically. Instead of testing the scheduler, we only tested the work function itself by calling it manually in the tests. The scheduler doesn’t even run in the test environment, for example:

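  defp schedule_worker() do
    # Note: Mix is not available at runtime inside a release,
    # so this check only works when the app is run through Mix.
    if Mix.env() != :test do
      Process.send_after(self(), :work, :timer.seconds(5))
    end
  end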

It’s not ideal, but it worked for us.
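A variation on the same idea, as a rough sketch only: the scheduling can be switched off through a start option instead of checking Mix.env at runtime. Everything here (Sketch.Worker, the schedule? and interval options, and the counter standing in for the real work) is made up for illustration and is not from the code above.

```elixir
defmodule Sketch.Worker do
  use GenServer

  # `:schedule?` and `:interval` are hypothetical options.
  # A test would start the server with `schedule?: false`.
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts)
  end

  # The work itself is a plain public function, so a test can call it
  # directly without going through the scheduler. Incrementing a counter
  # is only a stand-in for the real job.
  def work(count), do: count + 1

  @impl true
  def init(opts) do
    state = %{
      schedule?: Keyword.get(opts, :schedule?, true),
      interval: Keyword.get(opts, :interval, :timer.seconds(5)),
      count: 0
    }

    {:ok, schedule_work(state)}
  end

  @impl true
  def handle_info(:work, state) do
    {:noreply, schedule_work(%{state | count: work(state.count)})}
  end

  # Only arm the timer when scheduling is enabled.
  defp schedule_work(%{schedule?: true, interval: interval} = state) do
    Process.send_after(self(), :work, interval)
    state
  end

  defp schedule_work(state), do: state
end
```

With scheduling disabled, a test can call Sketch.Worker.work/1 directly, or send :work to the process itself, without a timer firing in the background.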

My question: Is there a way to block the test until the job is complete?

Yes, you can call Process.sleep/1 before the last assertion; Process.sleep(200) would wait 200 ms before proceeding. I wouldn’t do that, though, because a fixed delay slows the test down and can still be too short if the job takes longer. We ended up adding a synchronous alternative for every cron-like function, to use while testing (in the CLI or in tests). Here’s the simplified code of such a server:

defmodule App.Server do
  @moduledoc """
  Periodically runs tasks.

  ## Example
  config :app, :task,
    interval: 3_600_000 # 1 hour
  """
  use GenServer

  require Logger

  @spec start_link(map() | keyword(), keyword()) :: GenServer.on_start()
  def start_link(state \\ %{}, _opts \\ []) do
    # determine the interval with default 1 hour
    interval = state[:interval] || Application.get_env(:app, :task, [interval: 3_600_000])[:interval]
    GenServer.start_link(__MODULE__, %{interval: interval}, name: __MODULE__)
  end

  @doc """
  Manually trigger a task run.
  Also resets the current timer.
  """
  def do_task_now! do
    GenServer.call(__MODULE__, :do_task_now)
  end

  @spec init(map()) :: tuple()
  def init(state) do
    {:ok, schedule_next_run!(state)}
  end

  @spec handle_call(atom(), GenServer.from(), map()) :: tuple()
  def handle_call(:do_task_now, _from, state) do
    {:reply, :ok, do_task!(state)}
  end

  @spec handle_info(atom(), map()) :: tuple()
  def handle_info(:do_task, state) do
    {:noreply, do_task!(state)}
  end

  @spec handle_info(any(), map()) :: tuple()
  def handle_info(message, state) do
    Logger.info("Task server got an unknown message: #{inspect(message)}")
    {:noreply, state}
  end

  # actual job goes here
  defp do_task!(state) do
    # do something
    schedule_next_run!(state)
  end

  defp schedule_next_run!(state) do
    Logger.info("Scheduling the next task run in #{state[:interval]} ms")

    if state[:timer] do
      Process.cancel_timer(state.timer)
    end

    timer = Process.send_after(self(), :do_task, state[:interval])
    Map.merge(state, %{timer: timer})
  end

end
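To show how a test can wait for the job without sleeping, here is a small stand-alone sketch. Sketch.Job, run_now/1 and the runs counter are hypothetical names; the counter only stands in for the API call and DB write. It shows two options: a synchronous GenServer.call like do_task_now! above, and sending the message followed by :sys.get_state/1. The second relies on messages from one process to another being handled in the order they were sent, so the reply to :sys.get_state/1 only comes back after :work has been handled.

```elixir
defmodule Sketch.Job do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Synchronous trigger: returns only after the job has run.
  def run_now(pid), do: GenServer.call(pid, :run_now)

  @impl true
  def init(_opts), do: {:ok, %{runs: 0}}

  @impl true
  def handle_call(:run_now, _from, state), do: {:reply, :ok, do_work(state)}

  @impl true
  def handle_info(:work, state), do: {:noreply, do_work(state)}

  # Stand-in for "call the API and write the results to the DB".
  defp do_work(state), do: %{state | runs: state.runs + 1}
end

{:ok, pid} = Sketch.Job.start_link()

# Option 1: the synchronous call blocks until the work is done.
:ok = Sketch.Job.run_now(pid)

# Option 2: send the message, then make any synchronous call to the
# same process. The call is queued behind :work in the mailbox.
send(pid, :work)
state = :sys.get_state(pid)
IO.inspect(state.runs, label: "runs")
```

In a real test, the assertions on the database would go after run_now/1 or after :sys.get_state/1, at which point the job has already finished and the test process can exit safely.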