Sanity Check: Right way to use Ecto/Postgres locks (and how to test them)

We’ve got a kind of producer/consumer workflow happening, involving three processes.

The main process inserts a whole bunch of records into the progress table in a single atomic bulk insert. (This ensures that our entire roster of work gets set up at once without interruption.) Each progress row has a status of pending.
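(For reference, the staging step is conceptually a single Repo.insert_all call; the following is a simplified sketch, with work_items standing in for however the roster actually arrives, rather than our real code:)

# One insert_all call is a single INSERT statement, so the whole roster
# is staged atomically: either every row lands as "pending" or none do.
# Timestamps must be supplied manually; truncate if the schema uses :utc_datetime.
now = DateTime.utc_now() |> DateTime.truncate(:second)

rows =
  Enum.map(work_items, fn item ->
    %{name: item.name, status: "pending", inserted_at: now, updated_at: now}
  end)

Repo.insert_all(Progress, rows)

For a very large roster the rows may need to be chunked to stay under Postgres’s bind-parameter limit, with the chunks wrapped in a single Repo.transaction to keep the all-or-nothing property.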

The Creator process is a GenServer that (basically) does the following on an interval:

def create_stuff(opts) do
  # Grab `batch_size` waiting entries
  batch =
    from(p in Progress,
      where: p.status == "pending",
      limit: ^opts[:batch_size],
      lock: "FOR UPDATE NOWAIT"
    )
    |> Repo.all()

  # Set their status to "processing"
  from(p in Progress, where: p.id in ^Enum.map(batch, & &1.id))
  |> Repo.update_all(set: [status: "processing", updated_at: DateTime.utc_now()])

  # Process each entry and mark it complete, each in its own transaction
  Enum.each(batch, fn entry ->
    Repo.transaction(fn ->
      Logger.info("Creating #{entry.name}")
      do_something_with(entry)

      entry
      |> Progress.changeset(%{status: "complete"})
      |> Repo.update!()
    end)
  end)
end

The Redriver process is a GenServer that sweeps the progress table and resets any entry that’s been processing longer than @timeout seconds (and is therefore assumed to be part of a stuck batch).

cutoff = DateTime.utc_now() |> DateTime.add(-@timeout, :second)

from(p in Progress, where: p.status == "processing" and p.updated_at <= ^cutoff)
|> Repo.update_all(set: [status: "pending", updated_at: DateTime.utc_now()])

The idea is that no two Creator processes should ever select the same set of progress rows to work on, but of course there’s that tiny window between batch = and update_all where another process could grab the same rows. Hence the lock.
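(For reference, a rough sketch of what holding the lock across both statements would look like, i.e. running the select and the update_all inside a single transaction so the FOR UPDATE row locks stay held until the claim commits. This uses the same names as the snippet above and is a sketch, not our exact production code:)

# Claim the batch inside one transaction so the FOR UPDATE locks are
# held until the "processing" update commits. Assumes `import Ecto.Query`.
{:ok, batch} =
  Repo.transaction(fn ->
    batch =
      from(p in Progress,
        where: p.status == "pending",
        limit: ^batch_size,
        lock: "FOR UPDATE NOWAIT"
      )
      |> Repo.all()

    from(p in Progress, where: p.id in ^Enum.map(batch, & &1.id))
    |> Repo.update_all(set: [status: "processing", updated_at: DateTime.utc_now()])

    batch
  end)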

This seems to be working in practice, in that we’ve observed that work gets duplicated without the lock and doesn’t seem to with the lock. But we’ve had trouble devising a test for it.

The latest attempt is:

test "concurrency" do
  Sandbox.mode(Repo, {:shared, self()})
  
  1..5 
  |> Enum.each(fn index ->
    Progress.changeset(%Progress{}, %{name: "TEST_ENTRY_#{index}"}) |> Repo.insert()
  )

  log =
    capture_log(fn ->
      Enum.each(1..5, fn _ ->
        spawn(fn -> Creator.create_stuff(batch_size: 2) end)
      end)

  :timer.sleep(1000)

  assert from(p in Progress, where: p.status == "complete") |> Repo.aggregate(:count) == 5
  assert assert Regex.scan(~r/Creating TEST_ENTRY_1/, log) |> length() == 1
end

The above test fails, because "Creating TEST_ENTRY_1" appears in the log 5 times – once for each spawned call to Creator.create_stuff/1.

Now that I’ve got all of that out of the way, here are my questions:

  1. Is this the right way to use FOR UPDATE NOWAIT locking?
  2. Is it possible that it’s working in reality but the Ecto Sandbox is getting in the way of testing it properly?
  3. Is there a better way to accomplish what we’re trying to do here?

I recognize that this seems like a great use case for Broadway or at least a GenStage, and we actually are using Broadway farther down the pipe from here. But it’s essential that the initial piece – establishing the work to be done – is handled atomically. For example, if we tried to set this up using SQS messages for the entries and Broadway to process them, we’d have to contend with what happens if our server dies halfway through sending the messages to SQS. We need to be able to say “Here are 25,000 things that need to be done” and know that of those 25,000, the number successfully staged for processing is either 25,000 or 0.

I would expect FOR UPDATE SKIP LOCKED here.

The docs say:

With NOWAIT, the statement reports an error, rather than waiting, if a selected row cannot be locked immediately. With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped.
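To make that concrete, the only change to the claiming query would be the lock clause (a sketch, same assumptions as the query above):

# SKIP LOCKED: rows already locked by another transaction are silently
# skipped, so each worker claims a disjoint batch instead of erroring.
lock: "FOR UPDATE SKIP LOCKED"

With NOWAIT, every worker that loses the race gets a lock_not_available error and has to retry; with SKIP LOCKED each worker simply claims whatever pending rows are still unlocked, which is the usual Postgres job-queue pattern.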

Yes, I think the sandbox is sharing a single connection and transaction among all the spawned processes, instead of allowing separate connections/transactions for each one.
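If you want a test that exercises real lock contention, one option is to run the workers outside the sandbox entirely. Below is a rough sketch assuming ecto_sql’s Ecto.Adapters.SQL.Sandbox.unboxed_run/2, that this particular test doesn’t also check out a sandboxed connection, and that the committed fixture rows are cleaned up afterwards; treat it as a starting point rather than a drop-in test:

test "concurrent workers never claim the same rows" do
  alias Ecto.Adapters.SQL.Sandbox

  # Fixtures are committed for real (no sandbox transaction), so every
  # connection can see them and row locks actually conflict.
  Sandbox.unboxed_run(Repo, fn ->
    Enum.each(1..5, fn index ->
      Repo.insert!(Progress.changeset(%Progress{}, %{name: "TEST_ENTRY_#{index}"}))
    end)
  end)

  on_exit(fn ->
    # The rows really exist in the test database; delete them explicitly.
    Sandbox.unboxed_run(Repo, fn ->
      from(p in Progress, where: like(p.name, "TEST_ENTRY_%")) |> Repo.delete_all()
    end)
  end)

  1..5
  |> Enum.map(fn _ ->
    Task.async(fn ->
      # Each task checks out its own unboxed connection, so the FOR UPDATE
      # locks taken by one worker are visible to the others.
      Sandbox.unboxed_run(Repo, fn -> Creator.create_stuff(batch_size: 2) end)
    end)
  end)
  |> Enum.each(&Task.await/1)

  Sandbox.unboxed_run(Repo, fn ->
    assert from(p in Progress, where: p.status == "complete") |> Repo.aggregate(:count) == 5
  end)
end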


Thank you, @mbuhot! I think SKIP LOCKED is exactly what I was looking for! I’ve also got the tests passing, but I’m not entirely sure why given how the sandbox works. :expressionless: