The same database transaction on the same dataset works with Repo.transaction(), but fails with the error below when it is run through Ash.Reactor. What could I be missing? I am using Ash 3.4.
Postgrex.Protocol (#PID<0.2666.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.4466.0> timed out because it queued and checked out the connection for longer than 15000ms
The following works with no issues.
# Working variant: the bulk destroy and the bulk upsert both run inside a
# single Repo.transaction call.
# existing_list, new_list and super_user are not assigned inside the closure,
# so they are presumably computed before the transaction is opened —
# confirm against the calling code.
Repo.transaction(fn ->
# The bulk destroy result is not inspected here; only the bulk create below
# (the last expression) determines the value returned from the closure.
existing_list
|> Ash.bulk_destroy(:destroy, %{}, actor: super_user)
# Upsert the replacement rows. No actor is passed to this call.
new_list
|> Ash.bulk_create(DateInstance, :create,
upsert?: true,
return_notifications?: true,
upsert_fields: [
....
]
)
end)
The following reactor fails after several SQL statements have been logged. The failure sometimes occurs during the DELETE queries and at other times during the INSERT queries.
# Reactor that runs the calculation, delete and upsert steps inside one
# transaction over DateInstance and returns that transaction's result.
defmodule MyApp.ChangeDateReactor do
@moduledoc false
use Ash.Reactor
alias MyApp.DateInstance
ash do
default_domain MyApp
end
# Caller-supplied inputs; both are forwarded to the :change_calc step.
input(:id)
input(:date)
# NOTE(review): :change_calc runs inside this transaction, so its run time
# counts against the time the database connection is held. In the plain
# Repo.transaction variant the lists appear to be computed before the
# transaction starts — confirm whether this step can move outside.
# NOTE(review): the Ash.Reactor `transaction` entity is documented as taking
# a `timeout` option; the 15000ms in the reported error may correspond to
# its default — TODO confirm against the Ash.Reactor DSL docs.
transaction :delete_and_insert, DateInstance do
# Produces the value that both following steps receive as :calc_output.
step :change_calc, MyApp.ChangeCalcStep do
argument :id, input(:id)
argument :date, input(:date)
end
# Both steps below depend only on :change_calc, not on each other, so the
# declaration order alone does not express "delete before upsert".
# NOTE(review): confirm whether an explicit ordering is required here.
step :delete_unnecessary_date_instances,
MyApp.DeleteUnnecessaryDateInstancesStep do
argument :calc_output, result(:change_calc)
end
step :upsert_updated_date_instances, MyApp.UpsertUpdatedDateInstancesStep do
argument :calc_output, result(:change_calc)
end
end
# The reactor's return value is the result of the transaction step.
return(:delete_and_insert)
end
# Reactor step that computes the value later steps receive as :calc_output.
defmodule MyApp.ChangeCalcStep do
  @moduledoc false
  use Reactor.Step
  alias MyApp.User
  alias MyApp.DateInstance

  # Calls change_calc/2 with the :id and :date arguments and wraps its result
  # in an {:ok, _} tuple.
  # NOTE(review): change_calc/2 is not defined in the module as shown —
  # confirm where it comes from.
  @impl true
  def run(arguments, _context, _options) do
    {:ok, change_calc(arguments.id, arguments.date)}
  end

  # Asks the reactor to retry when the failure carries code: :network_error;
  # every other failure is answered with :ok.
  @impl true
  def compensate(%{code: :network_error}, _arguments, _context, _options), do: :retry
  def compensate(_error, _arguments, _context, _options), do: :ok

  # Nothing to undo: this callback always returns :ok.
  @impl true
  def undo(_result, _arguments, _context, _options), do: :ok
end
# Reactor step that bulk-destroys the records listed under :existing_list in
# the :calc_output argument, acting as the super user.
defmodule MyApp.DeleteUnnecessaryDateInstancesStep do
  @moduledoc false
  use Reactor.Step
  alias MyApp.User
  alias MyApp.DateInstance
  require Ash.Query
  require Untangle, as: Ulog

  # Returns {:ok, message} when the bulk destroy reports :success and
  # {:error, errors} for every other status.
  @impl true
  def run(arguments, _context, _options) do
    super_user = User.get_super_user!()
    %{existing_list: existing_list} = arguments.calc_output

    existing_list
    # return_errors?: true asks Ash to populate `errors` on the result, so the
    # {:error, _} tuple below carries the actual failures.
    |> Ash.bulk_destroy(:destroy, %{}, actor: super_user, return_errors?: true)
    |> case do
      %Ash.BulkResult{status: :success} ->
        {:ok, "DateInstance cleaned up successfully."}

      # Covers :error and any other non-success status (e.g. :partial_success),
      # which previously fell through the case and raised CaseClauseError.
      %Ash.BulkResult{errors: errors} ->
        {:error, errors}
    end
  end

  # Asks the reactor to retry when the failure carries code: :network_error;
  # every other failure is answered with :ok.
  @impl true
  def compensate(%{code: :network_error}, _arguments, _context, _options), do: :retry
  def compensate(_error, _arguments, _context, _options), do: :ok

  # Nothing to undo: this callback always returns :ok.
  @impl true
  def undo(_result, _arguments, _context, _options), do: :ok
end
# Reactor step that bulk-upserts the entries listed under :new_list in the
# :calc_output argument as DateInstance records.
defmodule MyApp.UpsertUpdatedDateInstancesStep do
  @moduledoc false
  use Reactor.Step
  alias MyApp.DateInstance
  require Ash.Query

  # Returns {:ok, message} when the bulk create reports :success and
  # {:error, errors} for every other status.
  @impl true
  def run(arguments, _context, _options) do
    %{new_list: new_list} = arguments.calc_output

    # NOTE(review): return_notifications?: true puts the notifications on the
    # bulk result, which this step does not read — confirm they are not
    # expected to be dispatched from here.
    # return_errors?: true asks Ash to populate `errors` on the result, so the
    # {:error, _} tuple below carries the actual failures.
    new_list
    |> Ash.bulk_create(DateInstance, :create,
      upsert?: true,
      return_notifications?: true,
      return_errors?: true,
      upsert_fields: [
        ...
      ]
    )
    |> case do
      %Ash.BulkResult{status: :success} ->
        {:ok, "DateInstance created successfully."}

      # Covers :error and any other non-success status (e.g. :partial_success),
      # which previously fell through the case and raised CaseClauseError.
      %Ash.BulkResult{errors: errors} ->
        {:error, errors}
    end
  end

  # Asks the reactor to retry when the failure carries code: :network_error;
  # every other failure is answered with :ok.
  @impl true
  def compensate(%{code: :network_error}, _arguments, _context, _options), do: :retry
  def compensate(_error, _arguments, _context, _options), do: :ok

  # Nothing to undo: this callback always returns :ok.
  @impl true
  def undo(_result, _arguments, _context, _options), do: :ok
end