Process registry keyed to input data instead of a pid?

Hi all,

I’m struggling with how best to model something in OTP:

I have an input that sends a structure like [ [id, data], [id, data] ...]
data is some data that shows the “location” of the tag with id.

Each frame (about 20 fps) I get data from the detector showing multiple ids with their “locations”
Now, I want to maintain a set of programs that are running in response to this input.

For example, say on frame 1, I get [[1, [10]], [2, [20]]] (you can kind of ignore what’s in the data part it’s not so relevant.).
I would want to spawn two processes that are corresponding to these ids - so one process corresponding to id 1, and another process corresponding to id 2.

Then in the next frame I get more data - [[1, [11]], [2, [21]]].
Now I want to match this frame against the previous frame. If the “locations” are close by enough (some threshold) I will say that these are the “same” program. So in this case, I would want the programs to keep running with no change (the frame is a noop).

Let’s say another frame comes like [[1, [12]], [2, [22]], [3, [30]]
Now I would leave 1 and 2 running and start a new program corresponding to id 3.

Then if the next frame looks like [[2, [21]], [3, [29]]
I would stop the program corresponding to id 1, and keep the other two going.

My style of doing this is to try to maintain a map of %{location => pid} and then perform the necessary updates. I’m sort of stuck trying to figure out if I’m supposed to use a registry or if there’s some more efficient way to update the keys of the map given it’s happening 20 fps.

Other constraints:

  • for now you can assume that the number of ids in one frame will not exceed 1000.
  • there can be multiple of the same id, which is the whole reason I need this concept

hope that’s clear enough.

If it’s helpful, here’s how I am “diffing” frames:

  @type container(T) :: list(T) | MapSet.t(T)
  @spec(container(T), container(T), (T, T) -> number, number, (T) -> any) :: %{new: [T], same: [{T,T}], old: [T]} when T :: term
  def diff(new, old, distance, threshold \\ 80, group_id \\ fn x -> x end)
  do
    oldgroup = old |> Enum.group_by(group_id)
    newgroup = new |> Enum.group_by(group_id)
    newgroup
    |> Enum.reduce(%{new: [], same: [], old: []}, fn {id, values}, r ->
    if not Map.has_key?(oldgroup, id) do
      %{r  | new: r.new ++ values}
    else
        # Naive implementation O(N^2) and can have duplicate matches
        Enum.reduce(values, r, fn value, acc ->
          distances = oldgroup[id] |> Enum.map(fn x -> distance.(x, value) end)
          mind = Enum.min(distances)
          mini = Enum.min_by(distances, &(&1))
          if mind < threshold do
            %{acc | same: [{value, Enum.at(oldgroup[id], mini)} | r.same]}
          else
            %{acc | new: [value | r.new]}
          end
        end)

    end end)
    |> (fn r -> Enum.reduce(oldgroup, r, fn {id, values}, r ->
      if not Map.has_key?(newgroup, id)
      do Map.put(r, :old, r.old ++ values)
      else r end end)
    end).()
  end

Maybe it would be easier to use ETS instead spawning separate processes for each of the IDs?

How would I do that? Can I use the data list as a key?

Just check the specs.

Something like that?

def add_with_threshold(tab, id, values, threshold) do
  new_min = Enum.min(values)

  case :ets.lookup(tab, id) do
    [{id, old_min, old_values}] when abs(new_min - old_min) > threshold ->
      :ets.insert(tab, {id, new_min, old_values ++ new_values})
    [] ->
      :ets.insert(tab, {id, new_min, values})
    _ ->
      :ok
  end
end
1 Like

I need something more like:

def handle_frame(newframe, oldframe, table) do 
  d = diff(newframe, oldframe, fn x,y -> ... end)
  d.new |> fn x -> table.put(x, DynamicSupervisor.start_child(...))
  d.old |> fn x -> Process.kill(table.get(x)) table.remove(x) end 
  d.same |> fn {x,old} -> table.put(x, table.get(old))
end

is ets the right structure for this and is it able to handle 20fps? I also don’t know the right OTP structure - how do I programmatically create/destroy the program?