Commanded/CQRS and how to handle when user is trying same command when previous one is not handled yet

I am learning Commanded and CQRS in general with a small pet project that is essentially yet another wrapper over a ChatGPT conversation with the prompts/buttons specific to the use case. It is to look like:

  1. User: Please provide an opinion on this article
  2. AI: It’s a regular article on astrophysics…
  3. User clicks <More positive, please>
  4. AI: It’s great discovery on…
  5. User clicks <More positive, please>
  6. AI: Greatest discovery in 21st century…
  7. etc.

**Basic architecture / Control flow**
There is going to be a simple LiveView page for a conversation with initial text to analyze, buttons for human requests and texts for AI responses. AI and human messages happen in a strict order for now, i.e. [per initial business rules at least] user cannot click <More positive> two times, there’s no sense in it.

  1. User clicks are to generate Commands to Commanded. Click handlers disable buttons until command is fully processed under :strong consistency
  2. OpenAI request should probably happen in enrichment middleware per commanded recommendations
    2.1. Enriching in the aggregate could also be ok as for start at least there’s nothing else to happen to conversation while waiting for AI. However, that is likely to make event handler less testable, so middleware felt like a more proper choice
  3. Event handler generates events such as ConversationStarted, RequestStepAdded, ResponseStateAdded
  4. Finally projector glues them all into a nicely denormalized Ecto Conversation in a single Postgres table row

**The problem**
Everything should work fine until user reloads the page right after clicking the <More positive, please> button while we are waiting for an OpenAI response. After a page reload since events aren’t generated yet UI looks like user can click e.g. <More negative, please> and second command will be generated.

What happens then is that OpenAI eventually replies with a MorePositive response (that user didn’t even realize was still being generated) and then with a MoreNegatve response on top.

I can’t figure an elegant way to deal with such situation and hope for an advice.

**Ideal behavior**
In the ideal world one of these UX options could be acceptable:

  1. We forget about in-flight request completely like it never happened
    1.1 But how to cancel that still in-flight command processing then? Via some manual messing with enrichment middleware to cancel in-flight HTTP request?
  2. UI is still blocked waiting for response. But how would I know from just a conversation id that command handling for a particular conversation id is still in progress?
    2.1 Can/should I somehow subscribe to a particular aggregate to get notified when all of its strongly consistent activities are over?
    2.2. Or would it be better to keep such UI-level status outside of commanded completely, so I could e.g. store in yet another Ecto table (or cache even as we are talking about seconds or a couple of minutes max) outside Commanded the fact that particular conversation is waiting for an update?
    2.3. Or would you implement more fine-grained Commands/Events so that first we [very quickly] get something like RequestStepAdded event (projection of which would keep UI disabled) with a process manager adding a Command NowGoAskOpenAI. But that’s a bunch of extra commands/events just for UI and a need for some real HTTP error handling (if all enrichment happens in the original command middleware, then error doesn’t need to touch any aggregates)

**How to deal with it?**
I am puzzled. All these options don’t seem very elegant to me and feel like I am about to create lots of complexity for just a small UI candy. Yet I would love for the user not to have surprises of sudden conversation update without a visible reason.

I hope I am not the first one having situation when UI needs some progress information about a long term action initiated before-page-reload (it’s not event that much about Commanded/CQRS I suppose).

Am I missing some obvious option or are the options I was able to figure less scary that they seem to me?
What do you usually do in this sort of situation?

You could hook into LiveView’s lifecycle callback terminate/2 to stash a flag with ETS or Ecto when there’s an in-flight request.

Hi,

Thank you for putting so much effort in your post :slight_smile:

Now, I am not sure I understand why you use CQRS in this case.

(it’s not event that much about Commanded/CQRS I suppose)

If your ongoing conversation was backed by a simple genserver, when liveview connects you could easily tell it to drop the current request and start over, or ask it if it is already processing a command and reflect that in your UI.

So one thing underlying an event sourced system is the fact that they’re eventually consistent. The strong consistency feature of commanded is mostly a crutch, because as you noticed it only works by forcing a function call to block. That’s very useful for a lot of cases, but won’t help if you switch to a different process.

So really the question is how can you deal with that issue in an eventually consistent system and the answer is likely that you can’t (perfectly) solve this in your current setup. You need to acknowledge that the commanded parts of your system cannot prevent the user from sending multiple RequestStepAdded commands for as long as it doesn’t track any information about when it accepts new user input when it doesn’t.

In other words your issue is that your UI is trying to modeling a (perfect) back and forth between AI input and user input, while your system as it stands right now doesn’t seems to model that. I wouldn’t call that “small UI candy”.

Imo the correct solution here is to acknowledge that you’re modeling back and forth conversation and not arbitrary order conversation between the two parties and the complexities involved in enforcing that constraint.

First of all it is a self-educational project. I wanted to try CQRS on something I find exciting and AI chat happened to be it.

However, aside of “I just wanted to try a new technique on something cool” there are a couple of other reasons:

  • It seems to me that a “dialog pattern” of a typical AI chat resembles CQRS in quite a straightforward way as in “command sent → conversation updated → update ui ->”, doesn’t it?
  • Flexibility of the read model as I can easily design and redesign the read mode as I like up to the point of having a different read model for every screen (probably I won’t go exactly this far though). For many years of programming caring after the ideal DB schema that reflects real world well, yet is normalized AND is easy-fast-convenient to write AND is easy-fast-convenient to read was often a non-trivial part of a system that doesn’t contribute that much to the business problem. With CQRS already right now (when project is half-functional) I can already see how much less burden DB planning is. Sure, now I need to care after the aggregate modeling, but that’s just one thing and performance isn’t likely to be an issue at all.

Hmm, I intend to do exactly this. My current Conversation aggregate only allows commands from different parties in a strict order. Second human’s command RequestAdjustment in a row would be rejected if aggregate didn’t process the AI response yet. Enforcing this particular constraint on the aggregate level seems to be not very complex. I might be missing something, but to me it is indeed just about visualizing “AI is still thinking” part of a problem.

**CRUD ways**
In very traditional(ish) CRUD web service with synchronous request handling and no queues this could be solved by transactions in a HTTP request handling thread. When HTTP response is sent, the data is already written to DB. If user cancels the request, request handling thread would be dead and transaction would be canceled.

If you add some queues/asynchronousity on top of it, then if user reloads the page you essentially arrive to the same problem of “how do I visualize that previous order handling is still in progress”. Relatively straightforward way for solving it seems to store “in progress” status either in DB/external service such as ZooKeeper or Redis (if you really want to store important status even across server restarts) or in some local in-memory cache such as EhCache.

**What makes CQRS world special**
Maybe nothing :slight_smile:
Maybe I should just use something like elixir cachex to store a not-too-important in-progress status. I am new to the world of CQRS (and to elixir as well in fact) and I assumed that in a world where everything is essentially a set of commands in-flight being transformed to projections this sort of “how do I show user even after page refresh that it’s too early to retry” is a very standard situation, so there could be usual answers as well. Then I would go an read more on popular options and reasoning for it.

Like for example, if people would be saying much that it is a usual thing to have very detailed commands-events (e.g. AdjustmentRequestedByHuman event essentially meaning “in-progress period started”, then AIAskedForResponse, then AIProvidedResponse), I’d go study this approach more.

1 Like

You could model this as a conversation using Phoenix channels and presence.

So create a backend open AI process subscribed to a private topic shared between the user and the AI. Conversation then proceeds like any of Phoenix chat examples. This would allow for messages on a private topic “open AI is thinking…”, and your like/more positive/more negarive.

Use presence to terminate the AI proces after some grace period, or even allow multiple people and AI’s. to converse on the same topic.

This looks like any event based system, I don’t see the Q of CQRS.

if your main goal is to try commanded then you can disregard this comment, but otherwise this is what I would do.

In live view

  • First your conversation needs an id.
  • In liveview mount, when connected?(socket) is true, you start a GenSever under a DynamicSupervisor, that is registered on a Registry with a {:via, Registry, {YourRegistry, id}} tuple. This may return {:ok, pid} or {:error, {:already started, pid}}, in both cases you have the pid of your conversation server.
  • Your live view calls checkout on that server, which sends the view’s self() pid to the server.
  • The view can now ask that server what its current status is, and send commands to it.

In the conversation server

  • On init you define your state with a :awaiting_article status and a nil owner.

  • All your init, handle_call, handle_info and handle_cast should return a timeout if the current owner is nil.

  • When receiving a checkout command with the pid of the live view, set the owner to that pid, and monitor the pid.

  • The server will then receive commands like :set_article, :more_positive. You can implement a simple FSM (or use :gen_statem instead of GenServer but that’s overkill here). Depending on the current status, the server will react appropriately.

    status            command/event      action                  new_status
    awaiting_article  set_article        store in state, ask AI  await_ai
    await_ai          ai_response        send to owner           await_user
    await_user        more_positive      ask AI                  await_ai
    await_user        set_article        reset state, ask AI     await_ai
    

    And so on…

    When an unexpected command is received (double click on more_positive), the server can return an error, or just ignore it.

  • To deal with AI, the server can use a Task under a TaskSupervisor so the ai_response event would simply be the task response in handle_info, matching the ref of the task stored in state.

  • When receiving a :DOWN message from the monitor, set the owner to nil.

  • When receiving a :timeout (returned by your callbacks), if the owner is nil then stop.

  • On most events in handle_info, if the owner is not nil, send updates to the view.

And so,

  • When the user reloads the page, the server will receive a :DOWN but will also be checked out again, before the timeout hits, and will keep its current task running, if any.
  • You do not even need a DB for a base implementation.
  • If you want to cancel your in-flight requests to AI when receiving a :DOWN message, the conversation server can just call a function to do so. But you will need to replace the Task. Maybe just using Finch.async_request/3 and Finch.cancel_async_request/1 directly from the conversation server.

Anyway, I think this model is far more simpler than using commanded. There is always the same problem with CQRS, very often there is a need to know if some events are already in the queue before sending another event. When this need arises, it means that you do not want CQRS but rather a classic architecture where your updates to the database are fully handled synchronously.