I have a background job that calls an API and then writes the results into a DB using Ecto.
My code looks like this:
defmodule Shop.Jobs.BookProcessor do
  use GenServer

  def start_link(size) do
    GenServer.start_link(__MODULE__, size)
  end

  def init(size) do
    schedule_work() # Schedule work to be performed at some point
    {:ok, size}
  end

  def handle_info(:work, size) do
    # Call API and process response into DB
    schedule_work()
    {:noreply, size}
  end

  defp schedule_work() do
    Process.send_after(self(), :work, 30 * 60 * 1000)
  end
end
So far I have a test like this:
defmodule Shop.Jobs.BookProcessorTest do
  use Shop.DataCase

  setup do
    {:ok, book_processor} = start_supervised(Shop.Jobs.BookProcessor)
    %{book_processor: book_processor}
  end

  test "handle_info/2 :work", %{book_processor: book_processor} do
    user = Utils.create_user()
    assert :work = send(book_processor, :work)
    assert length(Shop.Account.list_books(user.id)) == 2
  end
end
However, the assertion fails with:
Postgrex.Protocol (#PID<0.405.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.450.0> exited while client #PID<0.453.0> is still running with: shutdown
Presumably because the job is running asynchronously and the test terminates the process before it’s complete.
My question: Is there a way to block the test until the job is complete?
I had a similar problem in the project I’m working on, where a GenServer performs an action from time to time. Instead of testing the scheduler, we only tested the work function itself, calling it manually in the tests. The scheduler doesn’t even run in the test environment, for example:
defp schedule_worker() do
  if Mix.env() != :test do
    Process.send_after(self(), :work, :timer.seconds(5))
  end
end
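One caveat with the snippet above: Mix is a build tool and is not available inside a release, so calling `Mix.env()` at runtime would fail there. A minimal sketch of the same idea driven by application config instead, assuming a hypothetical `:schedule_jobs` key under the `:shop` app and a hypothetical `Shop.Jobs.Scheduler` helper module (neither is from the original code):

```elixir
defmodule Shop.Jobs.Scheduler do
  # Hypothetical helper, for illustration only.
  # Schedules a :work message unless scheduling is switched off in config,
  # e.g. `config :shop, schedule_jobs: false` in config/test.exs.
  def schedule_work(pid \\ self(), interval \\ :timer.seconds(5)) do
    if Application.get_env(:shop, :schedule_jobs, true) do
      # Returns the timer reference so the caller can cancel it later
      Process.send_after(pid, :work, interval)
    else
      :disabled
    end
  end
end
```

With scheduling disabled in the test config, the tests can call the work function directly and nothing fires in the background.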
My question: Is there a way to block the test until the job is complete?
Yes, you can call Process.sleep/1 before the last assertion: Process.sleep(200) would wait 200 ms before proceeding. I wouldn’t do that, though. We ended up with a synchronous alternative for every cron-like function, for use from the CLI or in tests. Here’s the simplified code of such a server:
defmodule App.Server do
  @moduledoc """
  Periodically runs tasks.

  ## Example

      config :app, :task,
        interval: 3_600_000 # 1 hour
  """

  use GenServer
  require Logger

  @spec start_link(map() | keyword(), keyword()) :: GenServer.on_start()
  def start_link(state \\ %{}, _opts \\ []) do
    # Determine the interval, defaulting to 1 hour
    interval = state[:interval] || Application.get_env(:app, :task, interval: 3_600_000)[:interval]
    GenServer.start_link(__MODULE__, %{interval: interval}, name: __MODULE__)
  end

  @doc """
  Manually triggers a task run.

  Also resets the current timer.
  """
  @spec do_task_now!() :: :ok
  def do_task_now! do
    GenServer.call(__MODULE__, :do_task_now)
  end

  @spec init(map()) :: {:ok, map()}
  def init(state) do
    {:ok, schedule_next_run!(state)}
  end

  @spec handle_call(:do_task_now, GenServer.from(), map()) :: {:reply, :ok, map()}
  def handle_call(:do_task_now, _from, state) do
    {:reply, :ok, do_task!(state)}
  end

  @spec handle_info(term(), map()) :: {:noreply, map()}
  def handle_info(:do_task, state) do
    {:noreply, do_task!(state)}
  end

  def handle_info(message, state) do
    Logger.info("Task server got an unknown message: #{inspect(message)}")
    {:noreply, state}
  end

  # Actual job goes here
  defp do_task!(state) do
    # do something
    schedule_next_run!(state)
  end

  defp schedule_next_run!(state) do
    Logger.info("Scheduling the next task run in #{state[:interval]} ms")

    # Cancel any pending timer so a manual run resets the schedule
    if state[:timer] do
      Process.cancel_timer(state.timer)
    end

    timer = Process.send_after(self(), :do_task, state[:interval])
    Map.put(state, :timer, timer)
  end
end
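To make the blocking behaviour concrete, here is a small self-contained sketch with two ways to wait for the job. `Demo.Worker` and `work_now/1` are hypothetical stand-ins for the servers above, and the "job" only increments a counter. The second option relies on the fact that messages sent from one process to another are handled in order, so a synchronous call made after `send/2` returns only once the earlier message has been processed.

```elixir
defmodule Demo.Worker do
  # Hypothetical stand-in for the servers above; the job just bumps a counter.
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, 0)

  # Synchronous trigger: returns only after the job has run.
  def work_now(pid), do: GenServer.call(pid, :work_now)

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_call(:work_now, _from, count), do: {:reply, :ok, do_work(count)}

  @impl true
  def handle_info(:work, count), do: {:noreply, do_work(count)}

  defp do_work(count), do: count + 1
end

{:ok, pid} = Demo.Worker.start_link()

# Option 1: the synchronous call blocks until the job is done.
:ok = Demo.Worker.work_now(pid)

# Option 2: send the asynchronous message, then make any synchronous call.
# :sys.get_state/1 is answered only after :work has been handled.
send(pid, :work)
count = :sys.get_state(pid)
```

In a test, the final assertion would go after either call. When the job writes through Ecto, the server process probably also needs access to the test's sandboxed connection, which depends on how `Shop.DataCase` is set up.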