In Commanded, subscription to the stream is happening multiple times in a Phoenix umbrella application, causing {:error, :consistency_timeout}

Description:

I’m working on a proof of concept using Commanded within a umbrella Phoenix application. I’ve set up an aggregate to manage accounts and a projector (VPNUsage.Projectors.AccountBalance) to create corresponding projections.

Problem:

After starting the Phoenix server (mix phx.server), I encounter a {:error, :consistency_timeout} when running the following code from an external console (iex -S mix):

  import Ecto.Query
  ids_to_ignore = Hustle.VPNUsage.Projections.AccountBalance |> Hustle.Repo.all |> Enum.map(&(&1.amalgama_account_uuid))
  
  q = from t in Comm.TwilioPhoneNumber,
  where: not(t.id in ^ids_to_ignore),
  preload: :context
  
  result = Comm.Repo.all(q)
  
  Enum.map(result, fn %{context: %{amalgama_acc_id: amalgama_account_id}} ->
    parms = %{amalgama_account_uuid: amalgama_account_id, initial_balance: 0, overdrawn_credits: 25}
    # Inside `create_account`, CommandedApp.dispatch(create_account_cmd, consistency: :strong) is used
    Hustle.VPNUsage.create_account(params) 
  end)

I’m encountering {:error, :consistency_timeout} when the Phoenix server is started (mix phx.server). However, the code runs without errors if the Phoenix server is not running.

Suspect issue might be related to the projector (VPNUsage.Projectors.AccountBalance) attempting multiple subscription:

14:06:34.716 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:07:34.967 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:08:35.220 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:09:35.471 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:10:35.724 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:11:35.978 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:12:36.239 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:13:36.488 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:14:36.727 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:15:36.992 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:16:37.231 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream

Can you please help me to understand what I’m doing wrong?
Thanks in advance!

Does the problem happen if you run in distributed mode, with the nodes connected?

To do that, use this command to run the phoenix server:

iex --sname phx_node --cookie my_cookie -S mix phx.server

and this command to start your iex shell

iex --sname iex_node --cookie my_cookie -S mix
1 Like

After some troubleshooting, I want to update the question:

Description:

I’m working on a proof of concept using Commanded within an umbrella Phoenix application. I’ve set up an aggregate to manage accounts and a projector (VPNUsage.Projectors.AccountBalance) to create corresponding projections.

Problem:

After starting the Phoenix server on my local docker-compose environment (iex -S mix phx.server), I encounter a {:error, :consistency_timeout} when running the following code inside an external console (iex -S mix):

import Ecto.Query
ids_to_ignore = Hustle.VPNUsage.Projections.AccountBalance |> Hustle.Repo.all |> Enum.map(&(&1.amalgama_account_uuid))

q = from t in Comm.TwilioPhoneNumber,
  join: c in assoc(t, :context),
  limit: 1,
  where: not(c.amalgama_acc_id in ^ids_to_ignore),
  preload: [context: c]

result = Comm.Repo.all(q)

Enum.map(result, fn %{context: %{amalgama_acc_id: amalgama_account_id}} ->
  Hustle.VPNUsage.create_account(%{amalgama_account_uuid: amalgama_account_id, initial_balance: 0, overdrawn_credits: 25})
end)

I’m encountering {:error, :consistency_timeout} when the Phoenix server is started (iex -S mix phx.server). However, the code runs without errors if the Phoenix server is not running.

Additionally, the code works if run inside the Phoenix server (through a controller), but if I run the same code in a different console (using iex -S mix), I get {:error, :consistency_timeout}. Despite this, I can see the projection being created.

Could you please help me to understand what I’m doing wrong?
Thanks in advance!

Not sure about commanded issue, but this one could be definitely improved. Take a look at the subquery API as using it you could optimise ids_to_ignore at a database level.

(…) as a where condition:

subset_ids = from(p in subset, select: p.id)
Repo.update_all(
  from(p in Post, where: p.id in subquery(subset_ids)),
  set: [sync_started_at: NaiveDateTime.utc_now()]
)

Source: Ecto.Query.subquery/2

See also: Subqueries | Aggregates and subqueries @ ecto documentation

1 Like

@AugustoPedraza from the docs (guides/Commands.md):

Receiving an `{:error, :consistency_timeout}` error indicates the command successfully dispatched, but some or all of the strongly consistent event handlers have not yet executed.

This thing of “the event handler has executed” uses message passing, e.g. Process.send to notify the dispatcher (the code you run in the second shell) as well as other subscribers, if there are any.

I suspect, but I do not know for certain, that the problem you are seeing is happening because your 2nd shell does not get any messages back.

Did you try running it in distributed mode yet?

You can do this even with docker-compose.

I am guessing in your dockerfile or maybe in some entrypoint.sh script you have a line like this:

iex -S mix phx.server

Change that to

iex --sname hustle-main --cookie dev-erl-cookie -S mix phx.server

When your docker container is up and running, you can run the 2nd shell like this:

docker-compose exec hustle iex --sname hustle-attached-remsh --cookie dev-erl-cookie --remsh hustle-main

I have not tested those commands so they might need tweaking.

1 Like

Side-note:
You are using Repo.all with limit(1) in the query, and then Enum.map.
If you really just want 1 entry, use Repo.one, keep the limit and remove the Enum.map.
But I am guessing you are just doing that for debugging? To only do 1 thing, for simplicity/clarity?

2 Likes

Thanks @slouchpie for your reply. I tried the distributed mode, but I didn’t have any luck. I keep getting the same error. I did more tests and it looks like whether I run the server or the console, I can’t have more than one running simultaneously because it fails with {:error, :consistency_timeout}.

OK I hope you figure it out. I have no other ideas.