I’m attempting to model the following problem with a process (or set of processes) – I need to consume an HTTP2 streaming API and enqueue Oban jobs based on the events from this API.
I’m using Oban for database-backed jobs and Finch as a higher-level wrapper around Mint for my HTTP client.
I’d also like to be able to test this process by spinning it up from an ExUnit test, ensuring it enqueues some Oban jobs, and then having it spin down gracefully after the test exits.
So far, I haven’t been able to find a suitable solution. One of the primary problems is that, when my test attempts to shut down, my process may still be communicating with the database, attempting to enqueue jobs via Oban.
Here are some thoughts I’ve had about modeling this:
- It should probably be done via a GenServer.
- Initially I thought the “looping” code that pulls messages from the HTTP2 stream could live in a `handle_info` callback, but the API provided by Finch effectively “blocks” the GenServer from receiving other messages while the connection is open. Therefore, it seems my GenServer should spin up a supervised task to read from the API and send messages back to the GenServer for processing.
- In order to exit gracefully, I need to know whether the server should process the messages it receives or ignore them because it is in the process of shutting down.
- I want to handle this graceful shutdown via the `terminate` callback, as I’ll be starting my GenServer via `start_supervised` in my tests and I don’t want to require specialized cleanup code.
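The reader-task idea from the second bullet can be sketched with plain OTP. This is a hypothetical, stdlib-only sketch: `ReaderTaskSketch`, `read_loop/2`, and `processed_count/1` are illustrative names, a `Process.sleep/1` loop stands in for the Finch stream, and it uses a linked `Task.start_link/1` rather than `WorkerExperiment.TaskSupervisor`. It shows that the server keeps answering calls while the reader loops.

```elixir
defmodule ReaderTaskSketch do
  # Hypothetical sketch: the blocking read loop lives in a linked Task, and
  # each event comes back to the GenServer as an ordinary message.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def processed_count(server), do: GenServer.call(server, :processed_count)

  @impl GenServer
  def init(:ok), do: {:ok, %{processed: 0}, {:continue, :start_reader}}

  @impl GenServer
  def handle_continue(:start_reader, state) do
    server = self()
    # The Task blocks in its own loop, so this process keeps serving its mailbox.
    {:ok, _reader} = Task.start_link(fn -> read_loop(server, 0) end)
    {:noreply, state}
  end

  @impl GenServer
  def handle_call(:processed_count, _from, state), do: {:reply, state.processed, state}

  @impl GenServer
  def handle_info({:event, _i}, state) do
    # In the real worker, the Oban insert for this event would go here.
    {:noreply, %{state | processed: state.processed + 1}}
  end

  # Stand-in for a long-lived HTTP2 stream: emits one event every 5 ms, forever.
  defp read_loop(server, i) do
    send(server, {:event, i})
    Process.sleep(5)
    read_loop(server, i + 1)
  end
end
```

Because the task is linked, it is killed when the server exits with any reason other than `:normal` (including `:shutdown`); with `:normal` it would keep running.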
Given these constraints, I thought I had a solution. Here’s a sketch of what I thought might work:
```elixir
defmodule WorkerExperiment.Worker do
  use GenServer

  def start_link(_args, opts \\ []) do
    GenServer.start_link(__MODULE__, nil, opts)
  end

  @impl GenServer
  def init(_) do
    # Trap exits so a supervisor shutdown runs terminate/2
    Process.flag(:trap_exit, true)
    {:ok, :running, {:continue, :start_stream}}
  end

  @impl GenServer
  def handle_continue(:start_stream, state) do
    # Read the (blocking) stream in a separate, supervised task
    Task.Supervisor.start_child(
      WorkerExperiment.TaskSupervisor,
      __MODULE__,
      :stream,
      [self()]
    )

    {:noreply, state}
  end

  @impl GenServer
  def terminate(_reason, state) do
    # Intended to switch the state to :stopped before exiting;
    # this call to self() is the part that fails
    :ok = GenServer.call(self(), :stop)
    state
  end

  @impl GenServer
  def handle_call(:stop, _from, _state), do: {:reply, :ok, :stopped}

  @impl GenServer
  def handle_info({:process_message, _}, :stopped), do: {:noreply, :stopped}

  def handle_info({:process_message, i}, :running) do
    # Oban insertion should happen here
    IO.inspect("Processing message #{i}")
    {:noreply, :running}
  end

  # Runs inside the task: forwards each streamed item to the GenServer
  def stream(pid) do
    WorkerExperiment.HttpStream.stream(0, fn i, acc ->
      send(pid, {:process_message, i})
      acc + i
    end)
  end
end
```
Note that `WorkerExperiment.HttpStream.stream` is just a dummy function that reduces over an infinite stream, attempting to replicate the idea of keeping an HTTP2 connection open for (potentially) a very long time, with roughly the same API provided by Finch.
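The post doesn’t show `WorkerExperiment.HttpStream`; a hypothetical version matching that description might look like this. It folds an endless sequence of integers into an accumulator, roughly the way `Finch.stream/5` calls its callback with each response chunk and the current accumulator. The `interval_ms` parameter is an illustrative addition.

```elixir
defmodule WorkerExperiment.HttpStream do
  @moduledoc """
  Hypothetical stand-in for a long-lived HTTP2 response. It never returns on
  its own: it calls `fun.(chunk, acc)` for each "chunk" forever.
  """

  def stream(initial_acc, fun, interval_ms \\ 10) do
    Stream.iterate(0, &(&1 + 1))
    |> Enum.reduce(initial_acc, fn chunk, acc ->
      # Simulate waiting on the network between chunks.
      Process.sleep(interval_ms)
      fun.(chunk, acc)
    end)
  end
end
```

Called as in the worker, `stream(0, fn i, acc -> ... end)` blocks indefinitely, which is what keeps the reader task busy; only an exit or a `throw` from the callback gets it out.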
This almost does the trick, but alas I learned the hard way that a GenServer can’t `call` itself. My understanding is that the action that tells the server to “stop processing” must be synchronous in order to avoid the “severing-DB-connection-issue” referenced in the `Ecto.Adapters.SQL.Sandbox` FAQ.
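The failure comes from the synchronous call: Elixir’s `GenServer.call/3` checks whether the target is the calling process and, if so, exits with `{:calling_self, ...}` instead of waiting for a reply that could never arrive. A cast to `self()` is only queued in the mailbox, so it is allowed. The stdlib-only demo below (module and message names are hypothetical) shows both cases.

```elixir
defmodule SelfCallDemo do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)

  @impl GenServer
  def init(:ok), do: {:ok, []}

  @impl GenServer
  def handle_call(:try_self_call, _from, state) do
    # GenServer.call/3 refuses to call the current process: it exits with
    # {:calling_self, _} rather than deadlocking until the timeout.
    result =
      try do
        GenServer.call(self(), :noop)
      catch
        :exit, {:calling_self, _} -> :calling_self
      end

    {:reply, result, state}
  end

  def handle_call(:try_self_cast, _from, state) do
    # A cast is asynchronous: the message is queued in our own mailbox and
    # handled after this callback returns.
    :ok = GenServer.cast(self(), {:push, :from_cast})
    {:reply, :ok, state}
  end

  def handle_call(:get, _from, state), do: {:reply, state, state}

  @impl GenServer
  def handle_cast({:push, item}, state), do: {:noreply, [item | state]}
end
```

The same check applies inside `terminate/2`, so `GenServer.call(self(), :stop)` in the worker exits instead of switching the state to `:stopped`.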
What am I missing here? Is there a better way to model this problem that solves all of the following:
- Consumes messages from the HTTP2 stream
- Atomically inserts the messages for processing into the database via Oban
- Gracefully shuts down during testing
- Doesn’t require manually calling `GenServer.call(pid, :some_message)` at the end of my test to shut down cleanly, but rather leverages the supervision tree via `start_supervised` and hooks into the normal OTP lifecycle of shutdown
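One possible shape for the shutdown requirements above, sketched with only OTP and no Oban, Finch, or Ecto: the reader still runs under a `Task.Supervisor`, and `terminate/2` stops it with `Task.Supervisor.terminate_child/2` instead of calling the server itself. `ShutdownSketch.Worker`, the `:sink` and `:tasks` options, and `read_loop/2` are hypothetical; sending `{:inserted, i}` stands in for the Oban insert, and a plain `Supervisor` plays the role of the test supervisor behind `start_supervised`. It is not verified against Oban or the Ecto SQL sandbox.

```elixir
defmodule ShutdownSketch.Worker do
  # Hypothetical sketch: the reader runs under a Task.Supervisor, and
  # terminate/2 stops it synchronously before this process exits.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl GenServer
  def init(opts) do
    # Trap exits so the supervisor's :shutdown signal runs terminate/2.
    Process.flag(:trap_exit, true)
    state = %{sink: Keyword.fetch!(opts, :sink), tasks: Keyword.fetch!(opts, :tasks), reader: nil}
    {:ok, state, {:continue, :start_reader}}
  end

  @impl GenServer
  def handle_continue(:start_reader, state) do
    server = self()
    {:ok, reader} = Task.Supervisor.start_child(state.tasks, fn -> read_loop(server, 0) end)
    {:noreply, %{state | reader: reader}}
  end

  @impl GenServer
  def handle_info({:event, i}, state) do
    # Stand-in for the Oban insert: runs in this process, one message at a time.
    send(state.sink, {:inserted, i})
    {:noreply, state}
  end

  # Trapping exits turns stray exit signals into messages; ignore them.
  def handle_info(_other, state), do: {:noreply, state}

  @impl GenServer
  def terminate(_reason, state) do
    # Callbacks never overlap, so no handle_info/2 is mid-insert here.
    # Stopping the reader ensures nothing keeps producing events; anything
    # still queued in our mailbox is discarded when this process exits.
    if state.reader, do: Task.Supervisor.terminate_child(state.tasks, state.reader)
    :ok
  end

  defp read_loop(server, i) do
    send(server, {:event, i})
    Process.sleep(10)
    read_loop(server, i + 1)
  end
end
```

`Supervisor.terminate_child/2` waits for the worker to exit, and the worker’s `terminate/2` waits for the reader, so both processes are gone by the time that call returns.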
Thanks!