Ash.Reactor: Postgrex.Protocol disconnected: ** (DBConnection.ConnectionError) client timed out

The same DB transaction on the same dataset works fine with Repo.transaction(), but results in the error below when it is run through Ash.Reactor. What could I be missing? I am using Ash 3.4.

Postgrex.Protocol (#PID<0.2666.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.4466.0> timed out because it queued and checked out the connection for longer than 15000ms

The following works with no issues.


# Working variant: both bulk operations are issued from inside a single
# Repo.transaction/1 callback.
Repo.transaction(fn ->

      # Destroy the existing records via the :destroy action, acting as
      # super_user. The returned bulk result is not inspected here.
      existing_list
      |> Ash.bulk_destroy(:destroy, %{}, actor: super_user)

      # Upsert the new records as DateInstance via the :create action. This is
      # the last expression, so its bulk result is what the callback returns.
      # NOTE(review): notifications are requested but nothing in this snippet
      # forwards them - confirm whether they need to be sent afterwards.
      new_list
      |> Ash.bulk_create(DateInstance, :create,
        upsert?: true,
        return_notifications?: true,
        upsert_fields: [
          ....
        ]
      )
  end)

The following fails after several SQL statements have been logged. Sometimes it happens during the DELETE queries, and other times during the INSERT queries.


defmodule MyApp.ChangeDateReactor do
  @moduledoc false
  use Ash.Reactor

  alias MyApp.DateInstance

  ash do
    default_domain MyApp
  end

  # Values the caller supplies when running this reactor.
  input :id
  input :date

  # The three steps below are nested inside one transaction declared over
  # the DateInstance resource.
  transaction :delete_and_insert, DateInstance do
    # Receives both reactor inputs; its result feeds the two steps that follow.
    step :change_calc, MyApp.ChangeCalcStep do
      argument :id, input(:id)
      argument :date, input(:date)
    end

    # Takes the :change_calc result as its :calc_output argument.
    step :delete_unnecessary_date_instances, MyApp.DeleteUnnecessaryDateInstancesStep do
      argument :calc_output, result(:change_calc)
    end

    # Takes the same :change_calc result as its :calc_output argument.
    step :upsert_updated_date_instances, MyApp.UpsertUpdatedDateInstancesStep do
      argument :calc_output, result(:change_calc)
    end
  end

  # The reactor's return value is the result of the transaction step.
  return :delete_and_insert
end


defmodule MyApp.ChangeCalcStep do
  @moduledoc false
  use Reactor.Step

  alias MyApp.User
  alias MyApp.DateInstance

  # Calls change_calc/2 with the step's :id and :date arguments and wraps
  # whatever it returns in an :ok tuple.
  # NOTE(review): change_calc/2 is not defined in this excerpt.
  @impl true
  def run(arguments, _context, _options) do
    {:ok, change_calc(arguments.id, arguments.date)}
  end

  # Asks for a retry when the error carries `code: :network_error`; any other
  # error is answered with :ok.
  @impl true
  def compensate(error, _arguments, _context, _options) do
    case error do
      %{code: :network_error} -> :retry
      _other -> :ok
    end
  end

  # Nothing to revert for this step: always answers :ok.
  @impl true
  def undo(_value, _arguments, _context, _options), do: :ok
end

defmodule MyApp.DeleteUnnecessaryDateInstancesStep do
  @moduledoc false
  use Reactor.Step

  alias MyApp.User
  alias MyApp.DateInstance

  require Ash.Query
  require Untangle, as: Ulog

  # Bulk-destroys the `existing_list` found in the :calc_output argument,
  # acting as the super user.
  #
  # Returns `{:ok, message}` when the bulk result reports :success, and
  # `{:error, errors}` for both :error and :partial_success, so a partially
  # applied delete fails the step instead of raising CaseClauseError.
  #
  # NOTE(review): the bulk call does not pass `return_errors?: true`; confirm
  # that `errors` is actually populated with the configuration in use.
  @impl true
  def run(arguments, _context, _options) do
    super_user = User.get_super_user!()

    %{existing_list: existing_list} = arguments.calc_output

    case Ash.bulk_destroy(existing_list, :destroy, %{}, actor: super_user) do
      %Ash.BulkResult{status: :success} ->
        {:ok, "DateInstance cleaned up successfully."}

      %Ash.BulkResult{status: status, errors: errors}
      when status in [:error, :partial_success] ->
        {:error, errors}
    end
  end

  # Asks for a retry when the error carries `code: :network_error`; any other
  # error is answered with :ok.
  @impl true
  def compensate(%{code: :network_error}, _arguments, _context, _options) do
    :retry
  end

  def compensate(_error, _arguments, _context, _options) do
    :ok
  end

  # No explicit revert is performed here: always answers :ok.
  @impl true
  def undo(_result, _arguments, _context, _options) do
    :ok
  end
end

defmodule MyApp.UpsertUpdatedDateInstancesStep do
  @moduledoc false
  use Reactor.Step

  alias MyApp.DateInstance

  require Ash.Query

  # Bulk-upserts the `new_list` found in the :calc_output argument as
  # DateInstance records via the :create action.
  #
  # Returns `{:ok, message}` when the bulk result reports :success, and
  # `{:error, errors}` for both :error and :partial_success, so a partially
  # applied upsert fails the step instead of raising CaseClauseError.
  #
  # NOTE(review): notifications are requested via `return_notifications?: true`
  # but are not forwarded anywhere in this step - confirm that is intended.
  # NOTE(review): the bulk call does not pass `return_errors?: true`; confirm
  # that `errors` is actually populated with the configuration in use.
  @impl true
  def run(arguments, _context, _options) do
    %{new_list: new_list} = arguments.calc_output

    new_list
    |> Ash.bulk_create(DateInstance, :create,
      upsert?: true,
      return_notifications?: true,
      upsert_fields: [
        ...
      ]
    )
    |> case do
      %Ash.BulkResult{status: :success} ->
        {:ok, "DateInstance created successfully."}

      %Ash.BulkResult{status: status, errors: errors}
      when status in [:error, :partial_success] ->
        {:error, errors}
    end
  end

  # Asks for a retry when the error carries `code: :network_error`; any other
  # error is answered with :ok.
  @impl true
  def compensate(%{code: :network_error}, _arguments, _context, _options) do
    :retry
  end

  def compensate(_error, _arguments, _context, _options) do
    :ok
  end

  # No explicit revert is performed here: always answers :ok.
  @impl true
  def undo(_result, _arguments, _context, _options) do
    :ok
  end
end

My initial read is that there is some kind of internal Reactor bug here, assuming there is no good reason for this logic to sometimes take more than 15000ms to execute.

If you could reproduce this in a test project, then either @jimsynz or I will almost certainly be able to nail it down.

1 Like

Okay. I will try. Thank you!

Does it actually take longer than 15s to execute? If so, you will want to increase the transaction timeout.

Well, I don’t think so. Repo.transaction completes the same job almost instantaneously.

Yeah, in that case a reproduction will likely be necessary.

1 Like