Commanded/CQRS: how to project "gluing/merging" into the existing projection

I am learning Commanded and CQRS in general with a small pet project that is essentially yet another wrapper around a ChatGPT conversation, with prompts/buttons specific to the use case.

I have created a Conversation aggregate, and a pretty significant share of its events is going to be something like ConversationStepAdded.

**Read model**
On the read side, in one of the read models, I’d like a nicely denormalized conversation structure (a nice feature of CQRS!), something like:

- conversation_id
- conversation_title
- some_other_metadata
- a_list_of_conversation_steps
  - conv_step_type // e.g. user or assistant related
  - conv_step_text
  - some_more_step_related_data

Ideally this whole structure will be an Ecto schema, with a_list_of_conversation_steps being an embedded_schema (via embeds_many), probably mapped to a Postgres JSONB field.

**Problem: how to update a_list_of_conversation_steps from a single step event?**
Looking at how Commanded projectors work (and CQRS in general?), a projection is built one event at a time, without access to the aggregate.

Then how would I update a_list_of_conversation_steps when ConversationStepAdded arrives? Since I am learning, I would really appreciate not only an answer to this particular question but also general guidance.

  • Should the ConversationStepAdded event carry not just the step being added but the whole list of steps accumulated so far, so that the projector rewrites the entire a_list_of_conversation_steps field with an SQL UPDATE?
  • Should I go deeper into SQL/Ecto and build a complex Ecto.Multi/changeset that appends a step to the a_list_of_conversation_steps embedded schema, and let Ecto somehow map it to the proper SQL (I guess somewhere along the way I will still need to learn exactly which SQL can append to a JSONB field)?
    • I guess it could work in this particular case, but not so nicely if, e.g., a_list_of_conversation_steps were just a long string without an explicit embedded_schema.
  • Or is it possible (and recommended?) to have some Elixir code read the current projection, operate on it at the Elixir level (still via a changeset, I suppose) and UPDATE it back to the database?
  • Or something very different? Maybe the whole approach is wrong and there is a way for this problem not to exist in the first place?

What would you do in such a situation?

In a read model projector you can fetch existing data that has already been projected and update it. So in this example when projecting the ConversationStepAdded event you could fetch the existing Conversation from the database and then add the new conversation step to the list of conversation steps before saving the updated model. You can use the Ecto.Multi.run/3 function to fetch and update an existing model in a Commanded Ecto projector.
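A minimal, library-free sketch of the fetch, modify and save-back steps described above. A plain map keyed by conversation_id stands in for the projector's own table; the ConversationStarted event, the step fields and the module names are hypothetical. In a real Commanded Ecto projector the fetch and the save would go through the Repo (for example inside Ecto.Multi.run/3) rather than through Map functions.

```elixir
# Hypothetical events; only ConversationStepAdded is named in the question.
defmodule ConversationStarted do
  defstruct [:conversation_id, :title]
end

defmodule ConversationStepAdded do
  defstruct [:conversation_id, :step_type, :text]
end

defmodule ConversationProjectionSketch do
  # `table` is a map of conversation_id => row, standing in for the read
  # model table that only this projector writes to.
  def project(%ConversationStarted{conversation_id: id, title: title}, table) do
    Map.put(table, id, %{conversation_id: id, title: title, steps: []})
  end

  def project(%ConversationStepAdded{conversation_id: id} = event, table) do
    # 1. Fetch the row this projector wrote earlier (crashes if missing,
    #    much as Repo.get! would).
    row = Map.fetch!(table, id)

    # 2. Modify it in Elixir: append the new step to the embedded list.
    step = %{type: event.step_type, text: event.text}
    updated = %{row | steps: row.steps ++ [step]}

    # 3. Write the whole row back.
    Map.put(table, id, updated)
  end
end

events = [
  %ConversationStarted{conversation_id: "c1", title: "Demo"},
  %ConversationStepAdded{conversation_id: "c1", step_type: :user, text: "Hi"},
  %ConversationStepAdded{conversation_id: "c1", step_type: :assistant, text: "Hello!"}
]

# Events arrive one at a time, in order; the projection is a fold over them.
table = Enum.reduce(events, %{}, &ConversationProjectionSketch.project/2)
```

Because the projector owns this row and, by default, handles its events one at a time, nothing else changes the row between the fetch and the write-back. Note that no validation happens in step 2: the event has already happened, so the projector just records it.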

It’s not really necessary to use Ecto changesets in a read model projector for validation purposes since the domain events are the source of truth in an event sourced application. The projector cannot reject an event that has already happened. Its job is to represent the application state in a format optimised for querying needs, as described by the events that it receives.

Thanks. I was not sure whether fetching data from the Repo during projection was fine, but since I am reading the same projection (the same table, built only by this projector), I guess it is.

For now I went without Multi.run (probably a topic for the next learning session), and this works:

# Inside a projector module that has `use Commanded.Projections.Ecto, ...`
# and `require Logger`.
project %RequestStepAdded{} = step, fn multi ->
  case Repo.get(Conversation, step.conversation_id) do
    # Failure should never be possible actually. Maybe we should rewrite it to get!
    nil ->
      Logger.error(...)
      ...
      # whatever is elided here must still return an Ecto.Multi (e.g. `multi`)

    conv ->
      # manipulate conversation (elided): compute fresh_adjustments
      changeset = Ecto.Changeset.change(conv)
      adjustment_addition_changeset = Ecto.Changeset.put_embed(changeset, :adjustments, fresh_adjustments)
      Ecto.Multi.update(multi, :add_req_step, adjustment_addition_changeset)
  end
end

For educational purposes: do I understand correctly that the order in which projections are built is only guaranteed within a single aggregate (or even only within a particular project function for a particular event on that aggregate)?

For example, imagine that to add those adjustments I needed to consult another table containing a participant's name (built by a different projection, for a different aggregate, from some %PersonNameSet{} event). I can guarantee that %PersonNameSet{} happens before that person's %RequestStepAdded{}, but can I be sure that the person-name projection has completed and committed before the request-step projection starts?

If there is no such guarantee, is that a signal to package everything that may be needed for reading around a particular activity (business domain?) inside a single aggregate, and e.g. to add the person name to several aggregates?

A projector reading from the table(s) that it “owns” and is projecting into is absolutely fine and recommended.

A projector is a specialised event handler, which means that by default it receives events from the globally ordered $all event stream. This guarantees that events are always received in the order they were persisted to the event store, and in the same order for all event handlers that are configured to receive all events.
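To make the ordering guarantee concrete, here is a hypothetical simulation (not Commanded's API): every event has a global position in one stream, and each handler keeps its own checkpoint and processes only the events after it, in ascending position. Two handlers therefore see the same order, but one can be well ahead of the other.

```elixir
# Hypothetical simulation, not Commanded's API: each handler keeps its own
# checkpoint (last_seen) into one globally ordered stream and processes
# only events after it, in ascending global position.
defmodule HandlerSketch do
  defstruct name: nil, last_seen: 0, seen: []

  # Handle at most `max` new events, simulating how far this handler has
  # got; a slow handler simply gets through fewer events per call.
  def catch_up(%__MODULE__{} = handler, all_stream, max) do
    all_stream
    |> Enum.sort_by(& &1.position)
    |> Enum.filter(&(&1.position > handler.last_seen))
    |> Enum.take(max)
    |> Enum.reduce(handler, fn event, h ->
      %{h | last_seen: event.position, seen: h.seen ++ [event.type]}
    end)
  end
end

all_stream = [
  %{position: 1, type: :person_name_set},
  %{position: 2, type: :request_step_added},
  %{position: 3, type: :request_step_added}
]

# The steps projector is fully caught up; the people projector has not
# handled anything yet.
steps_projector = HandlerSketch.catch_up(%HandlerSketch{name: :steps}, all_stream, 3)
people_projector = HandlerSketch.catch_up(%HandlerSketch{name: :people}, all_stream, 0)
```

At this moment the steps projector has already handled a RequestStepAdded event while the people projector has not yet handled the earlier PersonNameSet, so a query from the first into the second's table would find no name. After a later `HandlerSketch.catch_up(people_projector, all_stream, 10)`, the people projector has seen exactly the same sequence, in the same order.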

Since event handlers (projectors) run independently, they are not guaranteed to be running in sync: one slow handler does not hold up any others. Therefore it is not safe to query tables that are populated by another projector. The other projector could be lagging behind and may not yet have seen the event(s) that the second projector expects to have been projected. To resolve this you could have a single projector project into all the affected tables, or project the subset of person info needed by the projector that has to query it, or accept that there might sometimes be stale data for querying that has to be resolved somehow.
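A sketch of the option where the projector keeps its own copy of the person info it needs. One projector handles both PersonNameSet and RequestStepAdded, so when it looks up a name it reads data it wrote itself, earlier in the same $all order. Plain maps stand in for the two tables; the event fields (person_id, name, conversation_id, text) and the module names are hypothetical. In a Commanded Ecto projector this would be one module with a project clause per event, writing only to tables it owns.

```elixir
# Hypothetical event shapes.
defmodule PersonNameSet do
  defstruct [:person_id, :name]
end

defmodule RequestStepAdded do
  defstruct [:conversation_id, :person_id, :text]
end

defmodule ConversationWithNamesProjector do
  # Two "tables", both owned by this projector only.
  def new, do: %{person_names: %{}, conversations: %{}}

  def project(%PersonNameSet{person_id: pid, name: name}, state) do
    put_in(state, [:person_names, pid], name)
  end

  def project(%RequestStepAdded{} = event, state) do
    # PersonNameSet precedes this event in $all and was handled by this
    # same projector, so the name is already here (crash if it is not).
    name = Map.fetch!(state.person_names, event.person_id)
    step = %{author: name, text: event.text}

    update_in(state, [:conversations, event.conversation_id], fn
      nil -> [step]
      steps -> steps ++ [step]
    end)
  end
end

events = [
  %PersonNameSet{person_id: "p1", name: "Ada"},
  %RequestStepAdded{conversation_id: "c1", person_id: "p1", text: "Summarise this"}
]

state =
  Enum.reduce(events, ConversationWithNamesProjector.new(), &ConversationWithNamesProjector.project/2)
```

The aggregates stay as they are; only the read side duplicates the name, in the one projector that needs it.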
