We’ve got a producer/consumer workflow involving three processes.
The main process inserts a whole bunch of records into the `progress` table in a single atomic bulk insert. (This ensures that our entire roster of work gets set up at once without interruption.) Each `progress` row has a status of `pending`.
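For reference, here is a minimal sketch of building the rows for that bulk insert. `ProgressSeed.pending_rows/2` is a hypothetical helper, and the `inserted_at`/`updated_at` field names are assumed from the default Ecto timestamps. The resulting list would be passed to `Repo.insert_all(Progress, rows)`, which writes all rows in one `INSERT` statement. `insert_all` does not fill in autogenerated timestamps, so they are set explicitly here.

```elixir
defmodule ProgressSeed do
  # Hypothetical helper: builds the row maps for a single bulk insert.
  # Timestamps are truncated to whole seconds in case the columns are
  # :utc_datetime (an assumption about the schema).
  def pending_rows(names, now \\ DateTime.utc_now()) do
    now = DateTime.truncate(now, :second)

    Enum.map(names, fn name ->
      %{name: name, status: "pending", inserted_at: now, updated_at: now}
    end)
  end
end

rows = ProgressSeed.pending_rows(for i <- 1..5, do: "TEST_ENTRY_#{i}")
# In the application: Repo.insert_all(Progress, rows)
```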
The `Creator` process is a GenServer that (basically) does the following on an interval:
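```elixir
# Assumes `import Ecto.Query` and `require Logger` in the enclosing GenServer module.
def create_stuff(opts) do
  now = DateTime.utc_now()

  # Grab `batch_size` waiting entries
  batch =
    from(p in Progress,
      where: p.status == "pending",
      limit: ^opts[:batch_size],
      lock: "FOR UPDATE NOWAIT"
    )
    |> Repo.all()

  # Set their status to "processing" (update_all needs a query, not a list of structs)
  ids = Enum.map(batch, & &1.id)

  from(p in Progress, where: p.id in ^ids)
  |> Repo.update_all(set: [status: "processing", updated_at: now])

  # Process them, marking each one complete in its own transaction
  Enum.each(batch, fn entry ->
    Repo.transaction(fn ->
      Logger.info("Creating #{entry.name}")
      do_something_with(entry)
      entry |> Progress.changeset(%{status: "complete"}) |> Repo.update!()
    end)
  end)
end
```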
The `Redriver` process is a GenServer that sweeps the `progress` table and resets any entry that’s been `processing` longer than `@timeout` seconds (and is therefore assumed to be part of a stuck batch).
```elixir
timeout = DateTime.utc_now() |> DateTime.add(-@timeout, :second)

from(p in Progress, where: p.status == "processing" and p.updated_at <= ^timeout)
|> Repo.update_all(set: [status: "pending", updated_at: DateTime.utc_now()])
```
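The sweep's selection rule can be checked without a database. Below is a hypothetical in-memory model (`RedriverModel.sweep/3` is not part of the real code) that applies the same condition to a list of maps: anything still `processing` whose `updated_at` is at or before `now - timeout` goes back to `pending`. It uses `DateTime.compare/2` because Elixir's `<=` compares structs structurally, not chronologically.

```elixir
defmodule RedriverModel do
  # Hypothetical in-memory model of the Redriver sweep; the real code uses Repo.update_all.
  # Entries are maps with :status and :updated_at (a DateTime).
  def sweep(entries, timeout_seconds, now \\ DateTime.utc_now()) do
    cutoff = DateTime.add(now, -timeout_seconds, :second)

    Enum.map(entries, fn
      %{status: "processing", updated_at: updated_at} = entry ->
        # Same condition as `p.updated_at <= ^timeout` in the query
        if DateTime.compare(updated_at, cutoff) in [:lt, :eq] do
          %{entry | status: "pending", updated_at: now}
        else
          entry
        end

      # Pending or complete entries are left untouched
      entry ->
        entry
    end)
  end
end
```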
The idea is that no two `Creator` processes should ever select the same set of `progress` rows to work on, but of course there’s that tiny window between `batch =` and `update_all` where another process could grab the same rows. Hence the lock.
This seems to be working in practice, in that we’ve observed that work gets duplicated without the lock and doesn’t seem to with the lock. But we’ve had trouble devising a test for it.
The latest attempt is:
```elixir
test "concurrency" do
  Sandbox.mode(Repo, {:shared, self()})

  Enum.each(1..5, fn index ->
    Progress.changeset(%Progress{}, %{name: "TEST_ENTRY_#{index}"}) |> Repo.insert!()
  end)

  log =
    capture_log(fn ->
      Enum.each(1..5, fn _ ->
        spawn(fn -> Creator.create_stuff(batch_size: 2) end)
      end)

      :timer.sleep(1000)
    end)

  assert from(p in Progress, where: p.status == "complete") |> Repo.aggregate(:count) == 5
  assert Regex.scan(~r/Creating TEST_ENTRY_1/, log) |> length() == 1
end
```
The above test fails because `"Creating TEST_ENTRY_1"` appears in the log 5 times, once for each spawned call to `Creator.create_stuff/1`.
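When the test fails, it can help to see the duplication for every entry rather than only `TEST_ENTRY_1`. A small hypothetical helper (`LogTally.creations/1`, not part of the original code) that tallies the names captured from `Creating ...` log lines using `Regex.scan/3` and `Enum.frequencies/1`:

```elixir
defmodule LogTally do
  # Hypothetical helper: maps each entry name to how many times
  # a "Creating <name>" line appears in the captured log.
  def creations(log) do
    ~r/Creating (TEST_ENTRY_\d+)/
    |> Regex.scan(log, capture: :all_but_first)
    |> List.flatten()
    |> Enum.frequencies()
  end
end
```

In the test, `assert LogTally.creations(log) |> Map.values() |> Enum.all?(&(&1 == 1))` would then check every entry at once. Matching `\d+` also avoids a prefix match: `~r/Creating TEST_ENTRY_1/` would also count `TEST_ENTRY_10` if there were ten or more entries.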
Now that I’ve got all of that out of the way, here are my questions:
- Is this the right way to use `FOR UPDATE NOWAIT` locking?
- Is it possible that it’s working in reality but the Ecto Sandbox is getting in the way of testing it properly?
- Is there a better way to accomplish what we’re trying to do here?