"Realtime" GenStage design?

Hi,

I’m new to Elixir - have read Elixir in Action and the Designing Systems with OTP books.
Looking to use the BEAM and all its goodness for a project which isn’t a server with a request/response format (which is what the books talk most about, understandably).

I have some sensors (ARTag detector, rain sensor, weather sensor, pocketlab, w/e it doesn’t matter) that generate data, and I want to build an “environment” that allows those sensor inputs to be combined and sent to an output. Each detector spits out information per frame, at around 15FPS or so.
I also have some outputs - for now, a screen and a speaker (although I also want to add some kind of persistence like a database or filesystem).

One of these detectors is linked as a “program detector” whose job it is to identify which program ids should be running in the space. So every frame, one UDP port gets packets with info about which programs are “active”. Maybe it looks like: [[id, x-position, y-position], [id, x-position, y-position]...] if the program sensor is an ARTag detector.

Now I want to dynamically start the programs I see and run them for as long as I see them (in other words, if I don’t see the program for 3-5 frames I will quit it; if it comes back I will restart it).

Each program will take in the sensor input and spit out some data to send to an output process.

This feels like a dynamic version of the producer/consumer model that GenStage allows for, with a couple differences:

  1. I want a “real time” GenStage - every stage holds onto the most recent value, and when its consumer requests the next data point it gets the most recent data available (so if there’s one stage running at 2fps for some reason it will not pile up events, just get whatever is latest). I’m not interested in piping all the data through the system. The outputs should have as recent data as possible, and all messages that could not be processed are dropped. It’s like UDP in the UDP/TCP relationship, I guess.
  2. There is a many:many relationship between inputs:programs. I’m not clear how this works with GenStage. Messages have to be processed 1 at a time, and say the “program” (basically a function) needs input from more than 1 sensor in order to form its output. In the reactive world this would be accomplished by combine_latest or similar functionality.
  3. There is a many:many relationship between programs:outputs - each output will receive commands from many programs, so how do they maintain the overall state? In this case, it may be possible to structure the output as a series of patches that can be applied.
  4. How does the output clean up state if the program is removed? Can I send some special message from the supervisor that “cleans up” anything drawn to the screen, for example?

If Elixir/Erlang are the wrong tool for this kind of work then please let me know that too, and what might be the right tool. Prior to reading about Erlang, my main exposure to “concurrency” was as ReactiveX streams in frontend, which are totally different. I’m probably thinking about this backwards or something.

thank you! happy to provide any other information, samples of what inputs/programs/outputs could look like if it helps.

Hi,

I am not sure I would use GenStage for this.

I would use a Registry in a pubsub fashion: “program” processes register on the registry with each sensor name they want to listen to.

UDP messages from regular sensors are transformed into sensor data and dispatched to the registry, i.e. their data is sent to all programs listening to that particular sensor.
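A minimal sketch of that Registry-as-pubsub idea, using only the standard library. The module name `SensorBus`, the registry name, and the `{:sensor, name, reading}` message shape are assumptions made for illustration, not anything from the thread.

```elixir
defmodule SensorBus do
  # Hypothetical wrapper around a duplicate-key Registry used as a local pubsub.
  @registry SensorBus.Registry

  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Called by a program process: registers the caller under the sensor name.
  def subscribe(sensor_name) do
    {:ok, _owner} = Registry.register(@registry, sensor_name, nil)
    :ok
  end

  # Called by the UDP listener: sends the reading to every subscribed process.
  def publish(sensor_name, reading) do
    Registry.dispatch(@registry, sensor_name, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:sensor, sensor_name, reading})
    end)
  end
end
```

A program process would call `SensorBus.subscribe("rain")` once per sensor it cares about and then handle `{:sensor, "rain", reading}` messages as they arrive.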

UDP messages from the “program detector” are handled separately by a process (maybe the listener process itself) that manages a state and decides if it should start/stop a program process. Note that you can just start a program blindly, and the programs supervisor may reply “already started”, which is fine. The only important logic should be the program stopping decision.

Program processes that are stopped are automatically unregistered from the registry.
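The “start blindly” idea could look roughly like the following. This is a sketch under assumptions: `ProgramWorker`, `ProgramManager`, `ProgramNames` and `ProgramSup` are hypothetical names, and each program is registered under its id through a unique-key Registry so that a second start attempt is reported as already started rather than creating a duplicate.

```elixir
defmodule ProgramWorker do
  use GenServer

  # Registered via the Registry so each program id maps to at most one process.
  def start_link(program_id) do
    GenServer.start_link(__MODULE__, program_id, name: via(program_id))
  end

  def via(program_id), do: {:via, Registry, {ProgramNames, program_id}}

  @impl GenServer
  def init(program_id), do: {:ok, program_id}
end

defmodule ProgramManager do
  # Starts the (hypothetical) ProgramNames registry and ProgramSup supervisor.
  def start_link do
    children = [
      {Registry, keys: :unique, name: ProgramNames},
      {DynamicSupervisor, strategy: :one_for_one, name: ProgramSup}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  # Starting blindly is safe: a duplicate start is reported, not fatal.
  def ensure_started(program_id) do
    case DynamicSupervisor.start_child(ProgramSup, {ProgramWorker, program_id}) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
    end
  end

  # Stopping is the only decision that needs real logic on the caller's side.
  def stop(program_id) do
    case Registry.lookup(ProgramNames, program_id) do
      [{pid, _value}] -> DynamicSupervisor.terminate_child(ProgramSup, pid)
      [] -> {:error, :not_found}
    end
  end
end
```

The detector handler can then call `ensure_started/1` for every id in every frame and only track which ids have gone missing long enough to call `stop/1`.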

You can do the same to publish program data to outputs listening to program names or identifiers.

EDIT
But all of this could also be simply managed by simple data structures:

  • a map %{program_name => active?}
  • a map %{program_name => [sensor_name]}
  • a map %{output_name => [program_name]}
  • a map %{sensor_name => last_state}
  • etc.

So when you receive sensor data you can update the whole state. And if you want to use GenStage, then all of this is in the same producer_consumer, with maybe the UDP acceptor(s) as producer(s) and output(s) as consumer(s).

Your producer_consumer can then output events tagged with the program name and use a GenStage.BroadcastDispatcher, so the output consumers can use the selector option of this dispatcher to only get messages for the programs they are interested in.

Higher-level tools like GenStage can be really helpful if you’re building a system that matches the expectations encoded in the tool.

In a case like this, I’d recommend at least prototyping with GenServer - you’ll have to write more functions, but you’ll also be able to see what’s going on.

Some things to think about:

  • where does history live in the system? Is there ever a need for “get me the value from that sensor three frames ago”?

  • how expensive are the calculations being done on the sensor data? Is doing them at request-time a reasonable option?

  • how much concurrency is needed? 15FPS isn’t that often.


The simplest possible thing you could build is a single GenServer that handles maintaining sensor state, with messages like:

  • set_value - called from the network listener to store a sensor’s data
  • get_values - called from UI code. Fetches a map (or something) of sensor ID → current value

Pros:

  • consistent view of the whole system state - get_values is an atomic operation since no set_value can be processed while it is running
  • consumers are plain functions of the state; easy to test etc
  • no process bookkeeping

Cons:

  • single-threaded
  • inefficient if there are many sources but most consumers only read a few
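As a rough illustration of the single-GenServer approach, here is a sketch with the two messages named above. The module name `SensorState` and the choice of a cast for `set_value` are assumptions; a cast means the network listener never waits, and a newer reading simply overwrites the older one.

```elixir
defmodule SensorState do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, opts)
  end

  # Called from the network listener; fire-and-forget.
  def set_value(server, sensor_id, value) do
    GenServer.cast(server, {:set_value, sensor_id, value})
  end

  # Called from UI code; returns a map of sensor id => current value.
  def get_values(server) do
    GenServer.call(server, :get_values)
  end

  @impl GenServer
  def init(values), do: {:ok, values}

  @impl GenServer
  def handle_cast({:set_value, sensor_id, value}, values) do
    # Overwrite: only the most recent reading per sensor is kept.
    {:noreply, Map.put(values, sensor_id, value)}
  end

  @impl GenServer
  def handle_call(:get_values, _from, values) do
    {:reply, values, values}
  end
end
```

Because the server handles one message at a time, the map returned by `get_values/1` is always a consistent snapshot.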

If the simple solution doesn’t suffice, the next step depends on the problem.

If there are many sources but consumers mostly read only nonoverlapping subsets, you could start multiple instances of the “simple approach” GenServer for the different “regions” of sensors. This would weaken the “consistent state” guarantee to only apply to a single region, though.

If concurrency is critical, you could fully split the GenServer down to one process per sensor. This provides the weakest consistency guarantees, and means you’ll need to use some kind of message dispatching (Registry is a good read). Since it sounds like your application involves dynamically-attached sensors, you’ll also need to do some supervisor juggling to get them started/stopped.


I suspect the biggest challenge you’ll face is with this requirement:

the outputs should have as recent data as possible, and all messages that could not be processed are dropped

That’s strongly opposite of the outlook of the BEAM, and will require manual handling. For instance, if you had a system where a sensor’s GenServer broadcasted messages to each consumer they would queue up in a slow consumer’s mailbox.

That’s strongly opposite of the outlook of the BEAM

Nononono. It is very possible to do this in the BEAM. In fact I would say it’s easier to do this in the BEAM than any other platform.

@vshesh what you are trying to do is easy in the BEAM. The advice to use GenServers/Registry is good advice. However, note that using a Registry in the PubSub mode is nonobvious, and the documentation is a bit squirrely IMO. I suggest importing Phoenix.PubSub as a dependency and using that. (you don’t have to use phoenix to use phoenix’s pubsub). It’s easier, and it drop-in works across nodes in a cluster if you start doing that.

The Erlang gen_udp (and gen_tcp and ssl) libraries are really fantastic. If you want to see a really canonical way of setting up UDP listening servers, I recommend checking out the code I wrote here; I tried to be VERY clean about all of it (the </> link in the upper right of the documentation takes you to the code): ExDhcp — ex_dhcp v0.1.5

Note that the DHCP server uses the technique of converting UDP packets directly to BEAM messages (it’s an active: true connection). You can also use active: false, in which case you poll the socket yourself instead. An example of a simple server that does this is: trivial v0.1.0 — Documentation, but note that it’s a TCP connection, not a UDP connection, so there is an extra indirection in the form of the acceptor.
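For reference, an active-mode UDP listener can be quite small. The sketch below is not taken from ExDhcp; `UdpListener`, the `notify` pid and the `{:packet, binary}` message are hypothetical names chosen for the example.

```elixir
defmodule UdpListener do
  use GenServer

  # `notify` is a hypothetical pid that receives each raw packet.
  def start_link(port, notify) do
    GenServer.start_link(__MODULE__, {port, notify})
  end

  # Reports the bound port (useful when opening on port 0).
  def port(server), do: GenServer.call(server, :port)

  @impl GenServer
  def init({port, notify}) do
    # :binary delivers payloads as binaries; active: true delivers each
    # packet to this process as a {:udp, socket, ip, port, data} message.
    {:ok, socket} = :gen_udp.open(port, [:binary, active: true])
    {:ok, %{socket: socket, notify: notify}}
  end

  @impl GenServer
  def handle_call(:port, _from, state) do
    {:ok, port} = :inet.port(state.socket)
    {:reply, port, state}
  end

  @impl GenServer
  def handle_info({:udp, socket, _ip, _port, packet}, %{socket: socket} = state) do
    send(state.notify, {:packet, packet})
    {:noreply, state}
  end
end
```

With `active: false` the process would instead call `:gen_udp.recv/2` (or `recv/3` with a timeout) in a loop to read packets.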

And if you’re new to the BEAM one last thing, you don’t have to worry about resource cleanup. Don’t close your sockets, etc… the gen_udp service will close the socket for you when your process dies (just like an OS recovers those resources). Also goes for file descriptors, etc.

To be clear, I agree 100% about the UDP handling - my point was specifically about the dropping messages part of the requirement. A whole lot of BEAM engineering effort has been put into making sure messages don’t go missing from mailboxes :)

Hi everyone, thank you for all your help.

Answers to questions

  1. history? - not at the system level. A program can hold onto previous frames if it wants to in its process
  2. number crunching? unclear. so far the calculations are relatively cheap. the heavy number crunching is happening prior to the data being sent to the environment I’m writing in elixir; there shouldn’t be too much to do within the environment, hopefully.
  3. concurrency arises from not every program needing to be run every frame based on which inputs it is requiring. Reducing lag is another issue within the system - I want the outputs to be as close to real time to the inputs as possible. We’ll see how well the BEAM does for that; I know this isn’t an explicit design goal of the BEAM.

I started with the advice to prototype something simple. Right now I’m trying to get a basic version written where inputs are stored in one GenServer and Outputs are dispatched to one GenServer. I’m getting stuck on something within each component. Here are my sketches, open to any refinements:

Input Side:

“worker” process that listens to UDP port and sends data to the big GenServer for input state:

# Still missing: a supervisor that will manage/restart these listeners.
defmodule InputListener do
  use GenServer
  import OSC

  def start_link(state_pid, port, udp_opts \\ []) do
    GenServer.start_link(__MODULE__, {state_pid, port, udp_opts})
  end

  @impl GenServer
  def init({state_pid, port, udp_opts}) do
    # Sockets default to active mode, so packets arrive as {:udp, ...} messages.
    {:ok, socket} = :gen_udp.open(port, udp_opts)
    {:ok, {socket, state_pid}}
  end

  @impl GenServer
  def handle_info({:udp, socket, _ip, _port, packet}, {socket, state_pid} = state) do
    # Forward the decoded packet to the process that holds the input state.
    send(state_pid, decode(packet))
    {:noreply, state}
  end
end

GenServer that holds all inputs (one of the ideas presented in the messages above, I will also try the Registry implementation later):

defmodule InputState do
  use GenServer

  defstruct inputs: %{}, states: %{}

  def start_link do
    # Register under the module name so that get/0 and add/2 can reach the server.
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl GenServer
  def init(_) do
    {:ok, %__MODULE__{}}
  end

  def get do
    GenServer.call(__MODULE__, {:get})
  end

  def add(name, port) do
    GenServer.call(__MODULE__, {:add, name, port})
  end

  @impl GenServer
  def handle_call({:update, name, data}, _, %__MODULE__{inputs: inputs, states: states} = state) do
    {:reply, :ok, %__MODULE__{ state |
      states: Map.put(states, name, data)
    }}
  end

  @impl GenServer
  def handle_call({:add, name, port}, _, %__MODULE__{inputs: inputs} = state) do
    {:reply, :ok, %__MODULE__{ state |
      inputs: Map.put(inputs, name,  ???????? (which supervisor do i use to start the worker process))
    }}
  end

  def handle_call({:get}, _, data) do
    {:reply, data.states, data}
  end
end

Main program

defmodule Program do
  use GenServer
  @moduledoc """
  Behavior for Programs running in the environment
  Must hook up to inputs and outputs somewhere, not sure where
  """
  @input_state_pid nil

  def start_link(input_state_pid) do
    @input_state_pid = input_state_pid
    GenServer.start_link(__MODULE__, nil)
  end

  @doc """
  Initialize the state of this program, if there is one.
  """
  def init(_term) do
    GenServer.cast(@input_state_pid, :get)
    {:ok, __MODULE__.starting_state()}
  end

  @callback starting_state :: any

  @doc """
  Takes the new input data and computes the new state.
  """
  @callback handle_frame(term, term) :: any
  @callback emit(state) :: %{atom | pid => t}

  @impl GenServer
  def handle_cast({:newdata, data}, state) do
    GenServer.cast(@input_state_pid, :get)
    newstate = __MODULE__.handle_frame(state, data)
    dispatch_outputs(emit(newstate))
    {:noreply, newstate}
  end
  defp dispatch_outputs(messages) do
    Enum.each(messages, fn {k, v} -> GenServer.cast(:output_registry, {k, v}) end)
  end

  @impl GenServer
  def handle_call(:get, _, state) do
    {:reply, state, state}
  end
end

In general the actor model is supposed to be a ‘push’ model where each component sends a message when it wants something. In order to avoid messages getting piled up, I designed this system to be a pull system where each component pulls data from the previous one, and then performs the update to an internal state. Sort of like a flip flop, I guess. I don’t like this though, because I’m generating lots of idle work and :get messages when the state hasn’t changed. Instead of sending too many messages with new state (which would be a lot more memory), I’m sending too many messages requesting state when there’s no action to take.

The output side is a registry set up as pub sub with processes/GenServers listening behind and doing arbitrary things - it’s the opposite of the input side.

I have a lot of casts here as well, which I’m guessing is not ideal. Open to any advice on how to improve my design. I will try to use a registry for inputs as well because many programs will not want all inputs and it makes no sense to generate so many messages.

It feels like you are over engineering this. Seriously, use the Phoenix pubsub and set up subscriptions using that. Don’t write so many GenServers.

Also, the BEAM is meant to be “soft realtime”, so it will probably do what you want.

Finally, don’t take the actor model paradigm so seriously. It’s a terrible framework for organizing your thinking about how your code works. There is a warning about this in the docs:

Actors can be a confusing unit of concurrency, and it’s better to think of BEAM actors as failure domain units. It can take a while to internalize that (and it’s not anything carl hewitt talks about) so try to use as few of them as possible early on and in the beginning try as much as possible to let the more experienced library writers provide as many of the GenServers as possible for you to drop your code into.

Quick suggestion: if you publish messages at 15 FPS from one process but consume at 2 FPS in another process, you can just skip the extra messages. This can be done with a recursive function calling receive with an after 0 statement. If the messages aren’t too big this may be good enough.

What does Phoenix PubSub give that Registry doesn’t in this context?

which supervisor do i use to start the worker process

@vshesh You start all your processes under the same supervisor and you use name registration so each name is known beforehand.

But honestly I would start with a single process. Code everything in the UDP acceptor. I bet it will handle 15FPS easily. That will let you write all the functions you need for the logic without thinking too much about processes, to keep things functional.

If it doesn’t handle 15fps, then your model would work: a UDP acceptor/listener process that just registers all inputs in a map (overwriting data from the same input so you only keep fresh data), and a process that will pull data from the acceptor, run the programs and send output.

If still too slow, then you can separate output handling in its own process, or have a process for each program and/or dispatch the inputs with a registry.

But concurrency should be added when needed, so I would just write a purely functional algorithm first, based on a function that keeps a state, accepts a UDP message, and sends output.
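One possible shape for that purely functional core is sketched below. Everything in it is an assumption made for illustration: the `Environment` struct, the idea that a program is an `{input_names, fun}` pair, and the rule that a program only runs once all of its inputs have a value (similar in spirit to combine_latest).

```elixir
defmodule Environment do
  # State: latest reading per sensor plus a (hypothetical) program table.
  # Each program is {input_names, fun}; fun maps a map of readings to an output.
  defstruct sensors: %{}, programs: %{}

  def add_program(%__MODULE__{} = env, name, inputs, fun) when is_function(fun, 1) do
    %{env | programs: Map.put(env.programs, name, {inputs, fun})}
  end

  # Pure step: store the reading, then run every program that listens to this
  # sensor and already has a value for all of its inputs.
  # Returns {new_env, [{program_name, output}]}.
  def step(%__MODULE__{} = env, sensor, reading) do
    sensors = Map.put(env.sensors, sensor, reading)

    outputs =
      for {name, {inputs, fun}} <- env.programs,
          sensor in inputs,
          Enum.all?(inputs, &Map.has_key?(sensors, &1)) do
        {name, fun.(Map.take(sensors, inputs))}
      end

    {%{env | sensors: sensors}, outputs}
  end
end
```

The process that owns the UDP socket would hold an `%Environment{}` as its state, call `step/3` for each decoded message, and send the returned outputs wherever they need to go. Since `step/3` has no side effects it can be unit tested without starting any processes.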

What does Phoenix PubSub give that Registry doesn’t in this context?

It’s brain-dead simple. 4 lines of code (“follow the directions”) to get setup, subscription, and the publishing.

It’s not easy to set up registry pubsub correctly.

It’s not easy to set up registry pubsub correctly

I strongly disagree. But I can see simplicity as a feature of phoenix pubsub, alright.

I would like to understand more about what this means. I was trying to write out what the pubsub based system would look like and realized that skipping messages would be necessary in order to make that work.
How does after: 0 help me skip messages - wouldn’t I still be processing every message in order? If I have a handle_call for the information I’ve subscribed to, and it does a slow calculation, I will have say 13 messages in the mailbox (for the 15 vs 2fps example) . I want to only take the latest message and delete the rest if they’re on the same “topic” (here input name), else I will end up with greater and greater lag over time.

Somewhere I need to “reduce” the messages - in reactive streams land I would transform a Signal into a Behavior and sample it. I think “replacing” messages with new ones is what I’m trying to do - I couldn’t think of any other way to do it.

Is the suggestion to add a server like this:

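defmodule MessageFlatten do
  alias Phoenix.PubSub

  def start(topic) do
    PubSub.subscribe(:inputs, topic)
    loop(nil)
  end

  # The latest message is carried as the loop argument; module attributes
  # are compile-time values and cannot hold runtime state.
  def loop(current) do
    receive do
      {:get, sender} ->
        send(sender, current)
        loop(current)

      msg ->
        loop(msg)
    end
  end
end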

I think this is the core of what I wrote above in the state accumulator and if I’m going to do that then why pubsub?

You can skip ahead by using selective receive. Code untested, but you will get the idea:

def handle_info({:reading, reading}, state) do
    reading
    |> get_latest_reading()
    |> handle_reading()

    {:noreply, state}
end

defp get_latest_reading(old_reading) do
    receive do
        {:reading, new_reading} ->
            get_latest_reading(new_reading)
    after
        0 ->
            old_reading
    end
end
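To make the mailbox-draining idea concrete for the per-topic case, here is a small self-contained variant. The `{:reading, topic, value}` message shape and the `LatestOnly` module are assumptions for this sketch. Every queued message is still taken out of the mailbox, but each one costs only a map update; the slow calculation then runs once per topic on the newest value.

```elixir
defmodule LatestOnly do
  # Drains every {:reading, topic, value} message currently in the mailbox
  # and keeps only the newest value per topic. `after 0` returns as soon as
  # no matching message is left, so this never blocks. Messages of any other
  # shape stay in the mailbox untouched.
  def drain(latest \\ %{}) do
    receive do
      {:reading, topic, value} -> drain(Map.put(latest, topic, value))
    after
      0 -> latest
    end
  end
end
```

A slow consumer would call `LatestOnly.drain/0` at the start of each iteration and process only the returned map, so lag does not accumulate.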

Again, thank you for all the support.

Alright, here’s me trying it out with the PubSub. I agree, this makes handling the registration easier and saves one source of complication within the system. And it is true that “output” could be on a different device. It would be nice to be able to send the message directly without worrying about how that is set up.

First, the application layout. Outputs are separate programs that will be run and subscribe to the PubSub here themselves, no dependency on this application. Programs will broadcast to the appropriate output the message they want to send. So what’s left in the application is:

defmodule Tui.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    Supervisor.start_link([
      Supervisor.child_spec({Phoenix.PubSub, name: :inputs}, id: :pubsub_input),
      Supervisor.child_spec({Phoenix.PubSub, name: :outputs}, id: :pubsub_output),
      Tui.InputSupervisor,
      {DynamicSupervisor, strategy: :one_for_one, name: Tui.ProgramSupervisor},
      Plug.Adapters.Cowboy.child_spec(scheme: :http, plug: Tui.Herald, options: [port: 3000])
    ], strategy: :one_for_one)
  end
end

A “Herald” which receives information about inputs/outputs being registered to the system (so I can leave the environment running and not turn it off/on just to add a sensor)

defmodule Tui.Herald do
  use Plug.Router
  use Plug.Debugger
  require Logger
  plug(Plug.Logger, log: :debug)
  plug(:match)
  plug(:dispatch)

  get "/add-input" do
    %{"port" => port, "name" => name} = fetch_query_params(conn).params
    Tui.InputSupervisor.start_child(name, String.to_integer(port))
    send_resp(conn, 201, Jason.encode!(%{status: :initialized, name: name, port: String.to_integer(port)}))
  end

  match _ do
    send_resp(conn, 404, "")
  end
end

InputSupervisor and Listener (I could not figure out how to have only one process with UDP sockets open for all ports - is it not better to have one process per UDP socket? or is that what is already happening with :gen_udp? The other option is to manage a map of port => name and then handle messages based on which port they come in on - that seemed more complicated.) Is there a way to directly tell the :gen_udp to talk to the pubsub rather than having this intermediary process?

defmodule Tui.InputSupervisor do
  use DynamicSupervisor, restart: :transient

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_child(name, port, udp_opts \\ nil)
  when is_binary(name) and is_integer(port) do
    DynamicSupervisor.start_child(__MODULE__, {Tui.InputListener, {name, port, udp_opts}})
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

defmodule Tui.InputListener do
  use GenServer
  import OSC
  alias Phoenix.PubSub

  def start_link({name, port, udp_opts})
  when is_binary(name) and is_integer(port) do
    GenServer.start_link(__MODULE__, {name, port, udp_opts})
  end

  @impl GenServer
  @spec init({String.t, integer, keyword}) :: {:ok, {String.t}}
  def init({name, port, nil})
  when is_binary(name) and is_integer(port) do
    {:ok, _socket} = :gen_udp.open(port)
    {:ok, {name}}
  end

  def init({name, port, udp_opts})
  when is_binary(name) and is_integer(port) do
    {:ok, _socket} = :gen_udp.open(port, udp_opts)
    {:ok, {name}}
  end

  @impl GenServer
  def handle_info({:udp, _socket, _ip, _port, packet}, {name} = state) do
    :ok = PubSub.broadcast(:inputs, name, List.first(decode!(packet).contents))
    {:noreply, state}
  end
end

I have tested the Input side of things and as far as I can tell it works. I can add a sensor and then send data to the program and have that received by a process listening to the pubsub.
The more complicated part of this is the Program part:

Program behavior

defmodule Program do
  use GenServer

  @callback inputs :: [atom]
  @callback init(term) :: {:ok, {map, any}}
  @callback handle_data(any, map, map) :: {:ok, term}
  @callback emit(term) :: map

  @impl GenServer
  def init(arg) do
    __MODULE__.inputs()
    |> Enum.each(fn x -> Phoenix.PubSub.subscribe(:inputs, x) end)

    __MODULE__.init(arg)
  end

  def handle_call(%OSC.Message{address: address, arguments: arguments}, _, {inputs, state}) do
    data = get_latest_reading(address, arguments)
    newinputs = %{inputs | address => data}
    {toemit, newstate} = __MODULE__.handle_data(state, inputs, newinputs)
    __MODULE__.emit(toemit)
    {:noreply, {newinputs, newstate}}
  end

  def get_latest_reading(address, arguments) do
    receive do
      %OSC.Message{address: ^address, arguments: args} ->
        get_latest_reading(address, args)
    after 0 -> arguments
    end
  end
end

MetaProgram - a program that listens to the :programs inputs and starts/stops programs. It would have to match the programs in the previous frame against the ones in this frame in order to know which program should be stopped.

... TBD ... still figuring this one out.
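One way the frame-to-frame matching could be approached is as a pure function that counts consecutive missed frames per program id. This is only a sketch: `ProgramTracker` is a hypothetical name and the threshold of 3 is taken from the “3-5 frames” mentioned at the top of the thread.

```elixir
defmodule ProgramTracker do
  # Hypothetical threshold: stop a program after this many consecutive misses.
  @max_missed 3

  # `missed` maps program id => consecutive frames without a sighting.
  # Returns {new_missed, ids_to_start, ids_to_stop}.
  def frame(missed, seen_ids) do
    seen = MapSet.new(seen_ids)
    to_start = Enum.reject(seen, &Map.has_key?(missed, &1))

    # Reset the counter for ids seen this frame, increment it for the rest.
    aged =
      Map.new(missed, fn {id, count} ->
        if MapSet.member?(seen, id), do: {id, 0}, else: {id, count + 1}
      end)

    {stopped, kept} = Enum.split_with(aged, fn {_id, count} -> count >= @max_missed end)
    new_missed = Enum.reduce(to_start, Map.new(kept), &Map.put(&2, &1, 0))

    {new_missed, Enum.sort(to_start), Enum.sort(Enum.map(stopped, &elem(&1, 0)))}
  end
end
```

The MetaProgram process would keep `missed` as its state, call `frame/2` with the ids from each detector packet, and then start or terminate children under Tui.ProgramSupervisor for the returned ids.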

Any thoughts - still overengineered or am I on the right track? I’m also struggling to understand how to use the behavior’s functions in the Program class - what do I do in the handle_call function if I want to call handle_data? I know I’m using a subclass style similar to python or java and that’s likely the wrong way to go about it, please help!
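On the behaviour question, one common pattern is to keep the behaviour module as a pure contract and inject the shared GenServer code into each implementing module with a `__using__` macro. The sketch below assumes a simplified contract and a `{:input, name, value}` message shape, and it leaves out the PubSub subscription so that it runs without dependencies; `Counter` is a made-up example program. Note that messages delivered by a pubsub broadcast are plain messages, so they arrive in handle_info rather than handle_call.

```elixir
defmodule Program do
  # The behaviour only declares what an implementation must provide.
  @callback inputs() :: [term()]
  @callback starting_state() :: term()
  @callback handle_data(state :: term(), inputs :: map()) ::
              {output :: term(), new_state :: term()}

  # `use Program` injects a GenServer into the implementing module, so inside
  # the quote __MODULE__ refers to that module rather than to Program itself.
  defmacro __using__(_opts) do
    quote do
      @behaviour Program
      use GenServer

      def start_link(output_pid) do
        GenServer.start_link(__MODULE__, output_pid)
      end

      @impl GenServer
      def init(output_pid) do
        # A real version would subscribe to each topic in inputs() here.
        {:ok, %{inputs: %{}, state: starting_state(), output: output_pid}}
      end

      # The {:input, name, value} shape is an assumption of this sketch.
      @impl GenServer
      def handle_info({:input, name, value}, data) do
        inputs = Map.put(data.inputs, name, value)
        {output, state} = handle_data(data.state, inputs)
        send(data.output, {:output, __MODULE__, output})
        {:noreply, %{data | inputs: inputs, state: state}}
      end
    end
  end
end

defmodule Counter do
  use Program

  @impl Program
  def inputs, do: [:rain]

  @impl Program
  def starting_state, do: 0

  # Emits the running count together with the latest inputs.
  @impl Program
  def handle_data(count, inputs), do: {{count + 1, inputs}, count + 1}
end
```

Each concrete program is then an ordinary module that can be started under the DynamicSupervisor, and the three callbacks stay plain functions that are easy to test on their own.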