Using Ecto to run a long-running multi-process transaction

Nice! Guess I’ll watch this space.

So it turns out you can hook into the :gen_server lifecycle before the loop starts. Initially, this part of the try docs had me worried about running a recursive loop inside a try block:

Note that calls inside try/1 are not tail recursive since the VM needs to keep the stacktrace in case an exception happens.

However, it turns out this restriction only means that if the function containing the try block calls itself from within that try block, the call is not tail-call optimised. Other recursive functions that are merely called from within the try block work fine.
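A minimal sketch of that distinction, using a hypothetical TryLoopDemo module (not from the post): run/1 stands in for init_it/6 and calls a long-running loop from inside try, while loop/1 recurses in tail position and is not the function that holds the try. The exit(:normal) at the end stands in for :gen_server exiting when it is stopped.

```elixir
defmodule TryLoopDemo do
  # Hypothetical module: run/1 plays the part of init_it/6 by calling a
  # long-running loop from inside `try`.
  def run(iterations) do
    try do
      loop(iterations)
    catch
      # Plays the part of :gen_server exiting with :normal on stop.
      :exit, :normal -> :stopped
    end
  end

  # loop/1 calls itself in tail position. It is not the function that
  # contains the `try`, so each iteration is an ordinary tail call.
  defp loop(0), do: exit(:normal)
  defp loop(n), do: loop(n - 1)
end

TryLoopDemo.run(10_000_000)
```

Here run(10_000_000) should return :stopped: the single try frame lives in run/1, and loop/1's self-calls are plain tail calls underneath it.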

Here is the solution I have running now, which I’m much happier with.

defmodule PFServer.TransactionManager do
  @moduledoc """
  A transaction manager, which is a GenServer which can execute `Repo` commands inside a transaction.
  """

  alias PFServer.Repo
  use GenServer

  def start_link do
    # logic taken from https://github.com/elixir-lang/elixir/blob/master/lib/elixir/lib/gen_server.ex#L671
    :gen.start(__MODULE__, :link, {:local, __MODULE__}, __MODULE__, [], [])
  end

  def execute(fun) when is_function(fun) do
    GenServer.call(__MODULE__, {:execute, fun})
  end

  def finish do
    GenServer.stop(__MODULE__)
  end

  # callbacks

  # Called by :gen to start the "GenServer" loop. We wrap the whole process in a transaction, catch exits and defer
  # to :gen_server
  def init_it(starter, parent, name0, mod, args, options) do
    Repo.transaction(fn ->
      try do
        :gen_server.init_it(starter, parent, name0, mod, args, options)
      catch
        :exit, :normal -> :ok
      end
      # Other types of errors and exits should fall through and kill the transaction
    end)
  end

  def init([]) do
    {:ok, {}}
  end

  def handle_call({:execute, fun}, _from, {}) do
    {:reply, fun.(), {}}
  end
end
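To see the init_it/6 hook in isolation, here is a hypothetical, database-free sketch. WrappedServer and FakeTransaction are made-up names, not from the post or from Ecto: FakeTransaction.run/2 stands in for Repo.transaction/1 and reports :committed to a listener when the wrapped function returns, or :rolled_back when it raises or exits. As in the code above, :gen.start/6 is told to call WrappedServer.init_it/6, which wraps :gen_server.init_it/6. Note that :gen.start/6 and :gen_server.init_it/6 are internal OTP functions rather than documented API, so this assumes a recent OTP release and could break in a future one.

```elixir
defmodule FakeTransaction do
  # Hypothetical stand-in for Repo.transaction/1: runs `fun`, reports the
  # outcome to `listener`, and re-raises anything that escapes `fun`.
  def run(listener, fun) do
    try do
      result = fun.()
      send(listener, {:committed, result})
      {:ok, result}
    catch
      kind, reason ->
        send(listener, {:rolled_back, {kind, reason}})
        :erlang.raise(kind, reason, __STACKTRACE__)
    end
  end
end

defmodule WrappedServer do
  use GenServer

  def start_link(listener) do
    # Passing __MODULE__ as the first argument makes :gen call
    # __MODULE__.init_it/6 instead of :gen_server.init_it/6.
    :gen.start(__MODULE__, :link, {:local, __MODULE__}, __MODULE__, listener, [])
  end

  def execute(fun) when is_function(fun), do: GenServer.call(__MODULE__, {:execute, fun})
  def finish, do: GenServer.stop(__MODULE__)

  # Wraps the whole :gen_server loop; a normal stop exits with :normal,
  # which is caught so the fake transaction commits.
  def init_it(starter, parent, name, mod, args, options) do
    FakeTransaction.run(args, fn ->
      try do
        :gen_server.init_it(starter, parent, name, mod, args, options)
      catch
        :exit, :normal -> :ok
      end
    end)
  end

  @impl true
  def init(_listener), do: {:ok, nil}

  @impl true
  def handle_call({:execute, fun}, _from, state), do: {:reply, fun.(), state}
end

{:ok, _pid} = WrappedServer.start_link(self())
3 = WrappedServer.execute(fn -> 1 + 2 end)
:ok = WrappedServer.finish()

receive do
  {:committed, result} -> IO.inspect(result, label: "committed")
after
  1_000 -> IO.puts("no commit message")
end
```

After finish/0, the listener should receive {:committed, :ok}: GenServer.stop/1 makes :gen_server exit with :normal, the catch turns that into :ok, and the wrapper then "commits". Any other exit reason would skip the catch and reach the :rolled_back branch instead.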

Further, regarding the hibernation concern: :gen_server uses proc_lib:hibernate instead of the bare erlang:hibernate, which makes sure proc_lib's own exception handling and crash logging keep working after the process wakes up. That is not the same as preserving the caller's try/catch, though: hibernation discards the call stack, so the try/catch here (and the Repo.transaction frame around it) would most likely be lost. I haven’t tested this, as I don’t plan to hibernate this process in any case.
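One way to check the hibernation question without a database is the hypothetical HibernateTryDemo module below (not from the post). It wraps :proc_lib.hibernate/3, which calls erlang:hibernate/3 under the hood, in a try/catch for exit(:normal), mirroring how init_it/6 wraps the :gen_server loop. After waking up, wake_up/1 exits with :normal; if the try frame had survived hibernation, the parent would receive :caught.

```elixir
defmodule HibernateTryDemo do
  # Hypothetical module: does a try/catch around :proc_lib.hibernate/3
  # still apply after the process wakes up?
  def start(parent) do
    :proc_lib.spawn(fn ->
      try do
        # Hibernation discards the call stack; on wake-up the process
        # resumes in wake_up/1 inside proc_lib's own wrapper only.
        :proc_lib.hibernate(__MODULE__, :wake_up, [parent])
      catch
        :exit, :normal -> send(parent, :caught)
      end
    end)
  end

  # Must be public: it is invoked as an MFA when the process wakes up.
  def wake_up(parent) do
    receive do
      :go -> :ok
    end

    send(parent, :woke)
    # Plays the role of :gen_server stopping with reason :normal.
    exit(:normal)
  end
end

pid = HibernateTryDemo.start(self())
ref = Process.monitor(pid)
send(pid, :go)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> IO.inspect(reason, label: "exited")
end
```

The parent receives :woke and then a :DOWN with reason :normal, but never :caught, which is consistent with hibernation discarding the try frame.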

Edit: this actually doesn’t behave properly under some error conditions (GenServer catches some of the errors). I’ll investigate further.

@fishcakez the other thing I’m running into here is

[error] Postgrex.Protocol (#PID<0.335.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.350.0> timed out because it checked out the connection for longer than 15000ms

in the middle of the transaction. Now I’m trying to figure out how to raise that timeout for this one process without allowing huge or infinite timeouts for all other connections and queries. It looks like it would involve fiddling with the process dictionary directly, since there’s no way to pass options to Repo.transaction, and that lands me squarely back in “things I’d rather not be doing” territory.

I’m mentioning you specifically since I know you may have insight into how the internals are structured with regard to this.

Can you share the WIP, or a link to this alternative API?

Repo.transaction/1,2 accepts options as an optional second argument, for example Repo.transaction(fun, timeout: :infinity).

The work is currently only on lower level libraries than Ecto. I will reply when a PR appears in Ecto.

Thanks! It seems obvious but I totally missed it.

Could you provide a link to the issue/PR in the lower-level libs nevertheless? I’d find it interesting to follow it and maybe learn more about the internals.

Support for Repo.stream as a GenStage producer begins at this level: https://github.com/elixir-ecto/db_connection/pull/82. I also have a consumer/producer_consumer that can do raw transactions in a similar style to this approach and your GenServer; I will push it there later today if I have time to fix up Flow support.


Thanks @fishcakez, the stuff you’ve linked looks really interesting. It still requires the entire transaction to be contained within a single stage. I couldn’t work it out from reading the code, but the stage you wrote does not “automatically” get parallelised by Flow (sharing the same transaction across the instances), does it?

To shed some light on why I believe that I need to be able to do this across multiple stages, here is my code for importing using Flow:

  def import_data do
    ImportMeta.start_link()
    ImportMeta.start_import()
    TM.start_link()

    org_stream = AirtableAPI.stream_resource("Organizations")

    org_stream
    |> Flow.from_enumerable()
    |> Flow.map(&Organization.from_airtable_record!/1)
    |> Flow.each(fn cset ->
      org = TM.execute(fn -> Repo.insert!(cset) end)
      ImportMeta.register_organization(org)
    end)
    |> Flow.run()

    program_stream = AirtableAPI.stream_resource("Programs")

    program_stream
    |> Flow.from_enumerable()
    |> Flow.map(&Program.from_airtable_record!/1)
    |> Flow.each(fn cset ->
      program = TM.execute(fn -> Repo.insert!(cset) end)
      unless Program.is_geolocated?(program), do: ImportMeta.add_failed_geolocation(program)
    end)
    |> Flow.run()

    TM.finish()
    %{failed_geolocation: failed} = ImportMeta.end_import()

    unless Enum.empty?(failed) do
      Logger.warn("The following ids failed geolocation. Check if their addresses are valid.\n#{inspect(failed)}")
    end

    Logger.info("Finished import.")
  end

As you can see, I need the first chunk of the import (the organizations) to fully run through before I can start on importing the programs. That’s because on importing a program I need to look up its organization in the database to get the correct ID for the association.

Further, Program.from_airtable_record! can insert new categories as a side effect (they are denormalised in the source data but normalised in my database).

If I’m missing a “better” way of doing something like this that doesn’t require these contortions, let me know. I still prefer this to importing into a separate table and renaming tables at runtime: I think it’s much easier to keep the state for something like this in Elixir processes and GenServers, with the guarantee that if they fail, the database is rolled back to a good state, rather than having to clean up and check for partial results manually.

I have since worked a little more on my GenServer solution. It now handles exits properly (previously an exit would crash the connection process instead of being trapped by the transaction function), and it has an idle timeout: it stops when no command has been received for a certain time, rather than limiting the execution time of the whole transaction.

Here it is (again, this is not yet fully tested and is not in production, so be careful using it):

defmodule PFServer.TransactionManager do
  @moduledoc """
  A transaction manager, which is a GenServer which can execute `Repo` commands inside a transaction.

  It is important to run this in a supervision tree with, or at least linked to, all other processes which may use it.
  It will die on errors and should not be restarted without the entire tree of processes using it being restarted as well.
  """

  alias PFServer.Repo
  use GenServer

  @timeout 60_000 # allow one minute with no calls before terminating

  # TODO consider making timeout configurable
  # TODO test this properly
  # TODO consider making this non-global, making it possible to run multiple at once.

  def start_link do
    # logic taken from https://github.com/elixir-lang/elixir/blob/master/lib/elixir/lib/gen_server.ex#L671
    :gen.start(__MODULE__, :link, {:local, __MODULE__}, __MODULE__, [], [])
  end

  def execute(fun) when is_function(fun) do
    GenServer.call(__MODULE__, {:execute, fun})
  end

  def finish do
    GenServer.stop(__MODULE__)
  end

  # callbacks

  # Called by :gen to start the "GenServer" loop. We wrap the whole process in a transaction, catch exits and defer
  # to :gen_server
  def init_it(starter, parent, name0, mod, args, options) do
    Repo.transaction(fn ->
      try do
        :gen_server.init_it(starter, parent, name0, mod, args, options)
      catch
        :exit, :normal -> :ok
      end
      # Other types of errors and exits should fall through and kill the transaction
    end, timeout: :infinity)
  end

  def init([]) do
    Process.flag(:trap_exit, true)
    {:ok, {}, @timeout}
  end

  def handle_call({:execute, fun}, _from, {}) do
    {:reply, fun.(), {}, @timeout}
  end

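  def handle_info(:timeout, {}) do
    # A :stop tuple must include the state. Any reason other than :normal escapes
    # the catch in init_it/6, so an idle timeout rolls the transaction back.
    {:stop, :timeout, {}}
  end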
end
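The idle-timeout mechanism can be tried on its own with a hypothetical IdleServer (not from the post, and without Repo). Every callback returns @idle_ms as the timeout, and the pending timeout is cancelled whenever a message arrives, so the server only receives :timeout after @idle_ms with no requests. The stop reason {:shutdown, :idle} is an arbitrary choice here: gen_server does not log {:shutdown, _} as an error, yet in TransactionManager it would still escape the :normal catch and roll back the transaction.

```elixir
defmodule IdleServer do
  # Hypothetical sketch of the idle-timeout pattern used in TransactionManager.
  use GenServer

  @idle_ms 200

  def start, do: GenServer.start(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, 0, @idle_ms}

  @impl true
  def handle_call(:ping, _from, count) do
    # Returning the timeout again restarts the idle clock.
    {:reply, count + 1, count + 1, @idle_ms}
  end

  @impl true
  def handle_info(:timeout, count) do
    # {:stop, reason, state}: the state element is required.
    {:stop, {:shutdown, :idle}, count}
  end
end

{:ok, pid} = IdleServer.start()
ref = Process.monitor(pid)
1 = GenServer.call(pid, :ping)
2 = GenServer.call(pid, :ping)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> IO.inspect(reason, label: "stopped")
end
```

GenServer.start/2 is used rather than start_link/2 so that the non-normal stop reason does not propagate to the caller through a link.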