Domain Driven Design: Rehydrating an Elixir process after exit/termination

I am trying to build an Elixir app using domain-driven design principles, with a User module as the aggregate root. I then want a separate process for each user: one process for the user with id "123", another for the user with id "456", and so on.

User module:

    defmodule Bhaduli.User do
      use GenServer
      alias Bhaduli.User.{BasicInfo, EducationalDetails}

      @registry :user_process_registry

      defstruct user_id: nil,
                basic_info: %BasicInfo{},
                educational_details: %EducationalDetails{}

      # Client API

      def start_link(name) do
        GenServer.start_link(__MODULE__, name, name: via_tuple(name))
      end

      def update(id, %BasicInfo{} = basic_info) do
        GenServer.cast(whereis(id), {:update_basic_info, basic_info})
      end

      def update(id, %EducationalDetails{} = educational_details) do
        GenServer.cast(whereis(id), {:update_educational_details, educational_details})
      end

      def get_basic_info(id), do: GenServer.call(whereis(id), :basic_info)

      def get_educational_details(id), do: GenServer.call(whereis(id), :educational_details)

      def get(id), do: GenServer.call(whereis(id), :get)

      def via_tuple(user_id) do
        {:via, Registry, {@registry, user_id}}
      end

      # Looks up the pid registered for this user id.
      # Raises a MatchError if no process is registered, as the original lookups did.
      defp whereis(id) do
        [{pid, _}] = Registry.lookup(@registry, id)
        pid
      end

      # Server callbacks

      @impl true
      def init(name) do
        {:ok, %__MODULE__{user_id: name}}
      end

      @impl true
      def handle_cast({:update_basic_info, basic_info}, user) do
        {:noreply, %__MODULE__{user | basic_info: basic_info}}
      end

      def handle_cast({:update_educational_details, educational_details}, user) do
        {:noreply, %__MODULE__{user | educational_details: educational_details}}
      end

      # The second argument is the caller's {pid, tag}; it is unused here.
      @impl true
      def handle_call(:basic_info, _from, state) do
        {:reply, state.basic_info, state}
      end

      def handle_call(:educational_details, _from, state) do
        {:reply, state.educational_details, state}
      end

      def handle_call(:get, _from, state) do
        {:reply, state, state}
      end
    end

This User module is my aggregate root, and I am using Registry to keep track of all the process ids. I have also defined a Supervisor that supervises all the user processes with the simple_one_for_one strategy. Here are the things I can't figure out:

  1. How do I make sure that the new process the Supervisor creates to replace the old one is initialized properly? I can load its state from the DB, but with my current implementation it seems I would need to put that database access into the User aggregate (inside the User.start_link function), which is not a good idea.

  2. Should I really have a supervisor for the user processes, or could I let them die and resurrect them the next time a request comes in?

The entire code is here: https://github.com/nav301186/rumuk/tree/feature/ddd-actor/apps/bhaduli

I’ve built an open-source Elixir library, Commanded, that provides similar functionality. Feel free to use it or take inspiration from it.

  1. The way I approach rebuilding state is for the aggregate GenServer process to cast a message to itself during init to fetch its state from the database. See aggregate.ex

    def init(%Aggregate{} = state) do
      # initial aggregate state is populated by loading events from event store
      GenServer.cast(self(), {:populate_aggregate_state})
    
      {:ok, state}
    end
    
  2. Aggregate processes are started by a supervisor, using a simple_one_for_one strategy with temporary restart (i.e. the supervisor does not restart them on failure). So a failed process will be started again - and its state rebuilt - the next time a command is routed to it. See supervisor.ex
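
To make both points concrete, here is a minimal sketch of the idea. It is not Commanded's actual code, and it makes a few swaps that should be read as assumptions: it uses `DynamicSupervisor` (which superseded `simple_one_for_one` in newer Elixir versions) and `handle_continue/2` (a variant of the cast-to-self trick that runs right after `init/1`, before any other message). All names here (`Sketch`, `Sketch.UserStore`, `Sketch.Registry`, `Sketch.UserSupervisor`, `ensure_started/1`) are hypothetical, and the store is an in-memory `Agent` standing in for a real database.

```elixir
defmodule Sketch.UserStore do
  # Hypothetical stand-in for a database: an Agent holding %{user_id => state}.
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Returns the stored state, or a fresh state for an unknown user.
  def load(user_id) do
    Agent.get(__MODULE__, &Map.get(&1, user_id, %{user_id: user_id, basic_info: nil}))
  end

  def save(user), do: Agent.update(__MODULE__, &Map.put(&1, user.user_id, user))
end

defmodule Sketch.User do
  # restart: :temporary means the supervisor never restarts a crashed user process.
  use GenServer, restart: :temporary

  @registry Sketch.Registry
  @supervisor Sketch.UserSupervisor

  def start_link(user_id) do
    GenServer.start_link(__MODULE__, user_id, name: via(user_id))
  end

  # Public API: every request first makes sure the process exists.
  def update_basic_info(user_id, info) do
    GenServer.call(ensure_started(user_id), {:update_basic_info, info})
  end

  def get(user_id), do: GenServer.call(ensure_started(user_id), :get)

  # Starts the process on demand, or returns the pid that is already registered.
  defp ensure_started(user_id) do
    case DynamicSupervisor.start_child(@supervisor, {__MODULE__, user_id}) do
      {:ok, pid} -> pid
      {:error, {:already_started, pid}} -> pid
    end
  end

  defp via(user_id), do: {:via, Registry, {@registry, user_id}}

  @impl true
  def init(user_id) do
    # Return quickly; the state is loaded in handle_continue/2 below.
    {:ok, %{user_id: user_id, basic_info: nil}, {:continue, :populate_state}}
  end

  @impl true
  def handle_continue(:populate_state, state) do
    # The aggregate only asks the store for its state; how it is persisted stays outside.
    {:noreply, Sketch.UserStore.load(state.user_id)}
  end

  @impl true
  def handle_call({:update_basic_info, info}, _from, state) do
    state = %{state | basic_info: info}
    :ok = Sketch.UserStore.save(state)
    {:reply, :ok, state}
  end

  def handle_call(:get, _from, state), do: {:reply, state, state}
end

defmodule Sketch do
  # Starts the supporting processes. In a real app these would be children
  # of the application's supervision tree instead.
  def start do
    {:ok, _} = Registry.start_link(keys: :unique, name: Sketch.Registry)
    {:ok, _} = Sketch.UserStore.start_link([])
    {:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: Sketch.UserSupervisor)
    :ok
  end
end
```

After `Sketch.start()`, calling `Sketch.User.update_basic_info("123", %{name: "Asha"})` starts the process for user "123" if needed and saves the change. If that process later dies, the next `Sketch.User.get("123")` starts a new one, which reloads its state from the store. One caveat with the cast-to-self version: because the name is registered before `init/1` runs, another process could in principle get a message in ahead of the self-cast, which is why the sketch uses `handle_continue/2`.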
