Ecto.Multi nearly a monad

I am writing an application that makes use of a CQRS architecture. In this application a single command may create multiple events. There are certain concurrency issues that are managed by database constrains and so writing the command and events should be done as a single action.

@michalmuskala
Thought I would try and elucidate more what I said about Multi on twitter. https://twitter.com/CrowdHailer/status/942712473718837255

So a query is something that can be run later. And to my mind multi is about extending this to writes + querys in a composable way.

Queries can be run against a DB using Repo.all etc. If there was enough information on the Query you could replace that with Repo.run. Probably the nearest thing we have to run is Repo.transaction given a multi.

Multi.merge is the analogue to flat_map (close enough for my argument) and once you have that you can merge with a for comprehension. What I would like to write is

multi = Ecto.Multi.for do
  command <- insert(command_changeset)
  event1 <- insert(%{event1 | command_id: command_id})
  event2 <- insert(%{event2 | command_id: command_id})
after
  IO.inspect(event1)
  IO.inspect(event2)
  command.id
end

case Repo.transaction(multi) do
  {:ok, command_id} ->
    # stuff
  {:error, reason} ->
    # different stuff
end

NOTES

  1. Picked this example because both events rely on the command so piped calls to something like merge are not necessarily the best solution
  2. insert is Ecto.Multi.insert but without needed to explicitly give it a name in the multi object. Is not necessary once you can compose by binding to the potential return values.
  3. The merging of command_ids is simplistic for illustration, itā€™s more likely youā€™d use a changeset.
  4. The after block would make more sense as yield but limited to Elixir keywords, it should be executed on success.
  5. I added the IO.inspect lines because that was something I could not work out how to do in the current multi.
    i.e. How do I log some of the data I have written to db but only after the transaction but only after it has succeeded.
    I really donā€™t want to have to manually pull them out of the multi object.
5 Likes

When with was proposed, one of the possible additions was transactional with, which is pretty much what you proposed. The problem is that with/for are compile time constructs, Multi works at runtime which is quite more flexible.

Regarding the ā€œafterā€ callback, we should add a new operation to the multi that receives the result of the Repo operation {:ok, ...} or {:error, ...} and return either {:ok, map} | {:error, map}. Can you please open up an issue? I would like to hear yours and @michalmuskalaā€™s feedback.

4 Likes

Do you have a concrete example of some benefit you get because of this flexibility. Iā€™m not 100% sure I understand the comment.

1 Like

To clarify, the approach you proposed still classifies at runtime for me because you are still building a multi data structure that you pass around at runtime.

If everything was syntax base, imagine something like:

Repo.transact_with x <- ..., y <- ... do

things such as conditionally adding something to the Multi would be really hard. It would be the equivalent of conditionally adding an <- to the syntax, which is not possible, or you would need a way to express noop then. It would be hard to inspect the operations in the multi, compose through multiple functions, and so on.

2 Likes

Iā€™m thinking about extending multi for some time now, and I have some ideas. I have to say, though, that this is not the area I was exploring.

My main thinking was in expanding the Multi abstraction and building something lower-level that could be later used by ecto to build Ecto.Multi on top. Some of those ideas are explored by my friend @AndrewDryga in his library Sage.

Looking at your example, Iā€™m not sure itā€™s really different from doing something like (besides some additional verbosity with the explicit rollback and repo):

fun = fn ->
  with {:ok, command} <- Repo.insert(command_changeset),
       {:ok, event1} <- Repo.insert(%{event | command_id: command.id}),
       {:ok, event2} <- Repo.insert(%{event | command_id: command.id}) do
    IO.inspect(event1)
    IO.inspect(event2)
    command.id
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end

case Repo.transaction(fun) do
  {:ok, command_id} ->
    # stuff
  {:error, reason} ->
    # different stuff
end

The problem is of course, that in this situation (and in the proposed syntax) - we donā€™t really know which operation failed. Sometimes that doesnā€™t matter, but sometimes it does - the current multi provides the information at the expense of requiring an explicit name for every operation.

3 Likes

Iā€™ve definitely wanted this in the past as well. Would be quite useful for things like keeping an external system in sync with the database.

1 Like

So, Iā€™m not really sure doing things like that inside multi is a good idea - this means that for the duration of the remote communication the transaction is open and connection tied up.

2 Likes

I use Ecto.Multi in my own CQRS/ES Elixir apps, but for read model projections rather than persisting events. In my opinion itā€™s a really nice fit.

I built a project/2 macro in my Commanded Ecto projections library to provide a DSL for projecting events that uses an exposed multi variable:

defmodule MyApp.ExampleProjector do
  use Commanded.Projections.Ecto, name: "example_projection"

  project %AnEvent{name: name}, _metadata do
    Ecto.Multi.insert(multi, :example_projection, %ExampleProjection{name: name})
  end
end

You should consider persisting the command separately to the events as this will allow you to record failed commands such as those returning an error, raising an exception, or just not creating any domain events. For Commanded I built support for command dispatch middleware and have written an audit middleware that records every dispatched command, itā€™s success/failure outcome, and execution duration using Ecto.

Itā€™s also worth using causation and correlation ids to track the flow of commands and events in your app.

  • causation_id - the id of the command causing an event, or the event causing a command dispatch.
  • correlation_id - an id used to correlate related commands/events.

They allow you to correlate related commands/events and look at causaility chains, useful for debugging purposes.

To persist events to Postgres I created the EventStore library. It uses a multi row INSERT statement to append a batch of events in one query and also assigns each event a globally unique, monotonically incrementing, and gapless event sequence number.

For more resources on the subject Iā€™m compiling an awesome Elixir and CQRS/ES list.

2 Likes

I thought that this is would be an ā€œafterā€ callback and will be called once the transaction is complete. Is that incorrect?

1 Like

Of course, here the success tuples are used as (explicitly matched) optional/maybe type.

As far as my current understanding goes, Ecto.Multi is not a monad because I cannot think of a sensible way to implement wrap and chain for them.But yes, their return values have a very clear succeed/fail scenarios which you could expose using explicit with-syntax or semi-explicit ā€˜monad doā€™-syntax. But the monad here is the success tuple and not Elixir.Multi.

2 Likes

Yes, it should be done after the transaction.

2 Likes

It would be if you tried to use a for for everything but the nice thing about recognising a monad is it tells you all the functions you need. so as long as for was built ontop of map/flat_map and they were exposed then they would handle all your needs. The for syntax being left to only the cases it is suitable.

My suggestion is really should only be syntactic sugar on what exists below. I havenā€™t discovered all of ecto yet and your example is actually really helpful in telling me how I will probably handle the problem today.

Unreliable because there is always the case where the transaction success and the communication with the external system fails

We have considered this case, and have it covered, we however write commands + events in same transaction because we use command as idempotency indicator for handling retries from the client. But this is definetly a topic for another thread

No. this is not what I was discussing. The DBMonad I am talking about wraps a potential value that is extracted by running against a DB. because the contents of a DB varies the most common case is to have the wrapped value be a result monad but that is not a firm requirement.

I started writing a more detailed explanation but that turned out to just be the Slick documentation. So I am no and have decided to let people refer to them if they want to find out more.

p.s. it took me a good week to get my head around what it was doing
p.p.s That alone might suggest itā€™s not a good abstraction but I was able to use it without grokin how it was monady

3 Likes

Somewhat related to what I was asking at the beginning. Is there a way to use Multi to query the DB in the transaction so you could use that value in some later insert.

Ecto.Multi.run does only expect {:ok | :error, any()} to be returned by the passed function, so you could just as well query the db within that function.

1 Like