I’m building an app which provides a read-only GraphQL API to a React frontend. The app stores details of many after-school programs in a certain area.
The actual program data is stored and managed in an external system (Airtable). Once in a while, an import is run to replace all of the data in the Elixir app with fresh data. Because the data needs to be processed, geolocated, etc., and there are potentially tens of thousands of rows, I don't want to process all of the data as one huge chunk in memory. I will be using either `Task.async_stream` or GenStage to do the processing.
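For context, the processing stage would look roughly like this (a sketch only; `fetch_rows/0`, `process_row/1`, and the `"programs"` table are placeholders for the Airtable fetch, the geolocation step, and the real schema):

```elixir
# Hypothetical pipeline: fetch_rows/0 lazily pulls rows from Airtable,
# process_row/1 does the geolocation/processing for a single row.
fetch_rows()
|> Task.async_stream(&process_row/1, max_concurrency: 8, timeout: 30_000)
|> Stream.map(fn {:ok, row} -> row end)
|> Stream.chunk_every(500)
|> Enum.each(fn chunk -> Repo.insert_all("programs", chunk) end)
```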
However, I want to run the whole process (drop all current data, insert new data) in a single Postgres transaction. I know truncating the tables will take an exclusive lock that blocks reads too, and indeed access to the API will be disabled altogether while the import is running. However, I want the option to roll back if errors are encountered during the process.
The only tools Ecto provides are the two variants of `Repo.transaction/2`. I cannot use `Ecto.Multi`, since the whole point is to not hold all of the data in memory at once. While playing around with the function variant, however, I discovered that any `Repo` calls must be made within that function (or more specifically, within that process while the function is being executed), since it is that process that checks out the connection and sets up the transaction. Therefore, processing the data asynchronously with different processes calling DB functions does not work as expected: any `Repo` functions called in a different process will not be part of the transaction.
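To make the pitfall concrete, here is a minimal illustration (assuming a `"programs"` table with a `name` column; the details are hypothetical):

```elixir
Repo.transaction(fn ->
  # Runs on the transaction's connection -- rolled back below.
  Repo.insert_all("programs", [%{name: "inside"}])

  # Runs in a different process, which checks out its *own* connection
  # from the pool, so it commits immediately and is not part of the
  # surrounding transaction.
  Task.async(fn -> Repo.insert_all("programs", [%{name: "outside"}]) end)
  |> Task.await()

  Repo.rollback(:oops)
end)
# => {:error, :oops}, yet the "outside" row survives in the database
```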
The immediate “shape” of the solution to this kind of serialisation problem is usually a GenServer, but there is no way to separately start and stop a transaction, nor is there a way (without reimplementing/forking `:gen_server`) to run the GenServer loop inside the transaction function. I came up with this solution instead:
```elixir
defmodule PFServer.TransactionManager do
  alias PFServer.Repo
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def execute(fun) when is_function(fun) do
    GenServer.call(__MODULE__, {:execute, fun})
  end

  def finish do
    GenServer.stop(__MODULE__)
  end

  # callbacks

  def init([]) do
    {:ok, pid} = start_link_executor()
    {:ok, pid}
  end

  def handle_call({:execute, fun}, from, pid) do
    send(pid, {:execute, fun, from})
    {:noreply, pid}
  end

  def terminate(_reason, pid) do
    send(pid, :finish)
  end

  # Task execution process

  defp start_link_executor do
    Task.start_link(&execute_until_done/0)
  end

  defp execute_until_done do
    Repo.transaction(&execute_all/0)
  end

  defp execute_all do
    receive do
      {:execute, fun, from} ->
        GenServer.reply(from, fun.())
        execute_all()

      :finish ->
        :ok
    end
  end
end
```
and now I can spin this up and use `TransactionManager.execute(<repo accessing fn>)`.
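A full import run would then look something like this (a sketch; `rows` stands for the processed stream from the earlier pipeline, and the `"programs"` table is a placeholder):

```elixir
{:ok, _pid} = PFServer.TransactionManager.start_link()

# Each function passed to execute/1 runs in the executor process,
# inside the single long-lived transaction.
PFServer.TransactionManager.execute(fn -> Repo.delete_all("programs") end)

rows
|> Stream.chunk_every(500)
|> Enum.each(fn chunk ->
  PFServer.TransactionManager.execute(fn ->
    Repo.insert_all("programs", chunk)
  end)
end)

# Stopping the manager ends the receive loop, which commits the transaction.
PFServer.TransactionManager.finish()
```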
Implementing my own receive loop seems like a code smell to me, though maybe that's just a learned reliance on OTP to handle any and all messaging concurrency.
Is this an acceptable solution? Am I going to run into problems with this? Is there any other way to handle this? It seems like the OTP way would be to just have the GenServer handle the whole serialization, but like I mentioned this would require manual control over transaction begin and commit.