Example: stack server with an asynchronous “pop” and a proxy server which turns it into a synchronous “pop”.
# file: cast/lib/cast/application.ex
#
defmodule Cast.Application do
  @moduledoc """
  Application callback module for the `:cast` example.

  Starts a supervisor named `Cast.Supervisor` with two children, in this order:

    1. `Stack`, registered as `:cast_stack` and seeded with `[:hello]`
    2. `Proxy`, registered as `:proxy` and pointed at `:cast_stack`

  The `:rest_for_one` strategy means that when the stack terminates, the proxy
  (started after it) is terminated and restarted as well.
  """

  use Application

  # Registered names of the two supervised servers.
  @stack_name :cast_stack
  @proxy_name :proxy

  @impl Application
  def start(_type, _args) do
    Supervisor.start_link(child_specs(), strategy: :rest_for_one, name: Cast.Supervisor)
  end

  # Child specifications in start order: the stack first, because the proxy
  # forwards to it.
  defp child_specs do
    [
      {Stack, [args: [:hello], name: @stack_name]},
      {Proxy, [for_server: @stack_name, name: @proxy_name]}
    ]
  end
end
# file: cast/lib/stack.ex
# stack server with asynchronous "pop"
#
defmodule Stack do
  @moduledoc """
  Stack server with an asynchronous "pop".

  Both `push/2` and `pop/1` are casts. `pop/1` returns a reference immediately;
  the popped item is cast back to the calling process about half a second
  later as `{:popped, ref, item}`, where `ref` is the reference that `pop/1`
  returned. Popping an empty stack casts back `nil` as the item instead of
  crashing the server.
  """

  use GenServer

  # Delay in milliseconds between handling a "pop" and casting the item back.
  @pop_delay 500

  # client

  @doc """
  Starts the stack server.

  Options:

    * `:args` - initial stack contents as a list, top first (default `[]`)
    * `:name` - name to register the server under (default `Stack`)

  Any remaining options are passed on to `GenServer.start_link/3`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    {args, opts} = Keyword.pop(opts, :args, [])
    {name, opts} = Keyword.pop(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, args, Keyword.put(opts, :name, name))
  end

  @doc """
  Pushes `item` on top of the stack held by `server`. Returns `:ok` at once.
  """
  @spec push(GenServer.server(), term()) :: :ok
  def push(server, item),
    do: GenServer.cast(server, {:push, item})

  @doc """
  Requests an asynchronous "pop" from `server`.

  Returns a fresh reference that serves as the correlation identifier. The
  result is later delivered to the calling process with `GenServer.cast/2` as
  `{:popped, ref, item}`, so the caller is expected to be a GenServer handling
  that cast; a plain process finds `{:"$gen_cast", {:popped, ref, item}}` in
  its mailbox. `item` is `nil` when the stack was empty.
  """
  @spec pop(GenServer.server()) :: reference()
  def pop(server) do
    # create a reference as a correlation identifier
    ref = make_ref()
    GenServer.cast(server, {:pop, ref, self()})
    ref
  end

  # implementation

  @doc """
  Takes the top item off `stack` and schedules its delivery to `to`.

  Returns the remaining stack. The delivery is a timer message sent to
  `self()`, so this function has to run inside the server process. For an
  empty stack, `nil` is scheduled for delivery and the stack stays empty.
  """
  @spec pop_to(list(), reference(), GenServer.server()) :: list()
  def pop_to([item | rest], ref, to) do
    schedule_delivery(to, ref, item)
    rest
  end

  def pop_to([], ref, to) do
    # Without this clause a "pop" on an empty stack raised FunctionClauseError
    # and took the whole server down.
    schedule_delivery(to, ref, nil)
    []
  end

  # Arranges for `item` to be handed to `handle_info/2` after @pop_delay ms.
  defp schedule_delivery(to, ref, item) do
    Process.send_after(self(), {:pop_to, to, ref, item}, @pop_delay)
  end

  # callbacks

  @impl GenServer
  def init(stack),
    do: {:ok, stack}

  # Part 1 of asynchronous "pop": pop the top and prepare to send it later.
  @impl GenServer
  def handle_cast({:pop, ref, to}, state),
    do: {:noreply, pop_to(state, ref, to)}

  # Push the new item on the stack.
  def handle_cast({:push, item}, state),
    do: {:noreply, [item | state]}

  # Part 2 of asynchronous "pop": the timer fired, so cast the popped item
  # back to the client.
  @impl GenServer
  def handle_info({:pop_to, to, ref, item}, state) do
    GenServer.cast(to, {:popped, ref, item})
    {:noreply, state}
  end
end
# file: cast/lib/proxy.ex
# proxy server to the stack server that turns the
# stack server's asynchronous "pop" into a synchronous "pop".
#
defmodule Proxy do
  @moduledoc """
  Proxy server in front of a `Stack` server that turns the stack's
  asynchronous "pop" into a synchronous one.

  The server state is `{for_server, pending}`: `for_server` is the stack being
  proxied and `pending` maps the correlation reference of every outstanding
  "pop" to the caller that is still waiting for its reply.
  """

  use GenServer

  @type state :: {GenServer.server(), %{optional(reference()) => GenServer.from()}}

  # client

  @doc """
  Starts the proxy server.

  Options:

    * `:for_server` - the stack server to forward to (default `Stack`, which
      is the stack's own default registered name)
    * `:name` - name to register the proxy under (default `Proxy`)

  Any remaining options are passed on to `GenServer.start_link/3`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    # The default used to be `[]`, which is not a valid server reference and
    # made every forwarded push/pop fail.
    {for_server, opts} = Keyword.pop(opts, :for_server, Stack)
    {name, opts} = Keyword.pop(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, {for_server, %{}}, Keyword.put(opts, :name, name))
  end

  @doc """
  Pushes `item` onto the proxied stack. Returns `:ok` at once.
  """
  @spec push(GenServer.server(), term()) :: :ok
  def push(server, item),
    do: GenServer.cast(server, {:push, item})

  @doc """
  Pops the top item of the proxied stack and returns it.

  Blocks the caller until the stack has cast the item back to the proxy,
  subject to the default `GenServer.call/2` timeout.
  """
  @spec pop(GenServer.server()) :: term()
  def pop(server),
    do: GenServer.call(server, :pop)

  # implementation

  @doc """
  Issues the asynchronous "pop" on behalf of caller `from` and remembers the
  caller under the returned correlation reference.

  Has to run inside the proxy process, because the stack casts the item back
  to whichever process called `Stack.pop/1`.
  """
  @spec start_pop(GenServer.from(), state()) :: state()
  def start_pop(from, {for_server, pending}) do
    # cast asynchronous "pop" request
    ref = Stack.pop(for_server)
    # store the correlation identifier
    {for_server, Map.put(pending, ref, from)}
  end

  @doc """
  Completes the pending "pop" identified by `ref` by replying `item` to the
  waiting caller.

  Returns `pending` without that entry. A `ref` that is not pending is
  ignored and `pending` is returned unchanged.
  """
  @spec complete_pop(map(), reference(), term()) :: map()
  def complete_pop(pending, ref, item) do
    # find client based on correlation identifier
    case pending do
      %{^ref => from} ->
        # now complete that pending "pop" call
        GenServer.reply(from, item)
        Map.delete(pending, ref)

      _ ->
        pending
    end
  end

  # callbacks

  @impl GenServer
  def init(args),
    do: {:ok, args}

  # Don't complete the "pop" call here; the reply is sent once the item has
  # been cast back.
  @impl GenServer
  def handle_call(:pop, from, state),
    do: {:noreply, start_pop(from, state)}

  # Forward the item to the stack's "push".
  @impl GenServer
  def handle_cast({:push, item}, {for_server, _} = state) do
    Stack.push(for_server, item)
    {:noreply, state}
  end

  # The popped item has been cast back by the stack.
  def handle_cast({:popped, ref, item}, {for_server, pending}) do
    {:noreply, {for_server, complete_pop(pending, ref, item)}}
  end
end
$ iex -S mix
Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Application.started_applications()
[
{:cast, 'cast', '0.1.0'},
{:logger, 'logger', '1.6.6'},
{:mix, 'mix', '1.6.6'},
{:iex, 'iex', '1.6.6'},
{:elixir, 'elixir', '1.6.6'},
{:compiler, 'ERTS CXC 138 10', '7.2'},
{:stdlib, 'ERTS CXC 138 10', '3.5'},
{:kernel, 'ERTS CXC 138 10', '6.0'}
]
iex(2)> Application.stop(:cast)
:ok
16:46:40.552 [info] Application cast exited: :stopped
iex(3)> r(Stack)
warning: redefining module Stack (current version loaded from _build/dev/lib/cast/ebin/Elixir.Stack.beam)
lib/stack.ex:4
{:reloaded, Stack, [Stack]}
iex(4)> r(Proxy)
warning: redefining module Proxy (current version loaded from _build/dev/lib/cast/ebin/Elixir.Proxy.beam)
lib/proxy.ex:5
{:reloaded, Proxy, [Proxy]}
iex(5)> r(Cast.Application)
warning: redefining module Cast.Application (current version loaded from _build/dev/lib/cast/ebin/Elixir.Cast.Application.beam)
lib/cast/application.ex:3
{:reloaded, Cast.Application, [Cast.Application]}
iex(6)> Application.start(:cast)
:ok
iex(7)> Proxy.pop(:proxy) # pop via proxy (synchronous)
:hello
iex(8)> Proxy.push(:proxy, :greetings) # push via proxy
:ok
iex(9)> Stack.push(:cast_stack, :bye) # push directly
:ok
iex(10)> Stack.pop(:cast_stack) # pop directly
#Reference<0.983023136.1924136961.112221>
iex(11)> flush()
:ok
iex(12)> Process.sleep(500)
:ok
iex(13)> flush()
{:"$gen_cast", {:popped, #Reference<0.983023136.1924136961.112221>, :bye}}
:ok
iex(14)> Stack.pop(:cast_stack)
#Reference<0.983023136.1924136961.112255>
iex(15)> Process.sleep(600)
:ok
iex(16)> flush()
{:"$gen_cast", {:popped, #Reference<0.983023136.1924136961.112255>, :greetings}}
:ok
iex(17)>