Basic bulk actions, atomics, new stream options, `error/2` expression

Hey everyone! Work is progressing nicely on bulk updates & destroys, which are the primary missing features before I switch to focus on DX/docs/tooling, and the 3.0 release! Some useful additions have been added along these lines, and I’d like to highlight them here :slight_smile:

Basic bulk actions

Bulk actions in Ash consist of choosing the “best available” way to perform a bulk operation, based on the action and data layer involved. The basic function signature looks like this:

# provide a query, and each query result will be updated with the provided input
YourApi.bulk_update(query, :action, %{...})

# or provide a list (or stream) of records to be updated
YourApi.bulk_update([record1, record2, record3], :action, %{...})

We will look at the action/resource, and do one of the following things:

Atomic Bulk Operation

This is part of the work I’m doing that is still pending.

If the action can be made to be fully atomic (see the second on atomics below), and the data layer supports
atomic bulk operations, we will tell the data layer “update this query with these atomic expressions”. This means you will get a query like the following in your data layer:

UPDATE 
foo
SET field = value
WHERE <would be returned by the provided query>

This is generally considered the most optimal behavior that we can choose.

Streaming individual update

If the data layer does not support atomic bulk operations, we will stream the query and/or records provided, and update them in batches. Each batch will use various batch optimizations that exist today (each change will use its batch callbacks if defined), and then each record will be updated one at a time.

Streaming batch updates

This is an optimization to be added later

If the data layer supports non-atomic bulk updates, We will do the same as the above, but then we will issue a batch update.

Atomics

Atomics are available today, but only explicitly. For example, you can write an action like so:

update :bump_score do
  change atomic_update(:score, expr(score + 1))
end

We are adding a suite of tools that will allow us to derive atomic implementations for many actions, and will allow users to enhance their existing functionality to enhance their existing functionality with atomic behavior as well. For example, you can implement the atomic/3 callback in a change. For example, lets say you had a change like this:

defmodule IncrementScore do
  use Ash.Resource.Change

  def change(changeset, _, _) do
     Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
  end
end

This change is problematic in the context of concurrency. If you have two processes updating this same record at the same time, you will get inconsistent results. To solve for this, you have some options that don’t include atomics. For example:

update :increment_score do
  change get_and_lock(:for_update)
  change IncrementScore
end

This will lock the record in the database. Locking is one way to solve this issue. Alternatively, you could have a GenServer in charge of serializing operations against a given resource (outside of Ash). Perhaps this resource lives in a data layer that doesn’t support locking, like an external api or a CSV. Of course our advice would always be to use the best data layer possible, it is not always possible to use postgres with everything :slight_smile:

Using the new atomic tooling, you could enhance your change like so:

defmodule IncrementScore do
  use Ash.Resource.Change
  require Ash.Expr

  def change(changeset, _, _) do
     Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
  end

  def atomic(changeset, _, _) do
     # get a reference to either the current value, or the latest expression being used to update score
     score_ref = Ash.Changeset.atomic_ref(changeset, :score)
     {:atomic, %{score: Ash.Expr.expr(^score_ref + 1)}}
  end
end

Now, that same change will be performed with the atomic operations if possible. The use of score_ref ensures that the following does what it looks like it should do:

update :increment_score_twice do
  change Increment
  change Increment
end

The above would end up with an atomic like SET score = (score + 1) + 1.

While all of these callbacks are now defined, nothing will actually use them yet., we will first check if an action can be done fully atomically (all changes and validations that may touch a given field have an atomic callback). If it can’t, we will check to see if the action has require_atomic? set to true or if each change/validation missing the atomic callback has require_atomic? set to false. If so, the action will result in an error to inform you that you are performing a potentially unsafe-for-concurrency operation.

In 2.0 require_atomic? will default to false, for backwards compatibility. However, in 3.0, you will need to be explicit to write any action that may be unsafe to do concurrently.

Some validations or changes may not depend on the previous values (accessed via changeset.data). In that case, you can use require_atomic?: false on the change or validation

New Api.stream options

Previously, you could only stream an action that had keyset pagination enabled. We now allow you to stream any action, but you must explicitly allow us to choose an option other than keyset pagination. This aligns with the general pattern we have adopted (and will result in a few breaking changes in 3.0) which is to only allow the most optimal behavior, and require explicitly opting into any “adaptive” (i.e supporting data layers/actions that can do what you want, but will not be as performant as they would be otherwise) logic that Ash may perform.

YourApi.stream!(query, allow_stream_with: :full_read)

These options are documented in Ash.Api.stream! .

error/2 expression

The error/2 expression underpins the ability to run changes and validations atomically. For example, here is the atomic implementation of the attribute_equals/2 builtin change.

  def atomic(changeset, opts) do
    field_value = Ash.Changeset.atomic_ref(changeset, opts[:attribute])

    {:atomic, [opts[:attribute]], Ash.Expr.expr(^field_value != ^opts[:value]),
     Ash.Expr.expr(
       error(^InvalidAttribute, %{
         field: ^opts[:attribute],
         value: ^field_value,
         message: "must equal %{value}",
         vars: %{field: ^opts[:attribute], value: ^opts[:value]}
       })
     )}
  end

Only data layers that support error/2 will be able to support fully atomic actions. AshPostgres supports this by opening save points when necessary. These save points are short-lived, and should not incur the significant costs that can be incurred when using save points for long running transactions.

Final Notes

These changes represent a lot of what is going into 3.0. We expect upgrading to 3.0 to be doable in a few hours, or at most a day, and to primarily consist of making some monotonous changes (i.e going to each of your update actions with a functional change and adding require_atomic? false). Not all of the above is available, but it will be soon. We aren’t gating anything behind 3.0, except for the breaking changes involved in switching the default behavior around.

I’m sure I left things out, so please respond with any questions or concerns you may have. Thanks for coming to my TED talk :bowing_man:

16 Likes

An update on this:

Still have lots of work to do on it, but the initial work for data-layer-backed bulk updates and bulk destroys has landed in main of ash and ash_postgres. Its going to be ~1-3 weeks before it gets a proper release/is properly ready for use, but the basic shape of the operation is there, and it will be used in simpler cases (not all actions can be done atomically, and therefore cannot be used with bulk operations).

What remains to be done before the switch to DX, docs and 3.0 breaking changes begins:

  1. All of the builtin changes and atomics need to have an atomic callback added to them so they can be run atomically.
  2. Adding options to allow you to require that the operation be done using the data layer update_query or destroy_query callback. Add this option at the action level as well, so you can ensure that certain actions are not accidentally modified to become not atomic. In 3.0 this will be switched, and you will have to explicitly allow update actions to not be atomic (probably).
  3. Write a guide on making actions atomic and bulk operations.
  4. add atomic_change and atomic_validate callbacks for inline changes/validations that are atomic (maybe)
  5. Add a batched version of bulk updates/destroys that will use limit/offset or keyset pagination to grab primary keys from the query and use those as a filter, iteratively. This strategy would be chosen when there are after_action hooks that have to be run, to avoid reading back massive amounts of data into the application.
  6. Add bulk callbacks for all builtin changes, so they are optimized for bulk operation (which may be required for the above item, but also is important just for performance with bulk_create, which always runs batches.
7 Likes