"sample" an agent every second?

I have a situation where I have a pubsub, and a process receiving events from that pubsub.
I want the receiving process to update some state. That’s pretty easy with a genserver or an agent.
Then, I want another process to “sample” the process with state every second and do something.

What is the best way to accomplish this? Agent + Task?

Does that change if I want to modify the schedule at which the sampling is happening over time? (so the sampler can receive an event that changes the frequency of sampling?)

In Rx I would use a behavior and sample it with rx.sample - anything like that here welcome.

And what is the best way to expose this to the rest of my app? Should I make a mini-supervisor tree with both of these processes since they’re linked (if the receiving process goes down, no sense having the sampling process). How do I make it possible for the supervision tree to treat both of these processess together as a “single process” that can be shut down together?

You probably want a more complex scheduler for this. Eg. Oban or Quantum.

If you don’t want something so heavyweight, you can just create a sampling genserver and in the init use :timer.send_interval/2 Erlang -- timer

How do I make it possible for the supervision tree to treat both of these processess together as a “single process” that can be shut down together?

change the strategy from :one_for_one to :one_for_all. Or maybe :rest_for_all.

1 Like

Simply pass the rate frequency as part of the server state. So You can modify it.

There is also Process.send_after.

There is an example in this tetris in Erlang, where the game loop accelarate over time.

http://www1.erlang.org/examples/small_examples/tetris.erl

1 Like

I have a feeling process.send_after has some corner cases where you don’t want to use it for periodic events, since it doesn’t automatically cancel a previous firing cycle. If you’re not careful with your code you can wind up with 2x firings, 3x firings, 4x firings and so forth.

You can store the reference to your timer when you use send_interval, and that will allow you to make adjustments later, using :timer.cancel and storing a new timer ref when you rebuild the interval.

This is what I went with:

  @doc """
  :param tempo: integer, beats per minute (# of times chord plays per minute)
  """
  def init(tempo) when is_integer(tempo) do
    Phoenix.PubSub.subscribe(:inputs, "dots")
    {:ok, {nil, generate_tick(tempo}}
  end
  def handle_cast({:tempo, tempo}, {chord, tref}) do
    :timer.cancel(tref)
    {:noreply, {chord, generate_tick(tempo)}}
  end
  defp generate_tick(tempo) do
    {:ok, ref} = :timer.apply_interval(trunc(60000/tempo), __MODULE__, :play, [self])
    ref
  end

Since you are already using a PubSub, why don’t you just broadcast from the first GenServer every second using a different topic and let whatever interested party subscribe to it.

If you have to do polling, I’d skip GenServer, timer, sending yourself a message, etc, in the poller altogether, and just spawn off a plain process that sleeps periodically.

Not sure how to do that - the intention to do this resulted in timer, genserver, message etc.
I receive a message, then update some internal state. the “poller” or “sampler” is acting on that updated state. I’m just using one module now and I spawn the timer from there directly. Could you show how you would make it simpler?

It is also the case for Process.send_after, it returns a ref. And it is possible to read the elapsed time with Process.read_timer(ref), or cancel timer. I also store the ref in the server state.

Both are doing the same, but not the same way. I don’t use :timer module because it can get overloaded.

From Erlang -- Common Caveats

Creating timers using erlang:send_after/3 and erlang:start_timer/3, is much more efficient than using the timers provided by the timer module in STDLIB.

1 Like

It’s more common to use a map than a tuple for state…

Then it is easier to make pattern match…

%{chord: chord, tref: tref}
#
# You can pattern match nil tref with...
%{tref: nil}
# instead of 
{_, nil}
# and potentially, it's harder to remember where tref is.
{_, _, _, nil, _, _}

It’s also possible to do it with tuples, but it will be easier to pass from map to struct, than from tuple to struct. Unless You need to interface some Erlang record type, the Elixir way is to use a map.

The idea is to write a functional core, separate from any server logic.

BTW You can also replace 60000 with 60_000, it might be more readable.

Just send yourself a message using send_after then from the handler broadcast your state:

 def init(_) do
    # compute init state
    Process.send_after(self(), :tick, 1000)
    {:ok, state}
  end

def handle_info(:tick, state) do
    Phoenix.PubSub.broadcast(My.PubSub, "sample", {:sample, state})
    Process.send_after(self(), :tick, 1000)
end
1 Like

I would even go further…

  def init(_) do
    # compute init state
    period = 1_000
    ref = Process.send_after(self(), :tick, period)
    {:ok, %{period: period, tref: ref}}
  end

You can pause, resume, update the frequency rate…

If you need to go fancy with timers, updating timers, postpone events, etc, you can checkout :gen_statem in erlang. It is basically a GenServer with lots of bells and whistles.

A state machine is also a good solution.