A lot can be learned by updating outdated code.
defmodule Chat.Server do
defp reply(pid, reply) do
send pid, {self(), reply}
end
defp cast(pid, message) do
send pid, {self(), message}
# noreply expected
end
# Send 'message' to all clients except 'sender'
defp broadcast(room, message, sender \\ :undefined) do
targets =
case is_pid sender do
false ->
room
_ ->
List.delete(room, sender)
end
Enum.each targets, fn(pid) -> cast(pid, {:message, message}) end
end
def loop(room) do
receive do
{pid, :join} ->
broadcast room, "Some user with pid #{inspect pid} joined"
reply(pid, :ok)
loop([pid|room])
{pid, {:say, message}} ->
broadcast room, "#{inspect pid}" <> message, pid
reply(pid, :ok)
loop(room)
{pid, :leave} ->
reply(pid, :ok)
new_room = List.delete(room, pid)
broadcast new_room, "User with pid #{inspect pid} left"
loop(new_room)
{_pid, :stop} ->
IO.puts "#{inspect self()} Server terminating"
:ok
end
end
end
defmodule Chat.Interface do
# Send 'message' to 'server' and wait for a reply
# Notice how this function is declared using 'defp' meaning
# it's private and can only be called inside this module
defp call(server, message) do
send server, {self(), message}
receive do
{^server, reply} ->
reply
after
1000 ->
IO.puts "Connection to room timed out"
:timeout
end
end
defp cast(server, message) do
send server, {self(), message}
# noreply expected
end
# Receive a pending message from 'server' and print it
def flush(server) do
receive do
# The caret '^' is used to match against the value of 'server',
# it is a basic filtering based on the sender
{ ^server, {:message, message} } ->
IO.puts message
flush(server)
# no more messages to flush
after
0 ->
:ok
end
end
# In all of the following functions 'server' stands for the server's pid
def join(server) do
call(server, :join)
end
def say(server, message) do
call(server, {:say, message} )
end
def leave(server) do
call(server, :leave)
end
# Part of the interface - though not the "Chat" interface
def stop(server) do
cast(server, :stop)
end
end
server_pid = spawn fn() -> Chat.Server.loop [] end
# register shell as a client
Chat.Interface.join server_pid
# Spawn another process as client
spawn fn() ->
Chat.Interface.join server_pid
Chat.Interface.say server_pid, "Hi!"
Chat.Interface.leave server_pid
end
# And another one
spawn fn() ->
Chat.Interface.join server_pid
Chat.Interface.say server_pid, "What's up?"
Chat.Interface.leave server_pid
end
:timer.sleep 500
# messages accumulated for the shell
Chat.Interface.flush server_pid
Chat.Interface.leave server_pid
Chat.Interface.stop server_pid
:timer.sleep 500
IO.puts"Done"
works even on http://elixirplayground.com/
Some user with pid #PID<0.58.0> joined
Some user with pid #PID<0.59.0> joined
#PID<0.58.0>Hi!
User with pid #PID<0.58.0> left
#PID<0.59.0>What's up?
User with pid #PID<0.59.0> left
#PID<0.57.0> Server terminating
Done
GenServer version
defmodule Chat.Server do
use GenServer
# Send 'message' to all clients except 'sender'
defp broadcast(room, message, sender \\ :undefined) do
targets =
case is_pid sender do
false ->
room
_ ->
List.delete(room, sender)
end
Enum.each targets, fn(pid) -> GenServer.cast(pid, {:message, message}) end
end
## GenServer callbacks
def terminate(_reason, _room) do
IO.puts "#{inspect self()} Server terminating"
:ok
end
# Note:
# Always use "from" as an opaque data type;
# don’t assume it is a tuple, as its representation might change in future releases.
# p.89 Designing for Scalability with Erlang/OTP (2016)
#
def handle_call({:join, pid}, _from, room) do
broadcast room, "Some user with pid #{inspect pid} joined"
{:reply, :ok, [pid|room]}
end
def handle_call({:say, pid, message}, _from, room) do
broadcast room, "#{inspect pid}" <> message, pid
{:reply, :ok, room}
end
def handle_call({:leave, pid}, _from, room) do
new_room = List.delete(room, pid)
broadcast new_room, "User with pid #{inspect pid} left"
{:reply, :ok, new_room}
end
end
defmodule Chat.Interface do
# In all of the following functions 'server' stands for the server's pid
def join(server) do
GenServer.call(server, {:join, self()})
end
def say(server, message) do
GenServer.call(server, {:say, self(), message})
end
def leave(server) do
GenServer.call(server, {:leave, self()})
end
# Part of the interface - though not the "Chat" interface
def stop(server) do
GenServer.stop(server)
end
# Receive a pending message from 'server' and print it
def flush(server) do
receive do
{:"$gen_cast", {:message, message }} ->
IO.inspect message
flush(server)
# no more messages to flush
after
0 ->
:ok
end
end
end
{:ok, server_pid} = GenServer.start Chat.Server, []
# register shell as a client
Chat.Interface.join server_pid
# Spawn another process as client
spawn fn() ->
Chat.Interface.join server_pid
Chat.Interface.say server_pid, "Hi!"
Chat.Interface.leave server_pid
end
# And another one
spawn fn() ->
Chat.Interface.join server_pid
Chat.Interface.say server_pid, "What's up?"
Chat.Interface.leave server_pid
end
:timer.sleep 500
# messages accumulated for the shell
Chat.Interface.flush server_pid
Chat.Interface.leave server_pid
Chat.Interface.stop server_pid
:timer.sleep 500
IO.puts"Done"
Elixir playground doesn’t support GenServer.stop
.
And finally the Agent version … it’s wrong in so many ways.
defmodule Chat.App do
## Agent.update runs the following functions in the Agent's process
## via the anonymous functions returned by the "handle_x" functions
# Send 'message' to all clients except 'sender'
defp broadcast(room, message, sender \\ :undefined) do
targets =
case is_pid sender do
false ->
room
_ ->
List.delete(room, sender)
end
Enum.each targets,
fn(pid) -> send pid, {self(), {:message, message}} end
end
defp handle_join(pid) do
fn(room) ->
broadcast room, "Some user with pid #{inspect pid} joined"
[pid|room]
end
end
defp handle_say(pid, message) do
fn(room) ->
broadcast room, "#{inspect pid}" <> message, pid
room
end
end
defp handle_leave(pid) do
fn(room) ->
new_room = List.delete(room, pid)
broadcast new_room, "User with pid #{inspect pid} left"
new_room
end
end
## the following functions run in the client's process
def join(agent) do
Agent.update(agent, handle_join(self()))
end
def say(agent, message) do
Agent.update(agent, handle_say(self(),message))
end
def leave(agent) do
Agent.update(agent, handle_leave(self()))
end
# Receive pending messages from 'agent' and print them
def flush(agent) do
receive do
# The caret '^' is used to match against the value of 'server',
# it is a basic filtering based on the sender
{ ^agent, {:message, message} } ->
IO.puts message
flush(agent)
# no more messages to flush
after
0 ->
:ok
end
end
end
{:ok, agent_pid} = Agent.start fn -> [] end
# register shell as a client
Chat.App.join agent_pid
# Spawn another process as client
spawn fn() ->
Chat.App.join agent_pid
Chat.App.say agent_pid, "Hi!"
Chat.App.leave agent_pid
end
# And another one
spawn fn() ->
Chat.App.join agent_pid
Chat.App.say agent_pid, "What's up?"
Chat.App.leave agent_pid
end
:timer.sleep 500
# messages accumulated for the shell
Chat.App.flush agent_pid
Chat.App.leave agent_pid
Agent.stop agent_pid
:timer.sleep 500
IO.puts"Done"
The following is more in keeping with how an Agent is actually supposed to be used - as a state container. As a result the chat is now peer-to-peer - there is no “server” as such. However the clients do not keep the “room” as part of their own state - that is farmed out to the agent and manipulated through the anonymous functions created by the various handle_x
functions.
However it still remains a strange way of using an agent.
defmodule Chat.App do
#"handle_x" return anonymous functions
# that are sent to the agent to run in the agent process
defp handle_join(pid) do
fn(room) ->
{room, [pid|room]}
end
end
defp handle_say() do
fn(room) -> room end
end
defp handle_leave(pid) do
fn(room) ->
new_room = List.delete(room, pid)
{new_room, new_room}
end
end
## the following functions run in the client's process
## now only the clients are sending messages
def join(agent) do
pid = self()
old_room = Agent.get_and_update(agent, handle_join(pid))
broadcast old_room, "Some user with pid #{inspect pid} joined"
end
def say(agent, message) do
pid = self()
room = Agent.get(agent, handle_say())
broadcast room, "#{inspect pid}" <> message, pid
end
def leave(agent) do
pid = self()
new_room = Agent.get_and_update(agent, handle_leave(pid))
broadcast new_room, "User with pid #{inspect pid} left"
end
# Send 'message' to all clients except 'sender'
defp broadcast(room, message, sender \\ :undefined) do
targets =
case is_pid sender do
false ->
room
_ ->
List.delete(room, sender)
end
Enum.each targets,
fn(pid) -> send pid, {self(), {:message, message}} end
end
# Receive pending messages from 'agent' and print them
def flush() do
receive do
# The caret '^' is used to match against the value of 'server',
# it is a basic filtering based on the sender
{ _pid, {:message, message} } ->
IO.puts message
flush()
# no more messages to flush
after
0 ->
:ok
end
end
end
{:ok, agent_pid} = Agent.start fn -> [] end
# register shell as a client
Chat.App.join agent_pid
# Spawn another process as client
spawn fn() ->
Chat.App.join agent_pid
Chat.App.say agent_pid, "Hi!"
Chat.App.leave agent_pid
end
# And another one
spawn fn() ->
Chat.App.join agent_pid
Chat.App.say agent_pid, "What's up?"
Chat.App.leave agent_pid
end
:timer.sleep 500
# messages accumulated for the shell
Chat.App.flush
Chat.App.leave agent_pid
Agent.stop agent_pid
:timer.sleep 500
IO.puts"Done"