I’m working on a proof of concept using Commanded within an umbrella Phoenix application. I’ve set up an aggregate to manage accounts and a projector (VPNUsage.Projectors.AccountBalance) to create corresponding projections.
Problem:
After starting the Phoenix server (mix phx.server), I encounter a {:error, :consistency_timeout} when running the following code from an external console (iex -S mix):
import Ecto.Query
ids_to_ignore = Hustle.VPNUsage.Projections.AccountBalance |> Hustle.Repo.all |> Enum.map(&(&1.amalgama_account_uuid))
q =
  from t in Comm.TwilioPhoneNumber,
    where: not (t.id in ^ids_to_ignore),
    preload: :context
result = Comm.Repo.all(q)
Enum.map(result, fn %{context: %{amalgama_acc_id: amalgama_account_id}} ->
  params = %{amalgama_account_uuid: amalgama_account_id, initial_balance: 0, overdrawn_credits: 25}
  # Inside `create_account`, CommandedApp.dispatch(create_account_cmd, consistency: :strong) is used
  Hustle.VPNUsage.create_account(params)
end)
I’m encountering {:error, :consistency_timeout} when the Phoenix server is started (mix phx.server). However, the code runs without errors if the Phoenix server is not running.
I suspect the issue is related to the projector (VPNUsage.Projectors.AccountBalance) repeatedly attempting to subscribe:
14:06:34.716 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:07:34.967 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:08:35.220 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:09:35.471 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:10:35.724 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:11:35.978 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:12:36.239 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:13:36.488 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:14:36.727 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:15:36.992 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
14:16:37.231 [debug] Subscription "VPNUsage.Projectors.AccountBalance"@"$all" subscribe to stream
Can you please help me to understand what I’m doing wrong?
Thanks in advance!
After some troubleshooting, I want to update the question:
Description:
I’m working on a proof of concept using Commanded within an umbrella Phoenix application. I’ve set up an aggregate to manage accounts and a projector (VPNUsage.Projectors.AccountBalance) to create corresponding projections.
Problem:
After starting the Phoenix server on my local docker-compose environment (iex -S mix phx.server), I encounter a {:error, :consistency_timeout} when running the following code inside an external console (iex -S mix):
import Ecto.Query
ids_to_ignore = Hustle.VPNUsage.Projections.AccountBalance |> Hustle.Repo.all |> Enum.map(&(&1.amalgama_account_uuid))
q =
  from t in Comm.TwilioPhoneNumber,
    join: c in assoc(t, :context),
    limit: 1,
    where: not (c.amalgama_acc_id in ^ids_to_ignore),
    preload: [context: c]
result = Comm.Repo.all(q)
Enum.map(result, fn %{context: %{amalgama_acc_id: amalgama_account_id}} ->
  Hustle.VPNUsage.create_account(%{amalgama_account_uuid: amalgama_account_id, initial_balance: 0, overdrawn_credits: 25})
end)
I’m encountering {:error, :consistency_timeout} when the Phoenix server is started (iex -S mix phx.server). However, the code runs without errors if the Phoenix server is not running.
Additionally, the code works if run inside the Phoenix server (through a controller), but if I run the same code in a different console (using iex -S mix), I get {:error, :consistency_timeout}. Despite this, I can see the projection being created.
Could you please help me to understand what I’m doing wrong?
Thanks in advance!
I’m not sure about the Commanded issue, but the query itself could definitely be improved. Take a look at Ecto’s subquery API: with it, the ids_to_ignore filtering could be done at the database level instead of in Elixir.
(…) as a where condition:
subset_ids = from(p in subset, select: p.id)
Repo.update_all(
  from(p in Post, where: p.id in subquery(subset_ids)),
  set: [sync_started_at: NaiveDateTime.utc_now()]
)
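Applied to the query from the question, the idea might look like the sketch below. It assumes the AccountBalance projection table and the Comm tables live in the same database and are both reachable through Comm.Repo, which the thread does not confirm (the question uses both Hustle.Repo and Comm.Repo); a subquery cannot span two separate databases. It also assumes the two id columns have comparable types and that amalgama_account_uuid is never NULL, since NOT IN against a set containing NULL matches no rows.

```elixir
import Ecto.Query

alias Hustle.VPNUsage.Projections.AccountBalance

# Select only the id column; the projection rows are never loaded into Elixir.
existing_ids = from(a in AccountBalance, select: a.amalgama_account_uuid)

q =
  from t in Comm.TwilioPhoneNumber,
    join: c in assoc(t, :context),
    # Assumption: both tables are in the same database (see above).
    where: c.amalgama_acc_id not in subquery(existing_ids),
    preload: [context: c]

result = Comm.Repo.all(q)
```

If the two repos point at different databases, running `Hustle.Repo.all(existing_ids)` and interpolating the resulting list with `^` still avoids loading full projection structs.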
Receiving an `{:error, :consistency_timeout}` error indicates the command successfully dispatched, but some or all of the strongly consistent event handlers have not yet executed.
The “event handler has executed” notification relies on message passing (e.g. Process.send) to notify the dispatcher (the code you run in the second shell) as well as any other subscribers.
I suspect, but I do not know for certain, that the problem you are seeing is happening because your 2nd shell does not get any messages back.
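Put differently, a consistency timeout is not a failed command: the events are already stored, which would match the projection still showing up. A minimal sketch of how a caller might tell the cases apart is below. DispatchOutcome is a hypothetical helper, not part of Commanded, and it assumes create_account returns the dispatch result unchanged (:ok or an {:error, reason} tuple).

```elixir
defmodule DispatchOutcome do
  @moduledoc """
  Hypothetical helper (not part of Commanded) that interprets the
  value returned by a strongly consistent dispatch.
  """

  # Command succeeded and all strongly consistent handlers confirmed.
  def interpret(:ok), do: {:ok, :consistent}

  # Command succeeded, but the handlers did not confirm in time.
  # Do not dispatch again: the events are already persisted.
  def interpret({:error, :consistency_timeout}), do: {:ok, :eventually_consistent}

  # Anything else is a real failure (validation, aggregate error, ...).
  def interpret({:error, reason}), do: {:error, reason}
end
```

For example, `DispatchOutcome.interpret(Hustle.VPNUsage.create_account(params))` would let the calling code treat the timeout as "written, read model may lag" rather than as an error to retry.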
Did you try running it in distributed mode yet?
You can do this even with docker-compose.
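For reference, running Commanded across several connected nodes usually means moving its process registry and pub/sub away from the node-local defaults, so that handler acknowledgements can reach a dispatcher on another node. The fragment below is a sketch based on my reading of Commanded's multi-node deployment guide: the :hustle OTP app name is an assumption, CommandedApp is taken from the comment in the question, and the option names should be checked against the Commanded version in use.

```elixir
# config/config.exs (sketch; :hustle is an assumed otp_app name)
import Config

config :hustle, CommandedApp,
  # Run each aggregate and event handler once per cluster, not once per node.
  registry: :global,
  # Distribute "event handled" acknowledgements between connected nodes.
  pubsub: [
    phoenix_pubsub: [
      adapter: Phoenix.PubSub.PG2,
      pool_size: 1
    ]
  ]
```

Both the server and the console would also need to be started as named nodes sharing a cookie (for example with iex's `--sname` and `--cookie` flags) and connected to each other, e.g. via `Node.connect/1`.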
I am guessing in your dockerfile or maybe in some entrypoint.sh script you have a line like this:
Side-note:
You are using Repo.all with limit: 1 in the query, and then Enum.map.
If you really just want 1 entry, use Repo.one, keep the limit and remove the Enum.map.
But I am guessing you are just doing that for debugging? To only do 1 thing, for simplicity/clarity?
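If a single row really is the goal, that suggestion could be sketched as follows. It reuses the schemas and the create_account call from the question; the :nothing_to_create atom is only illustrative.

```elixir
import Ecto.Query

ids_to_ignore =
  Hustle.VPNUsage.Projections.AccountBalance
  |> Hustle.Repo.all()
  |> Enum.map(& &1.amalgama_account_uuid)

q =
  from t in Comm.TwilioPhoneNumber,
    join: c in assoc(t, :context),
    where: c.amalgama_acc_id not in ^ids_to_ignore,
    limit: 1,
    preload: [context: c]

# Repo.one/2 returns the struct itself, or nil when no row matches.
case Comm.Repo.one(q) do
  nil ->
    :nothing_to_create

  %{context: %{amalgama_acc_id: amalgama_account_id}} ->
    Hustle.VPNUsage.create_account(%{
      amalgama_account_uuid: amalgama_account_id,
      initial_balance: 0,
      overdrawn_credits: 25
    })
end
```

Keeping `limit: 1` matters here, because Repo.one raises when the query returns more than one row.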
Thanks @slouchpie for your reply. I tried the distributed mode, but I didn’t have any luck. I keep getting the same error. I did more tests and it looks like whether I run the server or the console, I can’t have more than one running simultaneously because it fails with {:error, :consistency_timeout}.