Opinion on file & memory based event sourcing system

I could write forever about this, but I’ll do my best to keep it succinct.

For anyone familiar with event sourcing, what is your opinion of an ES system that logs all its events to a file and keeps all of its projections in memory? It would be a cross between Joe’s love of term_to_binary and Martin Fowler’s memory image.

Whilst it seems like an artificial constraint — no external DB dependence — I think it could be an interesting system and provide some advantages, mainly around speed courtesy of the memory image and easy transport/storage of data (S3 or perhaps even Git for some ES inception!)

I know (and use) projects like Commanded, so understand that ES is much more involved in reality than just reducing over a list of events — the mechanics of how Commanded works are not to be underestimated — but I think that a simpler implementation of ES might be warranted in some cases.

This idea has been bouncing around my head for a few weeks, and I’ve done a few experiments — mainly using Erlang’s disk_log, but I think DETS and mnesia are worth exploring — but thought I’d get some other opinions on the concept before venturing further. Worth putting time into, or should I just continue using Commanded? :rofl:
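To make the concept concrete, here is a minimal sketch of the idea (module name and event shapes are hypothetical): events are appended to a file as length-prefixed term_to_binary frames, and a GenServer rebuilds the in-memory projection by folding over them on startup.

```elixir
defmodule MemoryImage do
  use GenServer

  # Client API

  def start_link(log_path), do: GenServer.start_link(__MODULE__, log_path, name: __MODULE__)
  def dispatch(event), do: GenServer.call(__MODULE__, {:dispatch, event})
  def projection, do: GenServer.call(__MODULE__, :projection)

  # Server callbacks

  @impl true
  def init(log_path) do
    # Rebuild the memory image by folding over every persisted event.
    projection = log_path |> read_events() |> Enum.reduce(%{}, &apply_event/2)
    {:ok, device} = File.open(log_path, [:binary, :append])
    {:ok, %{device: device, projection: projection}}
  end

  @impl true
  def handle_call({:dispatch, event}, _from, state) do
    # Persist first, then apply to the in-memory projection.
    bin = :erlang.term_to_binary(event)
    :ok = IO.binwrite(state.device, <<byte_size(bin)::32, bin::binary>>)
    {:reply, :ok, %{state | projection: apply_event(event, state.projection)}}
  end

  def handle_call(:projection, _from, state), do: {:reply, state.projection, state}

  # Fold a single event into the projection. Only two hypothetical
  # event types are handled in this sketch.
  defp apply_event({:deposited, account, amount}, acc),
    do: Map.update(acc, account, amount, &(&1 + amount))

  defp apply_event({:withdrawn, account, amount}, acc),
    do: Map.update(acc, account, -amount, &(&1 - amount))

  defp read_events(path) do
    case File.read(path) do
      {:ok, data} -> decode_frames(data, [])
      {:error, :enoent} -> []
    end
  end

  # Each frame is a 32-bit big-endian size followed by the term binary.
  defp decode_frames(<<size::32, bin::binary-size(size), rest::binary>>, acc),
    do: decode_frames(rest, [:erlang.binary_to_term(bin) | acc])

  defp decode_frames(<<>>, acc), do: Enum.reverse(acc)
end
```

Restarting the process replays the file, so the projection survives crashes at the cost of replay time.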

2 Likes

An ES framework that was as simple to use as ActiveRecord or Ecto would be amazing. My use cases: hot backups, auditing, rewinding, analytics, machine learning in financial apps. Elixir provides so much tooling: CRDTs, PubSub, GenStage/Broadway, not to mention Phoenix and Ecto itself. IMO the best approach is to focus on Commanded, learn it inside out, then figure out how to make the experience for new devs easier and more streamlined. Right now the learning curve is high, but I believe it can be made much simpler with time and attention.

My opinion: if you write a simpler ES library, let it be a wrapper on top of Commanded.

1 Like

I hadn’t originally planned on making another framework, at least initially. Elixir lends itself to ES so well that many don’t believe a framework is required and only adds another layer of abstraction. As you pointed out, so much is already provided by Elixir and OTP. I believe that whilst another framework probably isn’t a good idea, a library or at least a thorough guide on implementing ES from scratch could provide that simple introduction. Additional features (and complexity) could be added as the project required.

After a good amount of time going over Commanded’s code, I certainly couldn’t implement a simpler framework - in use or design - than @slashdotdash already has without sacrificing flexibility and sensible design decisions. I must say that the docs and codebase are so easy to follow and pull apart.

One thing I had considered was following a similar structure to Commanded & EventStore where the two are separate apps with a clearly defined API between them. That way they could share resources, with my memory image implementation being able to use EventStore and Commanded being able to use my file based alternative. Not sure if there would be much point, but it feels like it would be worth doing.

I totally agree that an “ES-lite” framework that’s as simple and modular as Ecto would be a wonderful thing, but I’m not sure how to go about it. ES almost by necessity requires CQRS - they aren’t as independent as many claim - which is where the complexity ramps up. Perhaps someone (smarter than me) writing up an in-depth guide to ES in Elixir is the real solution here?

EDITED TO ADD: Ben’s work on the Building Conduit book notwithstanding, of course. Without that I wouldn’t have been able to grok ES at all, but I had to learn Conduit to learn Commanded to learn ES, so perhaps a pure Elixir version of something similar?

1 Like

We’ve built https://github.com/CargoSense/fable for that kind of thing at CargoSense, although sadly there’s basically zero documentation right now.

The core idea is that in your application you often have “root” database tables and sort-of auxiliary tables. As a simple example, we have a trips table, and then there are things like alarms, trip grants, trip statistics, and so forth. We wanted to emit events on a given trip, and ensure that all updates to the trips table itself and the associated alarms, grants, and statistics all came from those events.

Here is a basic example out of our actual code:

The basic concept is you have a simple mapping of event struct names to handler functions:

defmodule Maven.Events do
  use Fable.Events,
    repo: Maven.Repo

  alias Maven.{Accounts, Travel}

  def handlers() do
    %{
      Travel.TripStarted => &Travel.trip_started/2,
      Travel.TripEnded => &Travel.trip_ended/2,
      Travel.TripGrantIssued => &Travel.trip_grant_issued/2,
      Travel.TripGrantRevoked => &Travel.trip_grant_revoked/2
      # ...
    }
  end
end

And then your context functions emit an event:

def start_trip(current_user, animal, tracker, %{id: id} = attrs) do
  Repo.serial(%Trip{id: id}, fn trip ->
    with :ok <- current_user |> can(:start_trip, %{animal: animal}),
         :ok <- animal_available(animal, current_user),
         :ok <- tracker_registered(tracker),
         :ok <- tracker_available(tracker) do
      event = %TripStarted{
        trip_id: id,
        animal_id: animal.id,
        started_by_id: current_user.id,
        tracker_id: tracker.id,
        started_at: Map.get(attrs, :started_at, DateTime.utc_now())
      }

      Events.emit(trip, event)
    else
      error -> error
    end
  end)
end

The %Trip{} struct is a 100% ordinary Ecto schema; it just has an additional field(:last_event_id, :integer, read_after_writes: true). Fable uses this to guarantee that events are processed serially and that no events are skipped. The %Trip{} row acts as, essentially, the aggregate state in ES terms.
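For concreteness, the two schema shapes being described might look roughly like this (a sketch; field names are taken from the snippets in this post, but the types and the exact layout are assumptions):

```elixir
defmodule Maven.Travel.Trip do
  use Ecto.Schema

  # The aggregate row: an ordinary Ecto schema plus the event cursor.
  schema "trips" do
    field(:started_at, :utc_datetime)
    # Maintained by Fable; read_after_writes makes Postgres report the
    # value assigned during the insert/update back into the struct.
    field(:last_event_id, :integer, read_after_writes: true)
    timestamps()
  end
end

defmodule Maven.Travel.TripStarted do
  use Ecto.Schema

  # The event: an embedded schema with no table of its own, serialized
  # into the event log row when emitted.
  embedded_schema do
    field(:trip_id, :binary_id)
    field(:animal_id, :binary_id)
    field(:started_by_id, :binary_id)
    field(:tracker_id, :binary_id)
    field(:started_at, :utc_datetime)
  end
end
```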

The %TripStarted{} event is just an Ecto embedded schema, which gets written to the database when we call emit. Fable then runs the specified handler functions, passing each one the trip aggregate and the emitted event; it’s the handler function’s job to update the trip database row and any other associated rows that should react to this event:

def trip_started(trip, event) do
  attrs =
    event
    |> Map.from_struct()
    |> Map.put(:id, trip.id)

  trip_changes = Trip.changeset(trip, attrs)

  with {:ok, trip} <- Repo.insert(trip_changes) do
    create_initial_grants(trip, event.started_by_id)
    maybe_create_test_data(trip)
  end
end

All of the above is done in a single database transaction.

Importantly, Fable does NOT require CQRS. You’re totally allowed to just query the trips table for read state too. It is absolutely event sourcing though, since all writes to the trips table and its associated tables start with an event, and it guarantees that all events are processed in order for a single aggregate. And since it’s all just Ecto and Postgres, Ecto async tests work out of the box.

If you wanted to do it CQRS style though you totally could by making your event handler functions only update the aggregate row state, and then driving all your read model changes off of a process manager. Each Fable process manager is a GenServer backed by a database row that tracks progress through the event log, and thus you could use that to manage a separate set of read tables.

If there’s sufficient interest I’ll try to get some docs up.

8 Likes

It’s interesting that in Elixir and more broadly we already see separation of read and write models. Ecto changesets and queries are separate things. GraphQL Queries and Mutations are separate things.

Maybe we can find simple tricks to give ES-like capabilities quickly. For example, I think it would be easy to save the changeset every time you call Repo.insert, to use as an audit trail and perhaps for hot backup/restore.
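One possible shape of that trick, sketched under the assumption of a hypothetical MyApp.AuditEntry schema with a map column for the raw changes: wrap the insert in Ecto.Multi so the record and its changeset are persisted in the same transaction.

```elixir
defmodule MyApp.Audit do
  alias Ecto.Multi
  alias MyApp.Repo

  # Insert a changeset and, atomically, an audit row capturing its changes.
  def insert_audited(changeset) do
    Multi.new()
    |> Multi.insert(:record, changeset)
    |> Multi.insert(:audit, fn %{record: record} ->
      # Hypothetical audit schema: stores which schema changed, the row id,
      # and the changeset's changes map as the "event".
      %MyApp.AuditEntry{
        schema: to_string(record.__struct__),
        record_id: record.id,
        changes: changeset.changes
      }
    end)
    |> Repo.transaction()
  end
end
```

Replaying would then mean re-applying the stored changes maps in order, though as discussed below that is closer to command sourcing than event sourcing.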

The main challenges I’ve found with ES: 1) duplication of effort and coordination costs between the command/event/projection layers (duplicated validations and data models across aggregates, events, and projections); 2) managing change: versioning events and all the overhead therein.

Ben’s fable code looks nice…

Exactly because I don’t want external DBs, I am working on bringing sqlite3 to Ecto 3. Technically it is an external DB, but it all works in-process, so there’s no external server to manage. I’d go for one on-disk sqlite3 DB and another that is in memory.

However, the rest is up to you so I don’t think it’s what you are looking for. Just giving you an idea for a storage backend.

Storing events directly on disk is how both Axon Server (Java) and Event Store (C#) deal with persistence. The source code for both products is available to view on GitHub. For Event Store there’s a two hour video by James Nugent (one of the core contributors) on the internals and a page outlining the internal architecture in their official documentation.

Writing to disk is easy to start with but presents quite a few challenges which have already been solved by all modern DBMSs (which is why my Elixir EventStore uses Postgres). Some considerations include flushing writes to disk to survive power loss; compaction of the event log and scavenging of disk space; backup/restore; handling data corruption; multi-node deployment; and indexing and performance.

Once deployed to production you also need to consider how devops can support and optimise the application for its data access patterns. Using an existing storage provider gives you access to tooling, expertise, and managed services so someone else can take care of hosting the database for you.

But I’d agree it would be an interesting problem to work on, even if you don’t need to use it at scale.

1 Like

Commanded provides the Commanded.EventStore behaviour as a way of plugging in different event store adapters. If you wrote an event store persisting to disk, you could use it with Commanded by adhering to the above behaviour (or writing an adapter/facade for compatibility). In my experience it’s the subscriptions that are the more complex part of building an event store.

This would be pretty much my approach for the memory image too. You can always just query the aggregate that’s already sitting in a GenServer. Where my implementation would require CQRS is that some queries (over a multitude of aggregates for example) would be difficult or at the very least slow. In that instance I’d create other aggregates that handled these queries and were optimised for them.

How does replaying events work in your system? Do you just replay all the events and create the aggregate rows as you go? I found that thinking about the problem from the perspective of replaying events rather than direct commands => events highlighted a lot of shortcomings in my design. The main one was that picking up events from various files was more complex/less reliable than just reading off a list of events from a DB table.

That’d be great! I’ve had a look through the source code before (and again this weekend) and it’s clear but a lot of the “why” gets lost which I think is where a lot of the value comes from, and where docs are really helpful! (At least for me)

That would give you the ability to replay “events” and rebuild the state of the application, but I think you’d be missing one of the biggest advantages of ES - a domain specific language for the changes in your system…

… and it’s that domain specific language that tends to make a lot of the code duplicated. There doesn’t need to be a conceptual difference between commands and events, but they are quite useful for ensuring that events can always be applied (for the purpose of replays) while commands can sometimes be rejected with helpful messages. I’ve found that whenever I try to drop the commands to simplify the system, I actually end up command sourcing rather than event sourcing: I map the intent (the command) rather than the outcome (the event), and rely on my application to consistently end up in the same state despite commands having the potential to be rejected. Removing the duplication by removing commands and sticking to just events tends, in my experience, to create a one-directional system where a user’s actions are just sent into the ether and may or may not result in an event. Defined commands provide a feedback mechanism.

As for duplicate models between aggregates and projections, that’s the CQRS bit coming in. As Ben pointed out above, if you can get away without additional read models and just directly query the aggregate you can drop the additional complexity and duplication that projections bring, but you also drop the ability to have tailored read models.

I think a degree of duplication, or at least similarity, throughout a domain is always going to be required for this structure. That’s where the flexibility and granularity come from; in my experiments, the benefits start to diminish when this duplication is too ardently avoided.

This is something I haven’t struggled with yet, but then I haven’t been running ES systems for long, and I took great pains to ensure that any changes to my domain/models were additive: new events had extra information that old ones didn’t, and that info was just inferred for the old events.
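The additive approach described here can be sketched as a small “upcast” step that fills in an inferred default before an old event is applied (the event shape and the default are hypothetical):

```elixir
defmodule Upcaster do
  # v2 of this hypothetical event added a :currency field; older events
  # lack it, so we infer a default. Map.put_new only fills the field
  # when it is absent, leaving newer events untouched.
  def upcast(%{type: :price_changed} = event) do
    Map.put_new(event, :currency, :gbp)
  end

  # All other event types pass through unchanged.
  def upcast(event), do: event
end
```

Running every event through `upcast/1` before applying it keeps the apply functions written purely against the latest event shape.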

I hadn’t considered sqlite because my head was so firmly focussed on the BEAM VM. That could be a good alternative, albeit without some of the advantages (keeping data as terms).

I had read that page, but hadn’t seen the video. I’ll have a watch!

That’s pretty much my experience too. I had wanted to go with a very simple and “pure” implementation using just File and term_to_binary but it very quickly becomes apparent that…

…these problems are real and need addressing. I hadn’t considered them until quite far into my first version.

This I thought could be quite straightforward as it is effectively just like backing up any other files on your computer/server.

Again, I hadn’t considered this, but the “what-ifs” started creeping in as I was working on it.

I was going just for single node for the time being. I wanted to try this approach as a simpler, smaller scale alternative so multi-node wasn’t a concern.

I had hoped that IO performance wouldn’t be an issue as I could rely on eventual consistency and my in-memory data to provide all the speed I’d need. Of course, complete replays would be expensive and slow, but hopefully that would be a rare occurrence.

I’ve been working to that behaviour so far…

…but as you say, it’s the subscriptions that have been the tricky part!

I’ll keep cracking on with it, but instead of building out my little side-project/business with the file- and memory-based ES, I’ll use Commanded, with the aim of switching from one to the other if I can maintain a consistent API between them.

Thanks for all the feedback guys! If you have any more thoughts, please keep adding them. This is very much an ongoing thing.

2 Likes

Our team at Inflowmatix built our ES/CQRS into our platform using a combination of metaprogramming, protocols, and functional Elixir. We originally ran our platform’s domain store (behind a protocol to allow switching) on Mnesia for around 18 months in production. We used term_to_binary along with compression, and made heavy use of snapshots of our Aggregate state to reduce Aggregate startup costs/times. Eventually though, as our most active Aggregates accumulated long histories and frequent snapshots, the table fragmentation in Mnesia got heavy. Additionally, the table-level locking model for certain transactions caused frequent rollbacks and retries with lots of conflict resolutions. These eventually converge, but under heavy load they can cause upstream timeouts on GenServers, especially where a call was needed to assure the transaction completed.

Eventually, after many small repairs and refactors, we switched our store to PostgreSQL. We were able to use our protocol to write a splitter module that wrote to both stores for a while (and read from PostgreSQL) until we had migrated the historic data in the background, then switched the splitter’s behaviour to use only PostgreSQL.

Just some notes really to perhaps bear in mind.

4 Likes

It’s really great to hear what a similar system was like in production!

I had considered using Mnesia, as it meets the criteria of being on the BEAM and would provide a lot of the extra control around saving to disk that I would otherwise have to write.

Transactions have become more of a requirement/issue the more I’ve looked into the concept. Guaranteeing that events are written to disk fully means that the entire event store effectively becomes sequential and blocking — unless I’m misunderstanding the fundamental requirements…

I’d very much like to make the file-based event store work, but it’s increasingly looking like PostgreSQL is a far more pragmatic choice.

Seeing as I’m looking at using files for persistence and GenServers for the memory image, perhaps all I’m looking for is a GenServer that can be recovered/restored/rebuilt from file. Something like PersistentGenServer, Mnemonix, or even Cachex? It wouldn’t be ES as commonly practised; more the actor model with some events stuffed into it. I’d have to do some thinking on whether this would suit my requirements.

I’d be interested to hear more about this, especially as I’m not far from you in Winchester. We should meet for a coffee to chat solutions. Did you consider using Commanded & EventStore before building your own solution?

The main reason, I think, was that we built it in 2015, before you started your work.

Sure it would be good to meetup if you fancy coming over to the science park.

Mike

I’m still chipping away at this whenever I get a moment, and thought that I’d provide a quick update on where I’m at.

I tried using many different options for storing events, all using a shared interface so that I could quickly switch between them. I’ve found some wonderful little features of both Elixir and Erlang and some cool libraries to help me along the way — it really has been a brilliant learning experience!

So far I’ve tried:

  • Simple term_to_binary and File.open(x, [:binary, :append])
  • DETS
  • Erlang’s disk_log
  • Erlang’s file:consult for reading events back and a custom io_lib:format function to write them
  • CubDB (using min_key and max_key for event number ranges was brilliant)

All had positives and negatives, with disk_log probably coming out on top. It has a load of the features you really need for working with files already baked in and thoroughly tested. The deal-breaker, though, was that it’s really hard to get events back out by their event number: disk_log doesn’t work with keys at all, it literally just appends a given term to the end of the log.

Getting the events out required bringing all events into memory (ignoring disk_log’s wonderful chunking feature) and filtering out events older than those we were interested in. This isn’t the worst thing in the world, as I was planning on keeping logs partitioned by aggregate, meaning they are unlikely to ever become so long that parsing the whole log becomes an issue; it’s incredibly fast, loading 100_000 average-sized events in approximately 75ms.
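That read path can at least stream the log chunk by chunk rather than loading it wholesale. A sketch, assuming events are logged as {number, event} tuples (the module and layout are illustrative):

```elixir
defmodule DiskLogStore do
  @log :events

  def open(path) do
    # disk_log's default internal format stores raw Erlang terms.
    :disk_log.open(name: @log, file: String.to_charlist(path))
  end

  def append(number, event) do
    # log/2 appends a term; sync/1 flushes buffered writes to disk.
    :ok = :disk_log.log(@log, {number, event})
    :disk_log.sync(@log)
  end

  # Return all events numbered >= from, walking the log one chunk
  # at a time and filtering each chunk as it is read.
  def read_from(from), do: do_read(:disk_log.chunk(@log, :start), from, [])

  defp do_read(:eof, _from, acc), do: Enum.reverse(acc)

  defp do_read({cont, terms}, from, acc) do
    kept = for {n, _} = t <- terms, n >= from, do: t
    do_read(:disk_log.chunk(@log, cont), from, Enum.reverse(kept, acc))
  end
end
```

Each chunk still has to be scanned, so this trades memory for the linear scan the post describes; an index is needed to avoid the scan entirely.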

My concern was that I could end up with a compromised method for reading and writing the literal heart of this system.

The next phase in this experiment was yet more learning… Martin Kleppmann’s “Designing Data Intensive Applications”. I can honestly say that I learnt more reading this in a week than I did over the previous 9 months piecing together articles and documentation online. His closing conclusion of “turning the database inside out” is very in line with what I’m trying to accomplish here, although I definitely think he was talking at a much larger scale than I’m working at! :joy:

Importantly, the book highlighted mistakes that I was unwittingly making in other parts of my system. I was neglecting a total order by writing all of my events in partitions based on aggregates. That wasn’t a deal-breaker, as I could accept partial order as a trade-off in this instance: causality isn’t an issue in my domain. However, these partitions did make my projections quite complex, since they needed to track their event offsets (using consumer offsets rather than acks) for many, many, many streams. Not a blocker, but one I decided to avoid for now by opting for a single unified log with total order.

I’m using a very simple writer-and-index pattern from the book, and at the moment it’s working very well. Sending a command to an aggregate, having it validated, and having events created, persisted, and applied to the aggregate takes on average around 150 microseconds. I still have to handle log rotation and snapshots, but these should be quite simple as I’m modelling my store as a series of GenStage stages (yet another thing I’ve learnt since starting this).
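That writer-and-index pattern might be sketched like this (names are hypothetical, and the sketch assumes a fresh file): a single GenServer owns the append-only file and keeps an in-memory map of event number to byte offset, so reads can seek straight to an event instead of scanning the log.

```elixir
defmodule LogWriter do
  use GenServer

  def start_link(path), do: GenServer.start_link(__MODULE__, path, name: __MODULE__)
  def append(event), do: GenServer.call(__MODULE__, {:append, event})
  def read(number), do: GenServer.call(__MODULE__, {:read, number})

  @impl true
  def init(path) do
    {:ok, device} = File.open(path, [:binary, :read, :append])
    # index: event number => byte offset of its frame in the file.
    {:ok, %{device: device, index: %{}, offset: 0, next: 1}}
  end

  @impl true
  def handle_call({:append, event}, _from, state) do
    bin = :erlang.term_to_binary(event)
    frame = <<byte_size(bin)::32, bin::binary>>
    :ok = IO.binwrite(state.device, frame)

    index = Map.put(state.index, state.next, state.offset)

    {:reply, {:ok, state.next},
     %{state | index: index, offset: state.offset + byte_size(frame), next: state.next + 1}}
  end

  def handle_call({:read, number}, _from, state) do
    case Map.fetch(state.index, number) do
      {:ok, offset} ->
        # Seek to the recorded offset: read the 4-byte size, then the frame.
        {:ok, <<size::32>>} = :file.pread(state.device, offset, 4)
        {:ok, bin} = :file.pread(state.device, offset + 4, size)
        {:reply, {:ok, :erlang.binary_to_term(bin)}, state}

      :error ->
        {:reply, {:error, :not_found}, state}
    end
  end
end
```

On restart the index would be rebuilt by one sequential scan of the file; snapshots and log rotation bound how long that takes.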

I’ve decided in many cases to opt for purposely naive approaches (whereas before they were just naive :wink:) in order to keep my system simple. My tests have shown that I’ve got a good amount of performance margin with which to make compromises such as the single log, which is a potential bottleneck but hugely simplifies the entire system. Equally, a single event stream and a local-only Registry make it a lot easier to work with and understand. I figure that it’s easier to make a system that already works well faster than it is to fix a fast system that’s yet to work.

I have had crises of confidence along the way — “Why aren’t you just using Commanded and PostgreSQL like a sensible person?” - “You’re totally out of your depth here!” - “Why even bother with event sourcing, CRUD could work here after all” — but overall I’m very happy with the progress I’ve made and the things I’ve learnt. I think I’m probably a little way off being able to build anything complex with it yet, but I’m enjoying the process.

3 Likes

Sounds like you’re having fun. Martin’s book is a fantastic reference for building these types of systems.

Once you have a working single-node system you can experiment with making it distributed for scalability and fault-tolerance. It looks like disk_log supports distribution, but it’s likely not useful here because it doesn’t guarantee that writes go to all nodes:

A collection of open disk logs with the same name running on different nodes is said to be a distributed disk log if requests made to any of the logs are automatically made to the other logs as well.

It is not guaranteed that all log files of a distributed disk log contain the same log items. No attempt is made to synchronize the contents of the files.

http://erlang.org/doc/man/disk_log.html

It’s been so much fun, albeit frustrating and head-scratching at times. As you say, Martin’s book almost feels written with this type of system as the ultimate goal.

It’s so tempting to build on top of disk_log because it’s already there and does so much of what I want to achieve, but the hurdles I think are too great. The querying of events (or lack thereof) will become a real issue with a single event stream, and as you point out, the built in distribution isn’t all that helpful in this case.

Distribution and replication whilst maintaining consensus on event ordering is where I’ve spent a lot of time re-reading sections of the book. Unfortunately it looks like single leader replication of the log is the only really viable way to approach the problem, and therefore not hugely useful as I’ve still got a single node as a bottleneck.

The only way to distribute the log (either locally or across machines) would be to go the same route as Kafka and break my event streams into topics and partitions (as I was originally doing) which brings a lot of additional complexity with it, both in persisting events and making them available to the rest of the application.

As my use case is smaller, simpler applications I think that restricting myself to a single node (at least for the event store) is not unreasonable. I’m opening a can of worms and losing sight of the objective by trying to solve problems with distributed logs and total ordering, so for now I’m keeping my solution as simple as possible.

As both file and database backed logs run into the same restrictions around distribution, the only real question that remains is which is better in terms of scale and throughput on a single node. I suspect that’ll be a DB like Postgres, unless the lack of encoding/decoding of terms starts to play a larger role in performance. We’ll see…

1 Like

If you make the event store single node then you could expose an API that other parts of your application can use. Treat it like a database to which as many nodes as required can connect. Event Store exposes a “native interface of AtomPub over HTTP”, as an example.