Why store a list of events in a relational database
Using a relational database, such as PostgreSQL, provides a robust, battle-tested, and performant platform on which to build an event store. It also brings plenty of tooling for production: full/partial/hot backups; cluster support; diagnostics and profiling; and more. Most databases use an append-only log to persist transactions anyway, so it's not so dissimilar to event sourcing. In the EventStore library I also disable `UPDATE` and `DELETE` statements on the `events` table to make it append-only and immutable.
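For illustration, one way to enforce this at the database level is with PostgreSQL rewrite rules that turn `UPDATE` and `DELETE` into no-ops. The migration below is only a sketch; the table and rule names are mine, not the EventStore library's actual schema.

```elixir
defmodule MyApp.Repo.Migrations.MakeEventsAppendOnly do
  use Ecto.Migration

  def up do
    # Any UPDATE or DELETE issued against the events table silently does nothing.
    execute "CREATE RULE no_update_events AS ON UPDATE TO events DO INSTEAD NOTHING"
    execute "CREATE RULE no_delete_events AS ON DELETE TO events DO INSTEAD NOTHING"
  end

  def down do
    execute "DROP RULE no_update_events ON events"
    execute "DROP RULE no_delete_events ON events"
  end
end
```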
That said, you could write an event store that simply writes files to disk. The public API is small: append events to a stream, read a stream forward. However, you would then need to deal with all the issues that inevitably come up when writing files yourself, as well as replace the tooling I mentioned above.
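As a rough sketch, that small public API might look like the behaviour below (the callback names and types are illustrative, not any real library's interface):

```elixir
defmodule AppendOnlyStore do
  @moduledoc "Hypothetical minimal event store interface: append and read forward."

  @callback append_to_stream(stream_uuid :: String.t(), expected_version :: non_neg_integer(), events :: [struct()]) ::
              :ok | {:error, term()}

  @callback read_stream_forward(stream_uuid :: String.t(), start_version :: non_neg_integer(), count :: pos_integer()) ::
              {:ok, [struct()]} | {:error, term()}
end
```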
Another interesting option for an event store is the new Redis streams. They look like a really good fit for event sourcing, but remember to enable `fsync` per write.
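For illustration, appending an event to a Redis stream from Elixir with the Redix client could look like this; the stream key and fields are made up, and the per-write fsync itself is a server-side setting (AOF persistence with `appendfsync always` in redis.conf), not something set per command:

```elixir
# Assumes Redis is running with AOF persistence and `appendfsync always`,
# so each appended entry is fsynced before the reply is returned.
{:ok, conn} = Redix.start_link(host: "localhost", port: 6379)

{:ok, entry_id} =
  Redix.command(conn, [
    "XADD", "events", "*",
    "type", "AccountDebited",
    "data", Jason.encode!(%{account_number: "ACC123", amount: 100})
  ])
```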
Running through the events, filtering them, and passing them on to the different event handlers/projections could be done using `Flow`.
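A minimal sketch of that idea, assuming a plain list of event maps with an `event_type` field and a hypothetical `AccountBalanceProjection` module:

```elixir
events = [
  %{event_type: "AccountOpened", data: %{account_number: "ACC123"}},
  %{event_type: "AccountDebited", data: %{account_number: "ACC123", amount: 100}}
]

events
|> Flow.from_enumerable()
|> Flow.filter(fn %{event_type: type} -> type in ["AccountOpened", "AccountDebited"] end)
|> Flow.each(fn event -> AccountBalanceProjection.handle(event) end)
|> Flow.run()
```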
For event handlers I took the approach of having them run independently and fully autonomously. This is similar to how Greg's Event Store works with its competing consumers model. Why would you want to do this? It allows handlers to run at different speeds: typically you have slow, asynchronous handlers that can lag behind (e.g. sending emails, making third-party API requests), but you don't want them to hold up read model projections, where you want to minimise query latency.
Autonomous subscriptions allow you to add new handlers and replay all events from the beginning of time, or to restart a handler to rebuild a projection. I've implemented a hybrid push/pull model for the Event Store subscriptions: appended events are published to subscribers, but they are buffered per subscriber, with back-pressure to ensure a subscriber isn't overwhelmed. The subscription falls back to pulling events from the store when it gets too far behind, until it has caught up again.
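Below is a much-simplified sketch of that hybrid model, not the actual implementation: pushed events are buffered per subscriber, and when the buffer grows too large the subscription drops it and pulls batches from the store until it has caught up. The `store`, `handler`, and event shapes are assumptions made for the example.

```elixir
defmodule HybridSubscription do
  use GenServer

  # `store` is assumed to expose read_all_forward/2, `handler` is a 1-arity
  # function, and events are assumed to carry an `event_number` field.
  defstruct [:store, :handler, last_seen: 0, buffer: :queue.new(), max_buffer: 1_000, mode: :push]

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl GenServer
  def init(opts), do: {:ok, struct(__MODULE__, opts)}

  # Events pushed by the store after they have been persisted.
  @impl GenServer
  def handle_info({:events, events}, %{mode: :push} = state) do
    buffer = Enum.reduce(events, state.buffer, &:queue.in/2)

    if :queue.len(buffer) > state.max_buffer do
      # Too far behind: drop the buffer and catch up by pulling from the store.
      send(self(), :catch_up)
      {:noreply, %{state | buffer: :queue.new(), mode: :catch_up}}
    else
      send(self(), :drain)
      {:noreply, %{state | buffer: buffer}}
    end
  end

  # While catching up, ignore pushed events; they will be read from the store.
  def handle_info({:events, _events}, %{mode: :catch_up} = state), do: {:noreply, state}

  # Drain the buffer one event at a time, only scheduling the next event once
  # the handler has finished the current one (back-pressure per subscriber).
  def handle_info(:drain, state) do
    case :queue.out(state.buffer) do
      {{:value, event}, buffer} ->
        state.handler.(event)
        send(self(), :drain)
        {:noreply, %{state | buffer: buffer, last_seen: event.event_number}}

      {:empty, _buffer} ->
        {:noreply, state}
    end
  end

  # Pull events in batches until none remain, then resume push mode.
  def handle_info(:catch_up, %{store: store, handler: handler} = state) do
    case store.read_all_forward(state.last_seen + 1, 100) do
      [] ->
        {:noreply, %{state | mode: :push}}

      events ->
        Enum.each(events, handler)
        send(self(), :catch_up)
        {:noreply, %{state | last_seen: List.last(events).event_number}}
    end
  end
end
```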
You could use `GenStage` for this, but I would recommend using an individual flow pipeline per handler, not one flow for all handlers, since `GenStage`'s broadcast dispatcher can only go as fast as the slowest consumer. You also want any event handlers to run from the event store, after the events have been atomically persisted. Appending events to the store should guarantee that a success reply is returned only after the events have been committed to the underlying storage.
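As an illustration of the per-handler approach, each handler can own its own consumer (and therefore its own demand and back-pressure) rather than sharing a broadcast dispatcher; `producer` here stands for a hypothetical GenStage producer wrapping an event store subscription:

```elixir
defmodule HandlerConsumer do
  use GenStage

  # One consumer process per handler, each with its own subscription to the
  # producer, so a slow handler (e.g. sending emails) never holds back the
  # others (e.g. read model projections).
  def start_link({producer, handler_module}) do
    GenStage.start_link(__MODULE__, {producer, handler_module})
  end

  def init({producer, handler_module}) do
    {:consumer, handler_module, subscribe_to: [{producer, max_demand: 10}]}
  end

  def handle_events(events, _from, handler_module) do
    Enum.each(events, &handler_module.handle/1)
    {:noreply, [], handler_module}
  end
end
```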
I do not currently understand the need for a separate library that handles the creation of ‘commands’.
As @karmajunkie says, commands are simply request messages that can be rejected (e.g. debit account), whereas events are messages of fact, stating that something has already happened and cannot be rejected (account debited, account is overdrawn).
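In code the distinction can be as simple as two plain structs (names illustrative):

```elixir
defmodule DebitAccount do
  # A command: a request that may be rejected (e.g. insufficient funds).
  defstruct [:account_number, :amount]
end

defmodule AccountDebited do
  # An event: a statement of fact that has already happened and cannot be rejected.
  defstruct [:account_number, :amount, :balance]
end
```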
Process managers are the inverse of aggregates. An aggregate handles commands and creates events; a process manager handles events and creates commands. They may both have state that helps them to either validate a command (aggregates) or route a command to the appropriate aggregate (process managers). You can model both of these as finite state machines. Often I simply use a `state` field and use Elixir's pattern matching to do just that, rather than explicitly modelling the states and allowed transitions. @sasajuric's fsm is one library that could be used.
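As a sketch of that pattern-matching style, an aggregate can carry a plain `state` field and match on it in its command-handling functions; all struct, command, and event names below are illustrative:

```elixir
defmodule OpenAccount, do: defstruct([:account_number, :initial_balance])
defmodule AccountOpened, do: defstruct([:account_number, :initial_balance])

defmodule BankAccount do
  defstruct state: :initial, account_number: nil, balance: 0

  # Only an account that has not yet been opened accepts the OpenAccount command.
  def execute(%BankAccount{state: :initial}, %OpenAccount{} = cmd) do
    %AccountOpened{account_number: cmd.account_number, initial_balance: cmd.initial_balance}
  end

  def execute(%BankAccount{}, %OpenAccount{}), do: {:error, :account_already_opened}

  # Applying the event transitions the `state` field from :initial to :open.
  def apply(%BankAccount{} = account, %AccountOpened{} = event) do
    %BankAccount{account | state: :open, account_number: event.account_number, balance: event.initial_balance}
  end
end
```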
The approach I’ve taken is heavily influenced by my previous experience developing CQRS/ES applications using an object-oriented language (C#), not necessarily from a functional perspective.
Hope this helps you out!