Using Ecto to run a long-running multi-process transaction

I’m building an app which provides a read-only GraphQL API to a React frontend. The app stores details of many after-school programs in a certain area.

The actual program data is stored and managed in an external system (Airtable). Once in a while, an import is run to replace all of the data in the Elixir app with the fresh data. Because the data needs to be processed, geolocated etc, and there are potentially tens of thousands of rows, I don’t want to process all of the data as a huge chunk in memory. I will be using either Task.async_stream or GenStage to do the processing.

However, I want to run the whole process (drop all current data, insert new data) in a Postgres transaction. I know truncating the tables will also block reads on them (TRUNCATE takes an exclusive lock), and indeed access to the API will be disabled altogether while the import is running. However, I want the option to roll back if errors are encountered during the process.

The only tools Ecto provides are the two variants of Repo.transaction/2. I cannot use Multi since the whole point is to not store all of the data in memory at once. However while playing around with the function variant, I discovered that any Repo calls must be made within that function (or more specifically within that process while the function is being executed), since it is the process that locks the connection and sets up the transaction. Therefore processing the data asynchronously with different processes calling DB functions does not work as expected—any Repo functions called in a different process will not be part of the transaction.

The immediate “shape” of the solution to this kind of serialisation problem is usually a GenServer, but there is no way to separately start and stop the transaction, nor is there a way (without reimplementing/forking :gen_server) to run the GenServer loop inside a transaction function. I came up with this solution instead:

defmodule PFServer.TransactionManager do
  alias PFServer.Repo
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def execute(fun) when is_function(fun) do
    GenServer.call(__MODULE__, {:execute, fun})
  end

  def finish do
    GenServer.stop(__MODULE__)
  end


  # callbacks

  def init([]) do
    {:ok, pid} = start_link_executor()

    {:ok, pid}
  end

  def handle_call({:execute, fun}, from, pid) do
    send pid, {:execute, fun, from}

    {:noreply, pid}
  end

  def terminate(_reason, pid) do
    send pid, :finish
  end

  # Task execution process
  defp start_link_executor do
    Task.start_link(&execute_until_done/0)
  end

  defp execute_until_done do
    Repo.transaction(&execute_all/0)
  end

  defp execute_all do
    receive do
      {:execute, fun, from} ->
        GenServer.reply from, fun.()
        execute_all()

      :finish -> :ok
    end
  end
end

and now I can spin this up and use TransactionManager.execute(<repo accessing fn>).

Implementing my own receive loop seems like a code smell to me, maybe it’s the learned reliance on OTP to handle any and all messaging concurrency.

Is this an acceptable solution? Am I going to run into problems with this? Is there any other way to handle this? It seems like the OTP way would be to just have the GenServer handle the whole serialization, but like I mentioned this would require manual control over transaction begin and commit.

2 Likes

I would version all the data rows, eg. import_version 1 on all rows…

then you can import/process/insert import_version 2 in the most efficient way and then bump your graphql resolver to select where import_version 2 when everything has been verified to be ok…

you can manage the import_version integer in (d)ets or in its own db table (wrap it with ConCache or something that will cache it in ets/memory) - this way you can also roll back, or go back in time and identify errors etc.

as a rule I would never delete anything in a DB.

1 Like

Yeah I like the versioned approach a lot here. It lets you easily import all the new data and then seamlessly switch to using it, you could even easily revert if you had to for some reason.

If space is a concern you can eventually purge old versions as necessary.
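
To make the versioned approach concrete, here is a minimal sketch. It is only an illustration under assumptions: the module name VersionedStore and its functions are hypothetical, and an ETS table stands in for both the database rows and the version pointer, so that the example runs without a database. In the real app the rows would live in Postgres with an import_version column.

```elixir
defmodule VersionedStore do
  # Hypothetical stand-in: rows are kept in ETS under {version, id} keys,
  # and a single :current_version row says which version readers see.
  @table :versioned_store_demo

  def init do
    :ets.new(@table, [:named_table, :set, :public])
    :ets.insert(@table, {:current_version, 0})
    :ok
  end

  def current_version do
    [{:current_version, version}] = :ets.lookup(@table, :current_version)
    version
  end

  # Writes rows under the next version; readers are unaffected until
  # activate/1 is called.
  def load(rows) do
    version = current_version() + 1
    Enum.each(rows, fn {id, data} -> :ets.insert(@table, {{version, id}, data}) end)
    version
  end

  # Switches readers to the given version (also usable to revert).
  def activate(version), do: :ets.insert(@table, {:current_version, version})

  def get(id) do
    case :ets.lookup(@table, {current_version(), id}) do
      [{_key, data}] -> {:ok, data}
      [] -> :error
    end
  end

  # Deletes every row belonging to an old version.
  def purge(version), do: :ets.match_delete(@table, {{version, :_}, :_})
end
```

Readers only ever see the active version, so a half-finished import is invisible until activate/1 is called, and a failed import can simply be purged.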

1 Like

Since we have a canonical copy of the data in a separate store, I’m not too concerned with retaining previous versions of the data. Importing the data into a second copy of the tables and then switching over would indeed work, but would require much more logic, as well as manual cleanup (if there is an error, have to truncate the half-imported tables manually instead of just rolling back the transaction).

In any case, the issue of how to do a large transaction in Ecto which may involve more than one process remains, and there may be other use cases for this. The code I have now works, and save for perhaps adding some timeout handling for robustness, seems like it does the job—I was just wondering if there was a more straightforward solution.

2 Likes

Hi mjadczak,

Based on your idea, we sketched our own rough transaction-handling code. We need the transaction handling to:

  • Work with try, rescue and after, because we have to reset our own state in ‘after’. With your GenServer approach, our ‘after’ code wasn’t executed when an exception was raised, leaving our state inconsistent. I don’t know why :frowning:

  • Make it easy to acquire another transaction

Here is our rough trial code:

defmodule Fintech.Transaction do
    alias Fintech.Repo

    def open do
        Repo.transaction &loop/0
    end

    def execute(pid, fun) when is_pid(pid)
        and is_function(fun) do
        send pid, {:execute, fun}
    end

    def rollback(pid) when is_pid(pid) do
        send pid, :rollback    
    end

    def close(pid) when is_pid(pid) do
        send pid, :close
    end

    defp loop do
        receive do
            {:execute, fun} ->
                apply fun, []
                loop()
            :rollback ->
                Repo.rollback :error
                :ok
            :close -> :ok
            _ ->
                Repo.rollback :error
                :ok
        end
    end
end

Sample usage:

  def test_trans do
    pid = spawn Transaction, :open, []
    # set our own state
    try do
      Transaction.execute pid, our_db_trans
      Transaction.execute pid, other_db_trans
      Transaction.execute pid, other_db_trans
      Transaction.close pid
    rescue
      RuntimeError ->
        Transaction.rollback pid
        IO.puts "rescue runtime error executed"
      _ ->
        Transaction.rollback pid
        IO.puts "other rescue executed"
    after
      IO.puts "after executed"
      # set our own state
    end
  end

1 Like

Since you are already looking at GenStage … here’s how I might write this using Flow:

defmodule ExtoTest do
  use Ecto.Repo, otp_app: :ectotest

  def flowing_inner_trans(input) do
    IO.puts "INNER: Are we in a transaction here? #{ExtoTest.in_transaction?}"
    input
  end

  def proc_query(query) do
    IO.puts "OUTER: re we in a transaction here? #{ExtoTest.in_transaction?} #{query}"
  end

  def flowing do
    1..30
    |> Flow.from_enumerable(max_demand: 4)
    |> Flow.each(&ExtoTest.flowing_inner_trans/1)
    |> Stream.each(&ExtoTest.proc_query/1)
    |> Stream.run
  end

  def multi_proc_transactions do
    ExtoTest.transaction(&ExtoTest.flowing/0)
  end
end

This handles the synchronization for “free” thanks to the Stream.each at the end, without ruining the laziness of it. Note that for the above, since it is just producing 30 values, I set the max_demand to something artificially low so you can actually see the async processing:

iex(17)> ExtoTest.multi_proc_transactions

12:30:41.696 [debug] QUERY OK db=0.7ms queue=0.1ms
begin []
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
OUTER: re we in a transaction here? true 1
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
OUTER: re we in a transaction here? true 2
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
OUTER: re we in a transaction here? true 5
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
OUTER: re we in a transaction here? true 6
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
OUTER: re we in a transaction here? true 9
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
INNER: Are we in a transaction here? false
OUTER: re we in a transaction here? true 10
OUTER: re we in a transaction here? true 13
OUTER: re we in a transaction here? true 14
OUTER: re we in a transaction here? true 3
OUTER: re we in a transaction here? true 4
OUTER: re we in a transaction here? true 7
OUTER: re we in a transaction here? true 8
OUTER: re we in a transaction here? true 11
OUTER: re we in a transaction here? true 12
OUTER: re we in a transaction here? true 15
OUTER: re we in a transaction here? true 16
OUTER: re we in a transaction here? true 17
OUTER: re we in a transaction here? true 18
OUTER: re we in a transaction here? true 19
OUTER: re we in a transaction here? true 20
OUTER: re we in a transaction here? true 21
OUTER: re we in a transaction here? true 22
OUTER: re we in a transaction here? true 23
OUTER: re we in a transaction here? true 24
OUTER: re we in a transaction here? true 25
OUTER: re we in a transaction here? true 26
OUTER: re we in a transaction here? true 27
OUTER: re we in a transaction here? true 28
OUTER: re we in a transaction here? true 29
OUTER: re we in a transaction here? true 30

12:30:41.708 [debug] QUERY OK db=0.6ms
commit []
{:ok, :ok}

To me, this is rather nicer than setting up a process manually.

That said … if you do stick with your example as above, I would not use Task but create a GenServer module with a handle_info in there. It is, at least imho, cleaner / easier to read than scattering receive blocks in functions that are then passed around via “arbitrary” process spawnings (such as Task)

1 Like

Is there any way to make it commit, let’s say, every 10 records?

1 Like

@bcahya You’re right, my code was in no way a general-purpose solution to the problem I posed, and indeed I have since modified it to add a little more error handling, and I imagine some more will be needed after testing. It’s a pretty narrow-purpose piece of code, but I did want to start a discussion around the larger problem.

@aseigo nice, I didn’t think of using streams that way. Indeed your solution means we can compute the operations in other processes and then serialise them in the current process running the transaction with Stream.each. The problem I have there is that my current solution doesn’t map clearly to that model—I need to do database access during the pre-processing of the data, because there are certain fields (call them categories) which are denormalised in the source data but which I am storing normalised (so I need to check if the category already exists, and if so, fetch its ID for the association).

Regarding your point about Tasks/GenServer—if it were possible to achieve this with GenServer, I would only need one GenServer in the first place. This was exactly what I was complaining about. In this case you have to write your own receive loop somehow, because the receive loop itself must run inside the transaction lambda. With GenServer, when we call GenServer.start_link, a new process is spawned, running the standard GenServer receive loop, and user code is only called on receipt of a message. We would need a way to “wrap” the GenServer loop in the transaction for this to work correctly.

As I pointed out, this could be solved cleanly by allowing decoupled access to transaction state—so that in init/1 we can put the connection of the current process into a transaction state, and then later on, either while handling messages or when terminating, manually commit or rollback the transaction, and perhaps start a new one.

I did take a look at the Ecto code which handles this, and it seems that the pattern of transaction operations being lambdas is baked in all the way down to the adapter code (Multi actually gets run within a lambda through the same process) so I’m not sure how feasible it would be to implement this.

Perhaps @josevalim or another core member could comment? Was there just a decision made to make the interface to transactions a lambda, or is there a fundamental reason why we cannot decouple the begin and commit/rollback? (Read: is it a case of writing a PR which provides a different interface to the underlying connection state management, or would it require a fundamental shift in some assumptions?)

1 Like

Try with Stream.scan instead of Stream.each; that should give you a counter (using a number for the accumulator) that can be reset every N iterations, and which would trigger the creation of a new transaction and the closing of the existing one (if any).

1 Like

If this requires access to data being set within the transaction, then indeed the code can not just wait until the final query to commit. If those reads can be done outside the transaction, then it’s fine as-is; however I assume that if one entry causes the creation of new categories that subsequent entries should re-use those created categories.

That implies another point of serialization through the transaction. Perhaps sth like:

enumerable_containing_entries
|> Flow.from_enumerable()
|> Flow.each(&determine_normalizations/1)
|> Stream.each(&fulfill_normalizations/1)
|> Flow.from_enumerable() # not 100% sure that would be needed, but I think it is?
|> Flow.each(&complete_normalization/1)
|> Stream.each(&ExtoTest.proc_query/1)
|> Stream.run

IOW: break it into a multi-stage process where the datafields which should be normalized are first identified and that gets passed into a serialized Stream which does the DB interaction within the transaction. You would probably be passing tuples at this point such as { parsed_data, normalizations_needed } to keep your parsing sensible.

Caching the results of queries in the “fulfill_normalizations” stage could probably also be added, to prevent repeated queries for the same data, by using sth like Stream.transform with a map as the accumulator to store the cached returns.
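
As a rough illustration of that caching idea, here is a small sketch using Stream.transform with a map accumulator. The module name CategoryCache and the lookup function are hypothetical; lookup stands in for whatever database query resolves a category to its ID, and entries are assumed to arrive as {data, category} tuples.

```elixir
defmodule CategoryCache do
  # `lookup` is a hypothetical 1-arity function standing in for the real
  # database query that fetches (or creates) a category and returns its ID.
  def with_category_ids(entries, lookup) when is_function(lookup, 1) do
    Stream.transform(entries, %{}, fn {data, category}, cache ->
      case Map.fetch(cache, category) do
        {:ok, id} ->
          # Seen before: reuse the cached ID, no query needed.
          {[{data, id}], cache}

        :error ->
          # First occurrence: run the query once and remember the result.
          id = lookup.(category)
          {[{data, id}], Map.put(cache, category, id)}
      end
    end)
  end
end
```

Because the stream is consumed in the process that runs it, the lookup calls happen in that process, which is what allows them to take part in the surrounding transaction.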

It would work exactly as your code stands now, except instead of creating a Task and a separate receive loop, the code would instead start a GenServer and set the pid contained in PFServer.TransactionManager’s state to that. That would allow you to replace the hand-coded receive with handle_* calls in the GenServer, having the :finish call result in a return of {:stop, :normal, … } to stop the GenServer (and close the transaction on its way out). You still need one GenServer process per transaction you have going on, but at least you don’t have to write a receive loop by hand (with all the possible edge conditions that may be involved).

So it doesn’t solve your main problem, but it is a slightly nicer way to write your example.

1 Like

Doing the synchronisation in multiple stages like this would require at least two passes on each entry of the data, which I’d like to avoid, although it does seem like a neat solution worth investigating.

This won’t work. GenServer.start and GenServer.start_link spawn a separate process in which the GenServer code is run, then return. Doing

Repo.transaction fn -> GenServer.start_link(...) end

will start a transaction, start the GenServer, then immediately commit an empty transaction. All Repo accesses which are supposed to be part of the transaction must synchronously run within that lambda.

On the other hand, we cannot run Repo.transaction anywhere else in the GenServer callbacks, since we wouldn’t actually return back to handle more incoming messages until the transaction lambda returns.

What we need is to wrap the GenServer’s internal receive loop itself with the transaction. This isn’t possible with the Elixir API, but having dived into the Erlang docs, it seems that spawning a new process which, inside a transaction lambda, executes :gen_server.enter_loop may actually do exactly that. We do lose some of the advantages of using start(_link), and we have to register and link the process ourselves, but it may be the best solution—as you mentioned, trying to reïmplement the various edge-case and error-case handling within GenServer is dangerous at best.
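
A minimal sketch of that wrapped-loop idea follows. It is an untested-against-a-database illustration under assumptions: the module WrappedLoop and its functions are hypothetical, and wrap is any 1-arity function that runs a 0-arity function and returns its result. With Ecto it would presumably be something like fn fun -> Repo.transaction(fun, timeout: :infinity) end. Note that :gen_server.enter_loop does not return normally, so the sketch catches the :normal exit in order to let the wrapping function finish.

```elixir
defmodule WrappedLoop do
  use GenServer

  # `wrap` is a hypothetical stand-in for the transaction function:
  # it receives a 0-arity function, runs it, and returns its result.
  def start_link(wrap) when is_function(wrap, 1) do
    # enter_loop requires a process started through :proc_lib.
    :proc_lib.start_link(__MODULE__, :enter, [self(), wrap])
  end

  def execute(pid, fun) when is_function(fun, 0), do: GenServer.call(pid, {:execute, fun})
  def finish(pid), do: GenServer.call(pid, :finish)

  @doc false
  def enter(parent, wrap) do
    wrap.(fn ->
      # Tell the parent we are up, then run the GenServer loop
      # inside the wrapping function.
      :proc_lib.init_ack(parent, {:ok, self()})

      try do
        :gen_server.enter_loop(__MODULE__, [], nil)
      catch
        # A normal stop exits the loop; catching it lets `wrap` complete.
        :exit, :normal -> :ok
      end
    end)
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:execute, fun}, _from, state), do: {:reply, fun.(), state}
  def handle_call(:finish, _from, state), do: {:stop, :normal, :ok, state}
end
```

Any other exit reason propagates out of wrap untouched, which with a real transaction function would be expected to abort rather than commit. How errors raised or thrown inside callbacks interact with the transaction is not covered by this sketch.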

I guess the whole thing is a side-effect of the decision in Ecto to hide the concept of connection state from the user, and instead pin connections to processes. At the same time, especially with things like transactions, connection state is important, and making it magic leads to things like this. Perhaps a way in Ecto to explicitly obtain and use a particular connection from the pool would be advisable?

I’ll likely implement the GenServer wrapped-loop solution, think on this some more and perhaps start an issue in Ecto to discuss the underlying architecture more.

1 Like

While looking at Stream.scan, I don’t understand what this trigger would mean, since creating a new transaction inside an existing transaction should do nothing.

Could you please write a code snippet as well?

1 Like

Hm … I had expected something similar to be possible from Elixir (I have more experience with Erlang than Elixir, and sometimes this trips me up … :/) … that is unfortunate. (and a reminder to always try the code out locally before commenting here … teaches me for attempting quick comments during breaks at the office :P)

Seems that way… the alternatives don’t seem great, either, though: having a transaction identifier that gets passed around (and easily lost, and not easy to detect when nesting …)? a transaction represented by a “magic” process? Nothing particularly pretty pops out at me … though it would be nice to have what you are doing included in Ecto as a nice-to-have, as you note:

That would be awesome :slight_smile: thanks in advance for your efforts!

The transaction would move from wrapping the flow to being in the final Stream.each lambda. That function would add the queries (or lambdas) in a list (which would be the accumulator), and then whenever there were 10 (or whatever # of) queries in that list, start a transaction that runs the queries (or lambdas, as they would have captured their variables) and empty the accumulator for the next run.

1 Like

Just use two schemas or dbs to switch between :slight_smile:

We mentioned this above. It would indeed be possible, but it requires extra logic to switch between the two and to roll back changes on any failures in the import; with a transaction we simply need to roll back if any errors occur and we’re back to a good state. It’s definitely one possible solution though!

Great, we finally solved our issue using Stream.chunk followed by Stream.each, running each chunk (a list) inside its own transaction.
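
For readers who want to see the shape of that, here is a minimal sketch of committing every N records. It assumes a hypothetical module BatchedImport, and it takes the transaction and insert functions as arguments so that it runs without a database; with Ecto, transaction would presumably be &Repo.transaction/1. It uses Stream.chunk_every, the current replacement for the deprecated Stream.chunk.

```elixir
defmodule BatchedImport do
  # `transaction` is a 1-arity function that runs a 0-arity function
  # (hypothetical stand-in for the real transaction call);
  # `insert` is a 1-arity function that writes a single entry.
  def run(entries, batch_size, transaction, insert) do
    entries
    |> Stream.chunk_every(batch_size)
    |> Stream.each(fn batch ->
      # Each batch gets its own transaction, so a failure only
      # affects the batch in which it occurs.
      transaction.(fn -> Enum.each(batch, insert) end)
    end)
    |> Stream.run()
  end
end
```

The trade-off is that earlier batches stay committed if a later one fails, so this gives up the all-or-nothing behaviour the original question asked for.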

TL;DR the short answer is no, the long answer is yes

There is no concurrency when doing database calls: they block and are applied in strict order. Actually the calling process accesses the database connection socket directly (and not via a connection process). This is very efficient as it minimises copying and message passing, and garbage collection from one transaction does not affect other callers.

Transactions combined with pooling is a difficult problem. We need to ensure that a lock is held on a database connection while the transaction takes place and only the desired process(es) access that database connection, begin is always run first, commit/rollback is always called last, and the database connection is released eventually. We can provide these guarantees if the transaction occurs inside a single function call. Even in this situation the error handling is non-trivial. I think it is unrealistic to expect most users to handle this correctly and I would not be confident to handle this myself.

This means that only one process can access a connection from the pool at a time, and a transaction must be run inside a single function call.

This approach is very difficult because :gen_server.enter_loop never returns and if the GenServer hibernates the call stack is thrown away - so the transaction would never be committed. Of course it is possible to wrap the enter_loop in a try and catch a :normal exit, just make sure never to hibernate.

As explained above, I don’t think it is advisable to provide this. However, you may have noticed that Ecto.Adapters.SQL.Sandbox provides a similar mechanism. A lock is acquired on a connection and this process (and possibly others) can access it many times. Of course we wouldn’t want the transaction/savepoint semantics of the sandbox. Fortunately this pool is powered at the low level by the DBConnection.Ownership pool, which does not apply the transaction/savepoint semantics. It would be possible to either use the ownership-based pool, or build on it in a similar way to how the sandbox does, to provide guarantees on the transaction. I think the latter would be preferable because it makes it easier to guarantee the transaction semantics. This could be achieved by copying the sandbox implementation and altering the ownership checkin slightly such that there are 2 different ownership checkins, commit and rollback.

We would need to resolve this in DBConnection before Ecto. Pooling is separate from transaction handling there. Hopefully something related will appear before my Elixir Conf EU talk ;).

3 Likes

Thanks for a thorough and complete answer! It makes the situation much more clear.

This seems like the right solution for now, then. I’ll wrap this in a single module rather than making this extensible in any way to ensure that the GenServer does actually exit at some point, and that it does not hibernate (along with plenty of comments to ensure it stays that way in the future).

I see, seems like this thing runs all the way down. It may be worth reconsidering, but I think there should probably be other use-cases than just this, especially since it could be implemented with the solution above.

Thanks again!

There is already work in progress on an alternative API for long running concurrent workflows that plays nicely with existing libraries.

1 Like