Sending messages between nodes

I am trying to send messages to all local nodes periodically about a leader selection. I am using 4 nodes currently

[:"n1@127.0.0.1", :"n2@127.0.0.1", :"n3@127.0.0.1", :"n4@127.0.0.1"]

Leader module

defmodule Worm.Leader do
  def leader_election() do
    nodes =
      ([Node.self()] ++ Node.list())

    max = Enum.max(nodes)

    Enum.each(nodes, fn node -> send_leader_info(node, max) end)
  end

  def send_leader_info(node, max) do
    Process.send_after(node, {:leader, max}, 200)
  end
end

Genserver

@impl GenServer
  def handle_info({:leader, max_node}, state) do
    Logger.info("--- New Leader is: #{state}")

    # Reschedule once more
    Worm.Leader.leader_election()

    {:noreply, state}
  end

I get this result

send: #Reference<0.2477235930.2055208966.145218>
send: #Reference<0.2477235930.2055208966.145221>
send: #Reference<0.2477235930.2055208966.145224>
send: #Reference<0.2477235930.2055208966.145227>

But I am not able to see any periodic message on any connected node. What am I doing wrong here?

Thanks

You are sending messages, but you have never specified to which process you want to send them. This mean, that it treats it as an process names in local registry, so in the end these messages goes nowhere.

3 Likes

Process.send_after/3 needs a pid or a process name as the first argument, but you gave it a node name.

Here’s part the documentation of Process.send_after/3:

If dest (the first arg) is a PID, it must be the PID of a local process, dead or alive. If dest
is an atom, it must be the name of a registered PROCESS which is looked up at
the time of delivery. No error is produced if the name does not refer to a
process.

1 Like

How can i specify processes on a node, as I need the messages to display on other nodes like a heartbeat.

1 Like

You need to either use named processes or send PIDs of the quorum members by other way.

Kernel.send/2 can take {process_name, node_name} as the first arg, where process_name is an atom. You have to workaround it in order to delay the sending, for example, send to self() a message after 200ms, and when that message is consumed, use Kernel.send/2 to send a message to the remote processes.

1 Like

I have modified the function send_leader_info\2 by spawning the node but still am getting nothing on remote nodes.

def send_leader_info(node, max) do
    node_pid = spawn(fn -> node end)

    Process.send_after(node_pid, {:leader, max}, 200)
    |> IO.inspect(label: :send)
end

Assuming you want to receive the messages in your GenServer process, you can start the genserver with a name and send messages to {genserve_name, node_name}.

  def start_link(opts) do
    # starting genserver with a name
    GenServer.start_link(__MODULE__, opts, name: SomeName)
  end

  @impl GenServer
  def handle_info({:leader, max_node}, state) do
    Logger.info("--- New Leader is: #{state}")

    # Reschedule once more
    Worm.Leader.leader_election()

    {:noreply, state}
  end
defmodule Worm.Leader do
  def leader_election() do
    nodes =
      ([Node.self()] ++ Node.list())

    max = Enum.max(nodes)

    Enum.each(nodes, fn node -> send_leader_info(node, max) end)
  end

  def send_leader_info(node, max) do
    # sending to {SomeName, node} instead of node
    Process.send_after({SomeName, node}, {:leader, max}, 200)
  end
end

See GenServer — Elixir v1.12.3 for more info

1 Like

Just a small tip - you can do

nodes = [Node.self() | Node.list()]

instead of:

nodes = ([Node.self()] ++ Node.list())
1 Like

getting an argument error

2nd argument: not a pid or an atom
:erlang.send_after(200, {SomeName, :"n1@127.0.0.1"}, {:leader, :"n1@127.0.0.1"})

So {SomeName, :"n1@127.0.0.1"} should be a pid or an atom :frowning:

Try with send({SomeName, :"n1@127.0.0.1"}, {:leader, :"n1@127.0.0.1"}), does it work? :erlang.send_after indeed doesn’t allow sending to remote nodes.

1 Like

I recommend you take a look at the erlang Erlang -- pg module. It sounds like it’d fit this use case well, getting registered processes across nodes to send them messages.

2 Likes

At the most basic explanation, pg2 allows for a group to be created and then for processes to connect to the group. This leads to a mapping of name -> pid list. The pid list consists of all known processes, whether they be local or remote.

This is exactly what I needed.

1 Like

If you’re on Erlang 23 or newer the pg module is what should be used. pg2 is deprecated and will be removed soon.

2 Likes