Hey everyone! Work is progressing nicely on bulk updates & destroys, which are the primary missing features before I switch focus to DX/docs/tooling and the 3.0 release! Some useful additions have landed along these lines, and I’d like to highlight them here.
Basic bulk actions
Bulk actions in Ash consist of choosing the “best available” way to perform a bulk operation, based on the action and data layer involved. The basic function signature looks like this:
# provide a query, and each query result will be updated with the provided input
YourApi.bulk_update(query, :action, %{...})
# or provide a list (or stream) of records to be updated
YourApi.bulk_update([record1, record2, record3], :action, %{...})
We will look at the action/resource, and do one of the following things:
Atomic Bulk Operation
This is part of the work I’m doing that is still pending.
If the action can be made fully atomic (see the section on atomics below), and the data layer supports atomic bulk operations, we will tell the data layer “update this query with these atomic expressions”. This means you will get a query like the following in your data layer:
UPDATE
foo
SET field = value
WHERE <would be returned by the provided query>
This is generally the most efficient behavior we can choose.
Streaming individual update
If the data layer does not support atomic bulk operations, we will stream the query and/or records provided, and update them in batches. Each batch will use various batch optimizations that exist today (each change will use its batch callbacks if defined), and then each record will be updated one at a time.
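Conceptually, the streaming strategy resembles the following plain-Elixir sketch. This is not Ash internals, just an illustration: the module name, batch size, and function shapes are all made up here.

```elixir
defmodule StreamingUpdateSketch do
  # Illustrative only: stream records in batches, then update each record
  # in the batch individually.
  def run(records, update_fun, batch_size \\ 100) do
    records
    |> Stream.chunk_every(batch_size)
    |> Enum.flat_map(fn batch ->
      # batch-level callbacks (e.g. a change's batch callbacks) would run
      # here, once per batch
      Enum.map(batch, update_fun)
    end)
  end
end

StreamingUpdateSketch.run(1..5, fn n -> n * 2 end, 2)
# => [2, 4, 6, 8, 10]
```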
Streaming batch updates
This is an optimization to be added later
If the data layer supports non-atomic bulk updates, we will do the same as the above, but instead of updating each record one at a time, we will issue a single batch update per batch.
Atomics
Atomics are available today, but only explicitly. For example, you can write an action like so:
update :bump_score do
  change atomic_update(:score, expr(score + 1))
end
We are adding a suite of tools that will allow us to derive atomic implementations for many actions, and will allow users to enhance their existing functionality with atomic behavior as well. For example, you can implement the atomic/3 callback in a change. Let’s say you had a change like this:
defmodule IncrementScore do
  use Ash.Resource.Change

  def change(changeset, _, _) do
    Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
  end
end
This change is problematic in the context of concurrency. If you have two processes updating the same record at the same time, you will get inconsistent results. To solve this, you have some options that don’t include atomics. For example:
update :increment_score do
  change get_and_lock(:for_update)
  change IncrementScore
end
This will lock the record in the database. Locking is one way to solve this issue. Alternatively, you could have a GenServer in charge of serializing operations against a given resource (outside of Ash). Perhaps this resource lives in a data layer that doesn’t support locking, like an external API or a CSV. Of course, our advice would always be to use the best data layer possible, but it is not always possible to use Postgres for everything.
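Outside of Ash, a minimal GenServer that serializes increments might look like the following sketch. The module name and state shape are illustrative; real code would write to your data layer inside handle_call rather than keep the score in process state.

```elixir
defmodule ScoreSerializer do
  use GenServer

  # All increments are funneled through this single process, so concurrent
  # callers cannot interleave their read-modify-write cycles.
  def start_link(initial_score) do
    GenServer.start_link(__MODULE__, initial_score, name: __MODULE__)
  end

  def increment, do: GenServer.call(__MODULE__, :increment)

  @impl true
  def init(score), do: {:ok, score}

  @impl true
  def handle_call(:increment, _from, score) do
    # In real code, this is where you would persist the new value to your
    # data layer (external API, CSV, etc.)
    {:reply, score + 1, score + 1}
  end
end
```

Usage: `ScoreSerializer.start_link(0)` followed by concurrent `ScoreSerializer.increment()` calls, which are handled one at a time.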
Using the new atomic tooling, you could enhance your change like so:
defmodule IncrementScore do
  use Ash.Resource.Change
  require Ash.Expr

  def change(changeset, _, _) do
    Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
  end

  def atomic(changeset, _, _) do
    # get a reference to either the current value, or the latest expression
    # being used to update score
    score_ref = Ash.Changeset.atomic_ref(changeset, :score)

    {:atomic, %{score: Ash.Expr.expr(^score_ref + 1)}}
  end
end
Now, that same change will be performed with atomic operations if possible. The use of score_ref ensures that the following does what it looks like it should do:
update :increment_score_twice do
  change IncrementScore
  change IncrementScore
end
The above would end up with an atomic like SET score = (score + 1) + 1.
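The composition behavior can be illustrated with a plain-Elixir analogy (this is not Ash code): each atomic/3 callback receives the *latest* expression for :score via atomic_ref, so repeated changes build on each other instead of both reading the original column value.

```elixir
# Each "change" wraps the previous expression for :score, rather than
# referencing the raw column each time.
increment = fn score_expr -> "(#{score_expr} + 1)" end

# Apply the change twice, threading the latest expression through:
final = Enum.reduce(1..2, "score", fn _, expr -> increment.(expr) end)
# => "((score + 1) + 1)"
```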
While all of these callbacks are now defined, nothing will actually use them yet. Once in use, we will first check if an action can be done fully atomically (i.e. all changes and validations that may touch a given field have an atomic callback). If it can’t, we will check to see if the action has require_atomic? set to true, or if any change/validation missing the atomic callback does not have require_atomic? set to false. If so, the action will result in an error to inform you that you are performing a potentially unsafe-for-concurrency operation.
In 2.0, require_atomic? will default to false, for backwards compatibility. However, in 3.0, you will need to be explicit to write any action that may be unsafe to do concurrently.
Some validations or changes may not depend on the previous values (accessed via changeset.data). In that case, you can use require_atomic?: false on the change or validation.
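For instance, opting such a change out explicitly might look like the following sketch. SetStatusToActive is a hypothetical change with no atomic callback that also never reads changeset.data, and the exact DSL placement of the option may differ:

```elixir
update :activate do
  # hypothetical change: safe to run non-atomically, so we opt out explicitly
  change SetStatusToActive, require_atomic?: false
end
```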
New Api.stream options
Previously, you could only stream an action that had keyset pagination enabled. We now allow you to stream any action, but you must explicitly allow us to choose an option other than keyset pagination. This aligns with the general pattern we have adopted (and that will result in a few breaking changes in 3.0): only allow the most optimal behavior by default, and require explicit opt-in to any “adaptive” logic (i.e. supporting data layers/actions that can do what you want, but will not be as performant as they would be otherwise) that Ash may perform.
YourApi.stream!(query, allow_stream_with: :full_read)
These options are documented in Ash.Api.stream!.
error/2 expression
The error/2 expression underpins the ability to run changes and validations atomically. For example, here is the atomic implementation of the attribute_equals/2 builtin validation.
def atomic(changeset, opts) do
  field_value = Ash.Changeset.atomic_ref(changeset, opts[:attribute])

  {:atomic, [opts[:attribute]], Ash.Expr.expr(^field_value != ^opts[:value]),
   Ash.Expr.expr(
     error(^InvalidAttribute, %{
       field: ^opts[:attribute],
       value: ^field_value,
       message: "must equal %{value}",
       vars: %{field: ^opts[:attribute], value: ^opts[:value]}
     })
   )}
end
Only data layers that support error/2 will be able to support fully atomic actions. AshPostgres supports this by opening savepoints when necessary. These savepoints are short-lived, and should not incur the significant costs that savepoints can incur in long-running transactions.
Final Notes
These changes represent a lot of what is going into 3.0. We expect upgrading to 3.0 to be doable in a few hours, or at most a day, and to primarily consist of making some monotonous changes (i.e. going to each of your update actions with a functional change and adding require_atomic? false). Not all of the above is available yet, but it will be soon. We aren’t gating anything behind 3.0, except for the breaking changes involved in switching the default behavior.
I’m sure I left things out, so please respond with any questions or concerns you may have. Thanks for coming to my TED talk!