Building a process to consume an HTTP2 stream of events and handle graceful shutdown during testing

I’m attempting to model the following problem with a process (or set of processes) – I need to consume an HTTP2 streaming API and enqueue Oban jobs based on the events from this API.

I’m using Oban for database-backed jobs and Finch as a higher-level wrapper around Mint for my HTTP client.

I’d also like to be able to test this process by spinning it up from an ExUnit test, ensuring it enqueues some Oban jobs, and then have it spin down gracefully after the text exits.

So far, I haven’t been able to find a suitable solution. One of the primary problems is that, when my test attempts to shutdown, my process may still be communicating with the database, attempting to enqueue jobs via Oban.

Here are some thoughts I’ve had about modeling this:

  1. It should probably be done via a GenServer
  2. Initially I thought the “looping” code that pulls messages from the HTTP2 stream could be done in a handle_info callback, but the API provided by finch effectively “blocks” the GenServer from receiving other messages while the connection is open. Therefore, it seems my GenServer should probably spin up a supervised task to read from the API and send messages back to the GenServer for processing.
  3. In order to exit gracefully, I need to know if the server should be attempting to process messages it receives or ignore them because it is in the process of shutting down.
  4. I want to attempt to handle this graceful shutdown via the terminate callback, as I’ll be starting my GenServer via start_supervised in my tests and I don’t want to require specialized cleanup code.

Given these constraints, I thought I had a solution. Here’s a sketch of what I thought might work:

defmodule WorkerExperiment.Worker do
  use GenServer

  def start_link(_args, opts \\ []) do
    GenServer.start_link(__MODULE__, nil, opts)
  end

  @impl GenServer
  def init(_) do
    Process.flag(:trap_exit, true)

    {:ok, :running, {:continue, :start_stream}}
  end

  @impl GenServer
  def handle_continue(:start_stream, state) do
    Task.Supervisor.start_child(
      WorkerExperiment.TaskSupervisor,
      __MODULE__,
      :stream,
      [self()]
    )

    {:noreply, state}
  end

  @impl GenServer
  def terminate(_reason, state) do
    :ok = GenServer.call(self(), :stop)

    state
  end

  @impl GenServer
  def handle_call(:stop, _from, _state), do: {:reply, :ok, :stopped}

  @impl GenServer
  def handle_info({:process_message, _}, :stopped), do: {:noreply, :stopped}

  def handle_info({:process_message, i}, :running) do
    # Oban insertion should happen here
    IO.inspect("Processing message #{i}")

    {:noreply, :running}
  end

  def stream(pid) do
    WorkerExperiment.HttpStream.stream(0, fn i, acc ->
      send(pid, {:process_message, i})
      acc + i
    end)
  end
end

Note that WorkerExperiment.HttpStream.stream is just a dummy function that reduces over an infinite stream, attempting to replicate the idea of keeping an HTTP2 connection open for (potentially) a very long time, with roughly the same API provided by Finch.

This almost does the trick, but alas I learned the hard way that GenServer’s cannot cast messages to themselves. My understanding is that the action that tells the server to “stop processing” must be sync, in order to solve the “severing-DB-connection-issue” referenced in the Ecto.Adapters.SQL.Sandbox FAQ.

What am I missing here? Is there a better way to model this problem that solves all of the following:

  1. Consumes messages from the HTTP2 stream
  2. Atomically inserts the messages for processing into the database via Oban
  3. Gracefully shuts down during testing
  4. Doesn’t require manually calling GenServer.call(pid, :some_message) at the end of my test to shut down cleanly, but rather leverages the supervision tree via start_supervised and hooks into the normal OTP lifecycle of shutdown

Thanks!

1 Like

I actually pushed an example of what I’m struggling with to github: https://github.com/drewolson/worker_experiment

This is a pretty common issue when working with GenServers that interact with Ecto independently. You’ll end up with a race condition between the point where the process checks out a connection and executes it, and when your process is stopped by the test. The connections “owner” is your GenServer process which no longer exists, hence the error message.

The good news is that it is only a problem when testing due to the Ecto sandbox. If you’re set on preventing it you’ll need to put in something that blocks that process from shutting down until it is finished processing.

After reading through the GH example I think it could be as simple as shutting down the task within terminate/2 (not sure if this translates to the actual Finch code):

  @impl GenServer
  def init(_) do
    state = %{mode: :running, task_pid: nil}

    {:ok, state, {:continue, :start_stream}}
  end

  @impl GenServer
  def handle_continue(:start_stream, state) do
    {:ok, pid} = Task.start_link(__MODULE__, :stream, [self()])

    {:noreply, %{state | task_pid: pid}}
  end

  @impl GenServer
  def terminate(_reason, state) do
    if is_pid(state.task_pid), do: Task.shutdown(state.task_pid)

    :ok
  end
1 Like

Thanks for confirming that this is a common issue and that I don’t seem to be misunderstanding something fundamental here.

I agree this isn’t actually a problem in production, but I find the fact that it is logging during testing a bit of a nuisance. More importantly, however, it made me realize that I was quite fuzzy on what exactly was causing the issue and if there was an appropriate fix in this case.

I’ve tried as you’ve suggested on a branch and I’m still receiving the same error. Perhaps this is because the process mailbox is still back-logged with messages from the Task?

Thanks for your thoughtful response.

Ah, it is using an async send so there isn’t any backpressure. That would surely flood the mailbox. Switching from handle_info to a handle_call to make it synchronous should help.

2 Likes

This was exactly it. Thanks!

I’ve pushed a solution here: https://github.com/drewolson/worker_experiment/blob/0c3a2448c8559f7fcb0dc2757e126d5963e123f8/lib/worker_experiment/worker.ex

2 Likes