Building Conduit - Applying CQRS/ES to an Elixir and Phoenix web app (self-published)


#1

I’m working on a CQRS/ES and Phoenix book for those interested in building an event-driven application using Elixir.


The full source code for the application, Conduit, is available on GitHub.


#2

Have you thought about contacting Pragprog or Manning, Ben? Professional editing and direction is priceless IMO :slight_smile:

Also, how does your book fit in with Martin Fowler’s EDA thoughts?


#3

I opted for the self publishing route as I considered the subject matter too niche for a technical publisher, and wanted the freedom to write at my own pace.

The book takes a step-by-step approach to building a REST API using CQRS and event sourcing for the core domain model.

I’m also considering writing a chapter on “live queries” whereby Phoenix channels are used to push read model updates down to the client, in response to published domain events. I’ve already built a working prototype in another, closed source, web app that can be extracted.


#4

Tangentially related, I’ve been very curious about how GraphQL would line up with CQRS, because at first glance there seem to be some important similarities. GraphQL also breaks operations up into reads (queries) and commands (mutations), and you get live reads (subscriptions) in there as well.


#5

I haven’t used GraphQL, but it looks well suited for use as the read model in a CQRS application.

You would define a schema/types for data to be queried. GraphQL mutations would be executed in response to published domain events to update data. Standard GraphQL queries can be used to display the data.
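With Absinthe, Elixir’s GraphQL library, the read side might be sketched like this. All schema, field, and function names here are illustrative (I’m assuming a `Conduit.Accounts.user_by_email/1` query function); queries resolve directly against the read model:

```elixir
defmodule ConduitWeb.Schema do
  use Absinthe.Schema

  # Read model type, mirroring a denormalised users projection
  object :user do
    field :uuid, :id
    field :username, :string
    field :email, :string
  end

  query do
    # Queries read straight from the read model; no aggregate is involved
    field :user, :user do
      arg :email, non_null(:string)

      resolve fn %{email: email}, _resolution ->
        {:ok, Conduit.Accounts.user_by_email(email)}
      end
    end
  end
end
```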

Typically a read model in a CQRS application will be fully denormalised to eliminate joins. A separate, read optimised, data model is created to support each particular query. In SQL, queries are ideally just SELECT * FROM table WHERE ....

I’d be interested to look into GraphQL’s subscriptions for pushing query updates down to the client. In the past I’ve used read model projections (i.e. CQRS event handlers) to publish notifications, using Phoenix pub/sub, when the data changes. You’d get this behaviour for free if you used GraphQL mutations.


#6

Here’s an example read model projection, using Commanded Ecto projections and Ecto.Multi, from the Conduit sample application from the book.

defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto, name: "accounts_users"

  alias Conduit.Accounts.Events.UserRegistered
  alias Conduit.Accounts.User

  project %UserRegistered{} = registered do
    Ecto.Multi.insert(multi, :user, %User{
      uuid: registered.uuid,
      username: registered.username,
      email: registered.email,
      hashed_password: registered.hashed_password,
      bio: nil,
      image: nil
    })
  end
end

An example query:

defmodule Conduit.Accounts.Queries.UserByEmail do
  import Ecto.Query

  alias Conduit.Accounts.User

  def new(email) do
    from u in User,
      where: u.email == ^email
  end
end

You could apply the same pattern using GraphQL in place of Ecto.


#7

The book is 30% complete. Are you planning to finish it in the near future?


#8

Yes, it will be finished. It’s a struggle to find the time in addition to consultancy, releasing the next versions of the Commanded and EventStore libraries, writing documentation, and providing support.


#9

Nice to see there is a conference talk about your Commanded library :slight_smile:


#10

Commanded also got a notable mention in this talk.


#11

Hello!

I’ve been looking at CQRS/ES for a while, as it matches my intuition about the natural way things happen (ES mostly), but I never had an opportunity to implement it. Now I’ve decided to give it a try and have some questions; I hope I can get advice from people more experienced in this field.

  1. The thing I’m evaluating at the moment is how to build a full log of system events. I haven’t seen many examples of CQRS/ES applications (internal architecture and code; by the way, maybe somebody can post some links to open sourced ones?), but in @slashdotdash’s example, Conduit, introduced in his very valuable ‘Building Conduit’ book, I see only successful events, like UserRegistered or AuthorCreated. What if I need a full log, including, for example, every authentication request ever made to the system, even the failed ones, and more: all connections ever received, all file upload/download requests, and so on?
    I understand that I can introduce “negative” events like UserAuthFailed, UserRegistrationFailed or ConnectionRegistered with a full headers dump, but is it good to have all such events in the main event store/log? Or is it better to keep them externally? I tend to think it is more correct to have all events in one log, but I have no idea how hard it would be to work with, especially if I need to filter results in some way. I think JSONB serialisation and native JSONB queries could be a solution, but that’s just a thought.

  2. If I would like to have versioned entities (a history of updates/previous values, for example), is it a good idea to keep just the current projection and build the history from the event store on user request?

And more specifically: is @slashdotdash’s EventStore fast enough, and does it support such behaviour at all, to develop the idea this way? And is it possible to replay streams by type (not by id)? For example, an administrator might want to view a full audit log; it should contain all events and allow filtering and sorting. And as I understand it, EventStore currently doesn’t support backward stream reading (like read_stream_backward)?

  3. The project involves different external clients (apps), on different versions over time, exchanging data with the backend. It is a tough choice whether it is better to send the current projection, to send/replay the appropriate events, or to mix the two: send a current snapshot at connection time and then push updates as information changes on the backend (while clients are online). Probably the last option is better, but I would be very thankful if someone could point me to best practices/articles on this (preferably related to the ES pattern).

I believe I’ll have more questions, but these are the ones I have now. Please share your thoughts if you find this topic interesting.

Thanks!


#12

You have a few options for tracking unsuccessful events:

  1. Create failure events (e.g. AuthenticationFailed) which are included in your event store. Here’s an example in Commanded’s aggregate guide where a failure event (AccountOverdrawn) is created during withdrawal.
  2. Audit your incoming commands, including whether the command was successfully handled or not. You can use Commanded audit middleware to do this. It also tracks causation and correlation identifiers between commands and events allowing you to follow the flow of messages in your app.
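Here’s a minimal, self-contained sketch of option 1. The module and event names are hypothetical, and the password check is deliberately simplified:

```elixir
defmodule Auth.Commands.Authenticate, do: defstruct([:uuid, :password])
defmodule Auth.Events.Authenticated, do: defstruct([:uuid])
defmodule Auth.Events.AuthenticationFailed, do: defstruct([:uuid, :reason])

defmodule Auth.User do
  defstruct [:uuid, :password]

  alias Auth.Commands.Authenticate
  alias Auth.Events.{Authenticated, AuthenticationFailed}

  # In a Commanded aggregate, `execute/2` returns the event(s) to append,
  # so a failed attempt is recorded in the event store like any other event.
  def execute(%__MODULE__{} = user, %Authenticate{password: password}) do
    # A real application would compare hashes using a password library.
    if password == user.password do
      %Authenticated{uuid: user.uuid}
    else
      %AuthenticationFailed{uuid: user.uuid, reason: :invalid_password}
    end
  end
end
```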

In an aggregate, you build state by replaying the events which allows you to track historical values. As an example, if you had a bank account aggregate you could capture the current account balance over time by recording each time it changes (due to deposit or withdrawal events). Similarly, to support a user query you could build a read model from those same events with the balance shown over time as a list of transactions.
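A standalone sketch of that fold (this is not the book’s code; in a Commanded aggregate the callback is named `apply/2`, renamed here to `apply_event/2` to keep the example self-contained):

```elixir
defmodule BankAccount do
  defstruct balance: 0

  def apply_event(%__MODULE__{balance: b} = account, {:deposited, amount}),
    do: %__MODULE__{account | balance: b + amount}

  def apply_event(%__MODULE__{balance: b} = account, {:withdrawn, amount}),
    do: %__MODULE__{account | balance: b - amount}

  # Replaying returns every intermediate state: the balance over time.
  def history(events),
    do: Enum.scan(events, %__MODULE__{}, &apply_event(&2, &1))
end

BankAccount.history([{:deposited, 100}, {:withdrawn, 30}, {:deposited, 50}])
# => balances over time: 100, 70, 120
```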

The performance will likely depend upon your chosen event serialization format and how fast your Postgres database can write to disk. Appending events is done using a multirow INSERT statement, so it’s relatively fast. The only reliable way to answer the question “is it fast enough for my usage” is by simulating your expected load in your own deployment environment. To get a rough idea you can run the EventStore benchmark suite (MIX_ENV=bench mix do es.reset, app.start, bench) to see how fast it runs. EventStore writes 4,000-8,000 events/sec and can read 26,000-27,000 events/sec on my laptop.

It’s not possible to replay events by aggregate type - it only supports by aggregate identity or all events - nor read a stream backwards. But you likely wouldn’t use an event store for this purpose. Instead you would project the events into a denormalised read model designed to support the querying needs of your administrators. You could even use an event handler to push the events to an external system, such as Elasticsearch, Logstash, and Kibana, in real-time purely for admin purposes.
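Such a handler might look like the sketch below, assuming an `Auditing.Elasticsearch.index_event/2` helper that wraps the HTTP call to Elasticsearch (both names are made up, and Commanded handler options vary slightly between versions):

```elixir
defmodule Auditing.Forwarder do
  use Commanded.Event.Handler, name: "auditing_forwarder"

  # Receives every published domain event, in order, and forwards it to an
  # external store better suited to ad-hoc filtering and sorting.
  def handle(event, metadata) do
    :ok = Auditing.Elasticsearch.index_event(event, metadata)
  end
end
```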

Hope that helps you out.


#13

I’d love to see this book completed. I read some of the pages, and it’s already a great book. Good work @slashdotdash


#14

@pillaiindu I’ve recently published the initial content for the chapter on articles; if you haven’t already seen it, that’s now available to read.


#15

Yes, I saw it. Earlier it was less than 30% complete, but now it’s 50% complete, which is very quick. Please keep up the good work!

Thank you!


#16

I think that GraphQL would work very well with a CQRS system. My only question concerns CQRS used with Event Sourcing (which is often the case). In that case we have eventual consistency, and I wonder what the effect would be on mutation response types: say I send a mutation, but the event hasn’t been projected to my read model in time for the response. The response data for the mutation is then stale; it will eventually become consistent, but not at the time of the response.

Does anyone have any ideas on the matter? Have you experienced it? Did anyone find any workarounds?


#17

I’m using GraphQL for a CQRS/ES Elixir app where mutations are mapped to commands and dispatched. I’m using Commanded, whose event handlers (e.g. read model projections) are asynchronous and eventually consistent, but which allows you to simulate strong consistency during command dispatch. This is the approach I currently use.
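In practice that means passing `consistency: :strong` when dispatching, so the dispatch blocks until handlers that opted in to strong consistency (such as the relevant projector) have processed the resulting events. A sketch with illustrative module and function names:

```elixir
defmodule ConduitWeb.Resolvers.Accounts do
  alias Conduit.Accounts.Commands.RegisterUser

  # Absinthe mutation resolver: dispatch the command with strong consistency,
  # then read the (now up-to-date) projection for the mutation response.
  def register_user(args, _resolution) do
    command = struct(RegisterUser, args)

    with :ok <- Conduit.Router.dispatch(command, consistency: :strong) do
      {:ok, Conduit.Accounts.user_by_uuid(command.uuid)}
    end
  end
end
```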

Another approach is to not return any data from your mutation, but instead use a subscription to have the server push data to the client after it has been updated. If you use Commanded’s Ecto projections library it provides an after_update callback function which would be the ideal integration point to push to an Absinthe subscription.
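As a sketch, the user projector from earlier in this thread could publish to an Absinthe subscription from that callback. The subscription field (`user_registered`) and topic (`"users"`) are illustrative:

```elixir
defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto, name: "accounts_users"

  alias Conduit.Accounts.Events.UserRegistered
  alias Conduit.Accounts.User

  project %UserRegistered{} = registered do
    Ecto.Multi.insert(multi, :user, %User{
      uuid: registered.uuid,
      username: registered.username
    })
  end

  # Invoked after the projection's Ecto.Multi has been committed.
  def after_update(%UserRegistered{}, _metadata, %{user: user}) do
    Absinthe.Subscription.publish(ConduitWeb.Endpoint, user,
      user_registered: "users"
    )
  end

  def after_update(_event, _metadata, _changes), do: :ok
end
```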