How to distribute data periodically into supervised processes

Hello everyone, I'm starting to learn Elixir and I have a first big challenge to solve. I hope someone can help me!

Okay, first of all, I have a list of alertConfigs (stored in an external database) that looks like this:

[
    {
        code: "A",
        comparison: ">=", 
        message: "A is greater or equal 50",
        value: 50
    },
    {
       code: "B",
       comparison: "<=",
       message: "B is less or equal 100",
       value: 100
    },
    {
       code: "C",
       comparison: "=",
       message: "C is equal 20!",
       value: 20
    },
...
]

My first task is to create one process for each item of the list (dynamically supervised, because a new alert may be created while the application is running). The process must be restarted (with its initial state) in case of failure.

Then comes the hard part (or the part that is not clear to me).

I have a websocket server that sends an infosList every minute. The list looks something like this:

[
    {
       code: "A",
       value: 15
    },
    {
        code: "B",
        value: 150
    },
    {
        code: "C",
        value: 21
    }
]

The codes and values of the list can change (usually only the value will change, but more codes can be added every minute). My challenge is to be able to split the list into chunks and only send relevant data to the workers.

For example, supposing I already have all alertWorkers up and running, and one of them has

state = %{
    code: "A",
    comparison: ">=", 
    message: "A is greater or equal 50",
    value: 50
}

When a new list comes from my websocket, I have to be able to send that worker only the item whose “code” == “A”, and send the other workers the info for their respective “code”.

Now my question is: What’s the best way to achieve this with Elixir?

I thought about using a DynamicSupervisor to manage all alert processes.
But I don't know how to broadcast the infosList to every process and make them validate it.

I need the inner (or child) process to print the message when the value meets its expectation.

Have you looked into the Registry to store the mapping of code -> worker? Dispatching code to the process in the Registry can be done with an Enum.each.
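
A minimal sketch of that Registry idea, under some assumptions: AlertWorker, AlertRouter, AlertRegistry and AlertSupervisor are hypothetical names that do not come from the thread, and the worker only stores the last value it was sent instead of validating it. Each worker registers itself under its code through a `:via` tuple, and the router looks up the pid for every incoming item with Enum.each.

```elixir
defmodule AlertWorker do
  use GenServer

  # AlertRegistry is a hypothetical name; the Registry must be running
  # before any worker starts.
  @registry AlertRegistry

  def start_link(%{code: code} = config) do
    GenServer.start_link(__MODULE__, config, name: via(code))
  end

  # Name under which the worker for a given code is registered.
  def via(code), do: {:via, Registry, {@registry, code}}

  # Synchronous read, used here only to inspect what a worker received.
  def last_value(code), do: GenServer.call(via(code), :last_value)

  @impl true
  def init(config), do: {:ok, Map.put(config, :last_value, nil)}

  @impl true
  def handle_cast({:check, value}, state), do: {:noreply, %{state | last_value: value}}

  @impl true
  def handle_call(:last_value, _from, state), do: {:reply, state.last_value, state}
end

defmodule AlertRouter do
  # Sends each item to the worker registered under its code and silently
  # skips codes that have no worker yet.
  def dispatch(infos) do
    Enum.each(infos, fn %{code: code, value: value} ->
      case Registry.lookup(AlertRegistry, code) do
        [{pid, _meta}] -> GenServer.cast(pid, {:check, value})
        [] -> :ok
      end
    end)
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: AlertRegistry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: AlertSupervisor)

config = %{code: "A", comparison: ">=", message: "A is greater or equal 50", value: 50}
{:ok, _pid} = DynamicSupervisor.start_child(AlertSupervisor, {AlertWorker, config})

# "Z" has no worker, so only the item for "A" is delivered.
AlertRouter.dispatch([%{code: "A", value: 15}, %{code: "Z", value: 1}])
```

Whether an unknown code should be skipped, logged, or used to start a new worker is a design choice this sketch leaves open.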


A pattern I usually use when I need multiple instances of a process and I need them to be addressable individually is to use DynamicSupervisor along with global name registration.

Then create a small wrapper to interact with a given process. The wrapper is responsible for either starting the process in case it’s not running, or just doing the call/cast/whatever when it’s already running.

It might look something like this:

# DynamicSupervisor

defmodule ProcessExample.Supervisor do
  use DynamicSupervisor

  require Logger
  
  def start_link() do
    Logger.info("::: ::: Starting #{__MODULE__} ::: :::")
    DynamicSupervisor.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @spec start_child(String.t) :: {:ok, pid()} | {:error, any()}
  def start_child(id) do
    DynamicSupervisor.start_child(__MODULE__,
      %{
        id: {ProcessExample, id},
        start: {ProcessExample, :start_link, [id]},
        shutdown: 25_000,
        restart: :temporary,
        type: :worker
      }
    )

  end

  def init(_), do: DynamicSupervisor.init(strategy: :one_for_one)
end

defmodule ProcessExample do
  use GenServer

  @type code_id :: String.t
  @type process_name :: {:code_process, code_id}

  @inactivity_timeout 1000 * 60 * 15 #15min
   
  @spec process_name(String.t) :: process_name
  def process_name(id), do: {:code_process, id}

  @spec process_global_id(String.t) :: {:global, process_name}
  def process_global_id(id), do: {:global, process_name(id)}

  @spec whereis(String.t) :: :undefined | pid
  def whereis(id), do: :global.whereis_name(process_name(id))

  @spec interact(String.t, term) :: {:ok, Output.t} | {:error, Output.t}
  def interact(id, command \\ :start) do
    case whereis(id) do
      :undefined ->
        case __MODULE__.Supervisor.start_child(id) do
          {:ok, pid} -> GenServer.call(pid, command)
          error -> error
        end
      pid -> GenServer.call(pid, command)
    end
  end

  @spec start_link(String.t) :: GenServer.on_start()
  def start_link(id) do
    GenServer.start_link(__MODULE__, id, name: process_global_id(id))
  end

  def init(id), do: {:ok, id, @inactivity_timeout}

  def handle_call(:start, _from, state), do: {:reply, {:ok, state}, state, @inactivity_timeout}
  def handle_call(:print, _from, state), do: {:reply, {:ok, state}, state, @inactivity_timeout}

  def handle_info(:timeout, state), do: {:stop, :normal, state}
  # other handles

end

Then add the DynamicSupervisor process to your app supervisor:

defmodule YourApp.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      # whatever
      Supervisor.child_spec(%{id: ProcessExample.Supervisor, start: {ProcessExample.Supervisor, :start_link, []}}, type: :supervisor)
    ]

    opts = [strategy: :one_for_one, name: YourApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

(or start it manually with ProcessExample.Supervisor.start_link() just to try it out in the shell)

And now the way you can use it is basically by starting them:

["A", "B", "C"]
|> Enum.each(fn(code) -> ProcessExample.Supervisor.start_child(code) end)


[%{code: "A"}, %{code: "A"}, %{code: "B"}]
|> Enum.each(fn(%{code: code}) ->
  case ProcessExample.interact(code, :print) do
    {:ok, process_code} -> IO.inspect(process_code, label: "Print code response")
    error -> raise "Unexpected #{inspect error}, need to handle this properly and not with raising exceptions"
  end
end)

This is very naive in its usage of “each”: on startup you probably want to know if any of them failed to start, and the same when calling, etc. It’s just to give you a gist of the structure.
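
One way to find out which workers failed to start is to collect the results instead of discarding them. This is only a sketch: StartReport and start_all/2 are hypothetical names, and the start function is passed in so that the same helper would work with ProcessExample.Supervisor.start_child/1 or any other function returning {:ok, pid} or {:error, reason}.

```elixir
defmodule StartReport do
  # Calls start_fun for every code and separates successes from failures.
  # start_fun is any 1-arity function returning {:ok, pid} | {:error, reason}.
  def start_all(codes, start_fun) do
    results = Enum.map(codes, fn code -> {code, start_fun.(code)} end)

    {ok, failed} =
      Enum.split_with(results, fn {_code, result} -> match?({:ok, _pid}, result) end)

    %{started: Enum.map(ok, fn {code, _result} -> code end), failed: failed}
  end
end
```

With the modules from this post it could be called as StartReport.start_all(["A", "B", "C"], &ProcessExample.Supervisor.start_child/1). Note that a worker that is already running would come back as {:error, {:already_started, pid}} and end up in the failed list, which may or may not be what you want.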

There’s a timeout of 15 minutes for each process, so if it doesn’t receive any requests in 15 minutes it shuts itself down (so you don’t have lingering processes in case a code is no longer used but was previously started). Then whenever you need it, you just use ProcessExample.interact(code) to “talk” to it (in this example I made it only do calls…). Even if it shut down in the meantime, it would be started again upon interact.


Where are you with this exactly? Have you figured out how to subscribe to your websocket server? There is WebSockex to help with that.

Basically have a subscriber process which subscribes to your feed, and then routes appropriate messages to each process.

@amnu3387’s pattern is nice. Modify it to handle values and then you can do something like this in your subscriber process.

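# Assumes the frame payload has already been decoded into a map, and that
# ProcessExample.interact has been modified to accept the value as well.
def handle_frame({_type, %{code: code, value: value}}, state) do
  case ProcessExample.interact(code, value, :print) do
    {:ok, process_code} -> IO.inspect(process_code, label: "Print code response")
    error -> raise "Unexpected #{inspect error}, need to handle this properly and not with raising exceptions"
  end

  # WebSockex callbacks hand the (possibly updated) state back.
  {:ok, state}
end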

I like to argue against global registration for registering workers, especially if they’re subject to being spun up and down. Using a Registry can keep the mappings local to a DynamicSupervisor, and doesn’t pollute the global registry and won’t need to be replicated across nodes. Just my two cents though.

Actually I haven't started the socket part yet. I'm trying to build the architecture first…

But thank you for the tip, I'll take a closer look at WebSockex.

Sure, there are drawbacks to all options. I’ve found that argument in a few places, but as far as I understood it was only a problem with more than 40 or 50 connected nodes? Other than that, I also believe the intent is clearer when you use global name registration, as you’re stating clearly that there can only be one such process in any configuration, single or multi node. If the uniqueness is per node then Registry is the way to go, of course.
It all depends I guess.

Hello mnussbaumer, first of all, thanks for your response;

When using

should I be able to call something like YourApp.Supervisor.start_child("A") in the terminal?

Usually when you create a new project with a supervision tree (mix new your_app --sup) it will include an application file under /lib/your_app/application.ex with most of what I wrote there already filled in as the boilerplate. And when you start the application (if using phoenix with mix phx.server or iex -S mix phx.server) it will start the children processes you specify there.

If you want to start it manually then, after having those modules defined and compiled, available in your iex session, you would just call ProcessExample.Supervisor.start_link() that starts the dynamic supervisor we defined (and links it to the shell process in this case).

Okay, I understood it.

I’m not using phoenix, so I’m running through iex -S mix.

One thing I didn't get in your code is the interact function. Why does it exist if I can do Supervisor.start_child("A")?

The other thing is about the state… As far as I understand it, the worker’s state is a string, but it will need the other info (not only the code) to do the validation mentioned… Do you have any idea how I should make this change?

Hey, that was just an example of how to structure worker processes that can be addressed by an identifier.

The state of the process is whatever you need it to be. Again I’m just taking shots, it’s just examples, but reading what you wrote, given this:

Let’s say you start the process with that state, now when you receive a list, for code A, you can do:

ProcessExample.interact("A", {:verify, 20})

And define handles for all your cases. (there’s probably some smarter way of using the comparison “string” to derive the check that has to be done between both values, but… the base idea would be this)
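
One possible shape for that "smarter way" is a small helper with one function clause per comparison string, so the GenServer needs a single handle_call instead of one per operator. This is a sketch, not part of the code above: AlertCheck, triggered?/3 and verify/2 are hypothetical names, only the three operators from the question are covered, and the {:ok, msg} / {:error, msg} return values mirror the handler shown in this post.

```elixir
defmodule AlertCheck do
  # One clause per comparison string used in the alert configs.
  # An unknown operator raises FunctionClauseError.
  def triggered?(">=", incoming, threshold), do: incoming >= threshold
  def triggered?("<=", incoming, threshold), do: incoming <= threshold
  def triggered?("=", incoming, threshold), do: incoming == threshold

  # Takes the worker state (the alert config) and the incoming value.
  def verify(%{comparison: op, value: threshold, message: msg}, incoming) do
    if triggered?(op, incoming, threshold), do: {:ok, msg}, else: {:error, msg}
  end
end
```

Inside the worker, the handler body could then reduce to something like {:reply, AlertCheck.verify(state, value), state, @inactivity_timeout}.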

def handle_call({:verify, value}, _from, %{value: state_value, comparison: ">=", message: msg} = state) do
  case value >= state_value do
    true -> {:reply, {:ok, msg}, state, @inactivity_timeout}
    _ -> {:reply, {:error, msg}, state, @inactivity_timeout}
  end
end

And following this logic, then you probably don’t want the processes to exit on timeout, or you want to store the initial state definition somewhere that is accessible when the process starts in order to load its state (as it would be lost after the timeout scheme I wrote about before).
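
As a sketch of the second option: if the full alert config is passed as the start argument, the supervisor keeps it in the child spec and hands the same map to start_link again on every restart, so the worker comes back with its initial state. RestartableAlert is a hypothetical module, not the ProcessExample from earlier, and it uses restart: :transient on the assumption that crashes should be restarted while normal exits should not (with restart: :temporary the supervisor never restarts the child).

```elixir
defmodule RestartableAlert do
  # :transient restarts the worker after a crash but not after a normal
  # exit, so a deliberate {:stop, :normal, state} still ends it for good.
  use GenServer, restart: :transient

  def start_link(config), do: GenServer.start_link(__MODULE__, config)

  def record(pid, value), do: GenServer.call(pid, {:record, value})
  def state(pid), do: GenServer.call(pid, :state)

  @impl true
  def init(config), do: {:ok, Map.put(config, :last_value, nil)}

  @impl true
  def handle_call({:record, value}, _from, state) do
    {:reply, :ok, %{state | last_value: value}}
  end

  def handle_call(:state, _from, state), do: {:reply, state, state}
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
config = %{code: "A", comparison: ">=", message: "A is greater or equal 50", value: 50}

# The config is stored in the child spec, so every restart calls
# start_link(config) again with the same map.
{:ok, pid} = DynamicSupervisor.start_child(sup, {RestartableAlert, config})
:ok = RestartableAlert.record(pid, 15)
```

Anything the worker accumulated at runtime (last_value here) is lost on restart; only what was in the start argument survives. The restarted worker also gets a new pid, which is one more reason to address workers by name rather than by pid.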