We’ve built Fable (CargoSense/fable on GitHub) for that kind of thing at CargoSense, although sadly there’s basically zero documentation right now.
The core idea is that in your application you often have “root” database tables and auxiliary tables that hang off them. As a simple example, we have a trips table, and then there are things like alarms, trip grants, trip statistics, and so forth. We wanted to emit events on a given trip, and ensure that all updates to the trips table itself and the associated alarms, grants, and statistics came from those events.
Here is a basic example out of our actual code. The basic concept is that you have a simple mapping of event struct names to handler functions:
defmodule Maven.Events do
  use Fable.Events,
    repo: Maven.Repo

  alias Maven.{Accounts, Travel}

  def handlers() do
    %{
      Travel.TripStarted => &Travel.trip_started/2,
      Travel.TripEnded => &Travel.trip_ended/2,
      Travel.TripGrantIssued => &Travel.trip_grant_issued/2,
      Travel.TripGrantRevoked => &Travel.trip_grant_revoked/2,
      ...
    }
  end
end
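To make the mapping idea concrete, here is a minimal in-memory sketch of dispatching on an event's struct name. It is not Fable's implementation: `DispatchSketch`, its nested event structs, and `dispatch/2` are hypothetical names made up for illustration, and the aggregate is a plain map rather than an Ecto schema.

```elixir
# Hypothetical illustration only; none of these modules are part of Fable.
defmodule DispatchSketch do
  defmodule TripStarted do
    defstruct [:trip_id, :started_at]
  end

  defmodule TripEnded do
    defstruct [:trip_id, :ended_at]
  end

  # Each handler takes the aggregate and the event and returns the updated aggregate.
  def trip_started(trip, %TripStarted{} = event), do: Map.put(trip, :started_at, event.started_at)
  def trip_ended(trip, %TripEnded{} = event), do: Map.put(trip, :ended_at, event.ended_at)

  # Event struct name => handler function, mirroring the shape of handlers/0 above.
  def handlers do
    %{
      TripStarted => &__MODULE__.trip_started/2,
      TripEnded => &__MODULE__.trip_ended/2
    }
  end

  # Looks up the handler by the event's struct module and calls it.
  def dispatch(aggregate, %module{} = event) do
    case Map.fetch(handlers(), module) do
      {:ok, handler} -> {:ok, handler.(aggregate, event)}
      :error -> {:error, {:no_handler, module}}
    end
  end
end
```

Because the lookup key is just the struct module, adding a new event type in this sketch means adding one struct and one map entry.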
And then your context functions emit an event:
def start_trip(current_user, animal, tracker, %{id: id} = attrs) do
  Repo.serial(%Trip{id: id}, fn trip ->
    with :ok <- current_user |> can(:start_trip, %{animal: animal}),
         :ok <- animal_available(animal, current_user),
         :ok <- tracker_registered(tracker),
         :ok <- tracker_available(tracker) do
      event = %TripStarted{
        trip_id: id,
        animal_id: animal.id,
        started_by_id: current_user.id,
        tracker_id: tracker.id,
        started_at: Map.get(attrs, :started_at, DateTime.utc_now())
      }

      Events.emit(trip, event)
    else
      error -> error
    end
  end)
end
The %Trip{} struct is a 100% ordinary Ecto schema; it just has the addition of a field(:last_event_id, :integer, read_after_writes: true). This is used by Fable to guarantee that events are processed serially and that no events are skipped. The %Trip{} row basically acts as the aggregate state in ES terms.
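As a rough illustration of what “processed serially, no events skipped” means, here is a small in-memory sketch. It is an assumption about the general technique, not a description of how Fable does it (Fable enforces this in the database). `SerialSketch`, `apply_event/3`, and the `prev_event_id` field are hypothetical names; only `last_event_id` comes from the schema described above.

```elixir
# Hypothetical in-memory sketch; Fable itself enforces ordering in the database.
defmodule SerialSketch do
  # Applies `event` only if it directly follows the last event the aggregate
  # has processed, then records the new event id on the aggregate.
  def apply_event(%{last_event_id: last} = aggregate, %{id: id, prev_event_id: prev} = event, handler)
      when prev == last do
    updated = handler.(aggregate, event)
    {:ok, Map.put(updated, :last_event_id, id)}
  end

  # Any event that does not chain onto last_event_id is rejected, so a gap
  # or a reordering is detected instead of silently applied.
  def apply_event(%{last_event_id: last}, %{prev_event_id: prev}, _handler) do
    {:error, {:out_of_order, last, prev}}
  end
end
```

In this sketch an event whose predecessor has not been applied yet comes back as `{:error, {:out_of_order, expected, got}}` and the aggregate is left untouched.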
The %TripStarted{} event is just an Ecto embedded schema, which gets written to the database when we call emit. Then Fable runs the handler function specified for that event, passing it the trip aggregate and the emitted event, and it’s this handler function’s job to go update the trip database row and any other associated rows that should react to this event:
def trip_started(trip, event) do
  attrs =
    event
    |> Map.from_struct()
    |> Map.put(:id, trip.id)

  trip_changes = Trip.changeset(trip, attrs)

  with {:ok, trip} <- Repo.insert(trip_changes) do
    create_initial_grants(trip, event.started_by_id)
    maybe_create_test_data(trip)
  end
end
All of the above is done in a single database transaction.
Importantly, Fable does NOT require CQRS. You’re totally allowed to just query the trips table for read state too. It is absolutely event sourcing though, since all writes to the trips table and its associated tables start with an event, and it guarantees that all events are processed in order for a single aggregate. And since it’s all just Ecto and Postgres, Ecto async tests work out of the box.
If you wanted to do it CQRS style, though, you totally could, by making your event handler functions only update the aggregate row state and then driving all your read model changes off of a process manager. Each Fable process manager is a GenServer backed by a database row that tracks progress through the event log, and thus you could use that to manage a separate set of read tables.
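For a feel of the “GenServer that tracks progress through the event log” idea, here is a minimal sketch using only the standard library. It is not Fable’s process manager API: `ProjectorSketch`, `catch_up/2`, the `cursor` key, and the event maps are all hypothetical, and the cursor lives in process memory here, whereas Fable keeps it in a database row.

```elixir
# Hypothetical sketch of a cursor-tracking projector; not Fable's API.
defmodule ProjectorSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Feeds the full event log to the projector; only unseen events are applied.
  def catch_up(pid, event_log), do: GenServer.call(pid, {:catch_up, event_log})

  @impl true
  def init(:ok), do: {:ok, %{cursor: 0, trips_started: 0}}

  @impl true
  def handle_call({:catch_up, event_log}, _from, state) do
    new_state =
      event_log
      |> Enum.filter(&(&1.id > state.cursor))
      |> Enum.sort_by(& &1.id)
      |> Enum.reduce(state, &project/2)

    {:reply, new_state, new_state}
  end

  # The "read model" here is just a counter; a real one would write read tables.
  defp project(%{id: id, type: :trip_started}, state),
    do: %{state | cursor: id, trips_started: state.trips_started + 1}

  # Events this projector does not care about still advance the cursor.
  defp project(%{id: id}, state), do: %{state | cursor: id}
end
```

Calling `catch_up/2` twice with the same log is a no-op the second time in this sketch, because the cursor has already moved past those event ids.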
If there’s sufficient interest I’ll try to get some docs up.