Elixir send/2 is sending different messages to processes

I have 3 processes labeled p0…p2. I want p0 to send to all processes(including itself) the message {:prepare, data_msg }. p1 and p2 receive the message and compute the relevant steps. p0 shoots out the following error:

** (ArgumentError) errors were found at the given arguments:

  * 1st argument: not an atom

    :erlang.apply([prepare: {:msg, 1, 32, #PID<0.117.0>}, prepare: {:msg, 1, 32, #PID<0.117.0>}, prepare: {:msg, 1, 32, #PID<0.117.0>}], :a_bal, [])  
    algorithm.ex:51: Algorithm.run/1

I’m very unsure as to why this is happenig to only p0 as p0 is essentially running the same code as p1 and p2.

Here are the relevant code snippets:

defmodule Algorithm do
    def start(name, participants, client) do
        pid = spawn(Algorithm, :init, [name, participants, client])
        # :global.unregister_name(name)
        case :global.re_register_name(name, pid) do
            :yes -> pid  
            :no  -> :error
        end
        IO.puts "registered #{name}"
        pid
    end

     def init(name, processes, client) do 
        Process.sleep(10)
        state = %{ 
            name: name, 
            processes: processes,
            client: (if is_pid(client), do: client, else: self()),

            bal: -1, 
            v: 0,

            a_bal: nil,
            a_val: nil, 

        }
        run(state)
    end

    def run(state) do
        state = receive do
            {:propose, val} ->
                b = 1
                data_msg = {:msg, b, val, self()}
                IO.puts(":propose | #{inspect state.name} | data_msg: #{inspect data_msg}")
                IO.puts(":propose | #{inspect state.name} | state: #{inspect state}")
                beb_broadcast({:prepare, data_msg}, state.processes)

            {:prepare, data_msg} ->
                {:msg, b, v, p} = data_msg
                # update state with b
                #{state | bal: b}
                IO.puts(":prepare | #{inspect state.name} | data_msg: #{inspect data_msg}")
                send(p, {:prepared, b, state.a_bal, state.a_val, data_msg})

            {:prepared, b, a_bal, a_val, data_msg} ->
                IO.puts(":prepared | #{inspect state.name} | b: #{inspect b}")
        end
        run(state)
    end
    
    defp unicast(m, p) do
        case :global.whereis_name(p) do
                pid when is_pid(pid) -> send(pid, m)
                :undefined -> :ok
        end
    end

    defp beb_broadcast(m, dest), do: for p <- dest, do: unicast(m, p)
    
end

This are the lines I ran in IEX to reproudce the error

c "algorithm.ex"
procs=[:p0, :p1, :p2]
pids = Enum.map(procs, fn(p) -> Algorithm.start(p, procs, self()) end)
[ x, y, z ] = pids
send(x, {:propose, 32})

Note: state.processes is a list containing all process names

1 Like

Please provide full listing of this file.

This error is caused by writing something.name where something is a list of tuples that each look like the message passed to beb_broadcast.

The stacktrace suggests it’s on line 60 of algorithm.ex, but it’s impossible to diagnose further without more code…


    def start(name, participants, client) do

        pid = spawn(Paxos, :init, [name, participants, client])

        # :global.unregister_name(name)

        case :global.re_register_name(name, pid) do

            :yes -> pid  

            :no  -> :error

        end

        IO.puts "registered #{name}"

        pid

    end

     def init(name, processes, client) do

        Process.sleep(10)

        state = %{

            name: name,

            processes: processes,

            client: (if is_pid(client), do: client, else: self()),

            bal: -1,

            v: 0,

            a_bal: nil,

            a_val: nil,

        }

        run(state)

    end

    def run(state) do

        state = receive do

            {:propose, val} ->

                b = 1

                data_msg = {:msg, b, val, self()}

                IO.puts(":propose | #{inspect state.name} | data_msg: #{inspect data_msg}")

                beb_broadcast({:prepare, data_msg}, state.processes)

            {:prepare, data_msg} ->

                {:msg, b, v, p} = data_msg

                # update state with b

                %{state | bal: b}

                IO.puts(":prepare | #{inspect state.name} | data_msg: #{inspect data_msg}")

                send(p, {:prepared, b, state.a_bal, state.a_val, data_msg})

              

            {:prepared, b, a_bal, a_val, data_msg} ->

                IO.puts(":prepared | #{inspect state.name} | b: #{inspect b}")

        end

        run(state)

    end

    defp unicast(m, p) do

        case :global.whereis_name(p) do

                pid when is_pid(pid) -> send(pid, m)

                :undefined -> :ok

        end

    end

    defp beb_broadcast(m, dest), do: for p <- dest, do: unicast(m, p)

end
IO.puts(":propose | #{inspect state.name} | data_msg: #{inspect data_msg}")

Is this line 60? Try putting an IO.inspect(state) right before this line. I think somewhere in your code, the state is getting re-bound to a list of messages, so when you try to access it as a map, you receive this error.

1 Like

This is line 60
IO.puts(":prepare | #{inspect state.name} | data_msg: #{inspect data_msg}")

Putting IO.inspect(state) before line 60 returns the same error

Right, it doesn’t fix the error, but right before the error, it will show you what is in the variable state - and I don’t think it is what you think it is.

This is the output state output from p1 and p2
p1:

:prepare | :p2 | state: %{a_bal: -1, a_val: nil, bal: -1, client: #PID<0.106.0>, name: :p2, processes: [:p0, :p1, :p2],  v: 0}

p2:

:prepare | :p1 | state: %{a_bal: -1, a_val: nil, bal: -1, client: #PID<0.106.0>, name: :p1, processes: [:p0, :p1, :p2],  v: 0}

p0 outputs nothing and has the same error as shown previously

If p0 is not outputting anything with IO.inspect, then you’re putting the inspect in the wrong spot. It needs to come before the error line.

p0 fails at the start of the {:prepare, data_msg} -> block.

The best console output of state i can get is before p0 beb_broadcast’s . This happens in the {:propose, val} -> block

:propose | :p0 | state: %{a_bal: -1, a_val: nil, bal: -1, client: #PID<0.106.0>, name: :p0, processes: [:p0, :p1, :p2], v: 0}

This is what i ran:

{:propose, val} ->
                b = 1
                data_msg = {:msg, b, val, self()}
                IO.puts(":propose | #{inspect state.name} | data_msg: #{inspect data_msg}")
                IO.puts(":propose | #{inspect state.name} | state: #{inspect state}")
                beb_broadcast({:prepare, data_msg}, state.processes)

This file fails compilation. As this is your first post to the elixir, you need to know that the first thing in order to help the community is to provide code that compiles. This makes community much easier to replicate your issue.

Sorry about that, This work is for academia and I didn’t want to post the full code incase this post gets picked up by plagurism checkers.

I’ve edited the snippet so that it compiles successfully.

# Uniform Consensus Properties:
# Termination: Every correct porcess eventually decides some value
# Validty: if a process decides v, then v was proposes by some process
# Integrity: No process decides twice
# Agreement: No two processes decide differently

defmodule Algorithm do
    def start(name, participants, client) do
        pid = spawn(Algorithm, :init, [name, participants, client])
        # :global.unregister_name(name)
        case :global.re_register_name(name, pid) do
            :yes -> pid  
            :no  -> :error
        end
        IO.puts "registered #{name}"
        pid
    end

     def init(name, processes, client) do 
        Process.sleep(10)
        state = %{ 
            name: name, 
            processes: processes,
            client: (if is_pid(client), do: client, else: self()),

            bal: -1, 
            v: 0,

            a_bal: nil,
            a_val: nil, 

        }
        run(state)
    end

    def run(state) do
        state = receive do
            {:propose, val} ->
                b = 1
                data_msg = {:msg, b, val, self()}
                IO.puts(":propose | #{inspect state.name} | data_msg: #{inspect data_msg}")
                IO.puts(":propose | #{inspect state.name} | state: #{inspect state}")
                beb_broadcast({:prepare, data_msg}, state.processes)

            {:prepare, data_msg} ->
                {:msg, b, v, p} = data_msg
                # update state with b
                #{state | bal: b}
                IO.puts(":prepare | #{inspect state.name} | data_msg: #{inspect data_msg}")
                send(p, {:prepared, b, state.a_bal, state.a_val, data_msg})

            {:prepared, b, a_bal, a_val, data_msg} ->
                IO.puts(":prepared | #{inspect state.name} | b: #{inspect b}")
        end
        run(state)
    end
    
    defp unicast(m, p) do
        case :global.whereis_name(p) do
                pid when is_pid(pid) -> send(pid, m)
                :undefined -> :ok
        end
    end

    defp beb_broadcast(m, dest), do: for p <- dest, do: unicast(m, p)
    
end

This are the lines I ran in IEX to reproudce the error

c "algorithm.ex"
procs=[:p0, :p1, :p2]
pids = Enum.map(procs, fn(p) -> Algorithm.start(p, procs, self()) end)
[ x, y, z ] = pids
send(x, {:propose, 32})
2 Likes

The clauses in your receive don’t return values that are shaped like state, so after the first call to run things like state.name will fail with an error message like the one you’re seeing.

  • the {:propose, val} -> branch returns a list of {:prepare, data_message} tuples.
  • the {:prepare, data_msg} -> branch returns a single {:prepared, _, _, _, _} tuple
  • the {:prepared, etc etc etc} -> branch returns :ok
3 Likes

Can you expand on this?
I tried returning state after beb_broadcast but that didn’t resolve the issue.

It doesn’t resolve the issue, but it gives a different error message for me:

18:05:05.079 [error] Process #PID<0.99.0> raised an exception
** (ArgumentError) errors were found at the given arguments:

  * 1st argument: not an atom

    :erlang.apply({:prepared, 1, nil, nil, {:msg, 1, 32, #PID<0.99.0>}}, :name, [])
    main.exs:48: Algorithm.run/1

This is now failing in the {:prepared, ...} -> branch because the {:prepare, data_msg} -> branch changes state into the wrong shape.

Are you refering to this bit of code??
#{state | bal: b}

No, I’m referring to the other branches of the receive:

state =
  receive do
    {:propose, val} ->
      b = 1
      data_msg = {:msg, b, val, self()}
      IO.puts(":propose | #{inspect state.name} | data_msg: #{inspect data_msg}")
      IO.puts(":propose | #{inspect state.name} | state: #{inspect state}")
      beb_broadcast({:prepare, data_msg}, state.processes)
      state # <======= needed here, leaves state unchanged

    {:prepare, data_msg} ->
      {:msg, b, v, p} = data_msg
      IO.puts(":prepare | #{inspect state.name} | data_msg: #{inspect data_msg}")
      send(p, {:prepared, b, state.a_bal, state.a_val, data_msg})
      %{state | bal: b} # <=========== needed here, updates state

    {:prepared, b, a_bal, a_val, data_msg} ->
      IO.puts(":prepared | #{inspect state.name} | b: #{inspect b}")
      state # <========== needed here, leaves state unchanged
  end
1 Like

@klydem , @al2o3cr provided correnct solution.
Here is more explanation why.
run function is called from init where you passed the initialized state. And what is called in init, goes into loop, waiting for messages (receive part in run).
Your state map is process state, and when you return it in receive block, it is remembered in process.
When your process is OTP compliant, you get value of state from iex using

:sys.get_state(pid)

But your implementation is not OTP compliant.

My suggestion is that you try to refactor your Algorithm module with GenServer behavior:
https://hexdocs.pm/elixir/GenServer.html

Then, :sys.get_state(pid) will return process state value.

ah thank you, resolved the issue