GenStage: Dynamic Producers
When one or more supervised gen_stage producers (:producer) crash and are restarted by their supervisor while a consumer (:consumer) is subscribed to them, they aren’t added back to the consumer’s subscription list. What would be the best way to handle “automatic” re-subscriptions?
Example code
Producer:
defmodule Producer do
  use GenStage

  def start_link(identifier) do
    GenStage.start_link(__MODULE__, :ok,
      name: {:via, Registry, {Registry.Producers, {__MODULE__, identifier}}}
    )
  end

  # Stops the producer with reason :normal; the supervisor then restarts it.
  def kill(identifier) do
    GenServer.stop({:via, Registry, {Registry.Producers, {__MODULE__, identifier}}})
  end

  def init(:ok) do
    {:producer, :ok}
  end

  # Emits no events; the demand is ignored in this minimal example.
  def handle_demand(_demand, :ok) do
    {:noreply, [], :ok}
  end
end
Consumer:
defmodule Consumer do
  use GenStage

  def start_link(_arg) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :ok}
  end

  # Subscribes this consumer to the producer registered under the identifier.
  def subscribe(producer_identifier) do
    GenStage.sync_subscribe(__MODULE__,
      to: {:via, Registry, {Registry.Producers, {Producer, producer_identifier}}},
      cancel: :transient
    )
  end

  # The state is always :ok (the original returned an undefined `state`).
  def handle_events(_events, _from, :ok) do
    {:noreply, [], :ok}
  end
end
ProducerSupervisor:
defmodule ProducerSupervisor do
  use DynamicSupervisor

  def start_link(_init_arg) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def start_child(identifier) do
    spec = %{id: Producer, start: {Producer, :start_link, [identifier]}}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def list_children() do
    DynamicSupervisor.which_children(__MODULE__)
  end

  def init(:ok) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end
Executing code:
Registry.start_link(keys: :unique, name: Registry.Producers) # Start registry
ProducerSupervisor.start_link(:ok) # Start ProducerSupervisor
# Start 3 supervised producers
ProducerSupervisor.start_child("a")
ProducerSupervisor.start_child("b")
ProducerSupervisor.start_child("c")
Consumer.start_link(:ok) # Start consumer
# Subscribe to producers
Consumer.subscribe("a")
Consumer.subscribe("b")
Consumer.subscribe("c")
Listing started producers from supervisor
iex(3)> ProducerSupervisor.list_children()
[
{:undefined, #PID<0.182.0>, :worker, [Producer]},
{:undefined, #PID<0.183.0>, :worker, [Producer]},
{:undefined, #PID<0.184.0>, :worker, [Producer]}
]
Listing consumer producers (subscriptions)
iex(5)> :sys.get_state(Consumer).producers
%{
#Reference<0.234815945.2133590018.84622> => {#PID<0.182.0>, :transient,
{1000, 500, 1000}},
#Reference<0.234815945.2133590018.84625> => {#PID<0.183.0>, :transient,
{1000, 500, 1000}},
#Reference<0.234815945.2133590018.84628> => {#PID<0.184.0>, :transient,
{1000, 500, 1000}}
}
The problem
Killing (restarting) producer
iex(6)> Producer.kill("a") # Kill producer with identifier a
:ok
Listing producers from supervisor (3 producers)
iex(7)> ProducerSupervisor.list_children()
[
{:undefined, #PID<0.183.0>, :worker, [Producer]},
{:undefined, #PID<0.184.0>, :worker, [Producer]},
{:undefined, #PID<0.196.0>, :worker, [Producer]} # Restarted producer (a)
]
Listing subscribed consumer producers (only 2 producers now)
iex(8)> :sys.get_state(Consumer).producers
%{
#Reference<0.234815945.2133590018.84625> => {#PID<0.183.0>, :transient,
{1000, 500, 1000}},
#Reference<0.234815945.2133590018.84628> => {#PID<0.184.0>, :transient,
{1000, 500, 1000}}
}
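A likely explanation for the missing entry, as far as I understand GenStage: the consumer resolves the :via name to a PID once, at subscription time, and tracks that PID through a monitor. When the producer stops, the monitor fires and the subscription is dropped; the restarted producer has a new PID that the consumer never learns about. The sketch below reproduces only that mechanism with the standard library, with no GenStage involved. DemoRegistry and the Agent child are hypothetical stand-ins for Registry.Producers and a Producer.

```elixir
# Stand-alone script using only Elixir's standard library (no GenStage).
# DemoRegistry and the Agent child are hypothetical stand-ins for
# Registry.Producers and a Producer.
{:ok, _} = Registry.start_link(keys: :unique, name: DemoRegistry)
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

name = {:via, Registry, {DemoRegistry, "a"}}
spec = %{id: :demo, start: {Agent, :start_link, [fn -> :ok end, [name: name]]}}
{:ok, old_pid} = DynamicSupervisor.start_child(sup, spec)

# A subscriber that monitors the PID is told when that PID goes away...
ref = Process.monitor(old_pid)
:ok = Agent.stop(name)

down_reason =
  receive do
    {:DOWN, ^ref, :process, ^old_pid, reason} -> reason
  after
    1_000 -> :timeout
  end

# ...but nothing tells it about the replacement; the name must be looked up again.
wait_for_restart = fn wait ->
  case GenServer.whereis(name) do
    pid when is_pid(pid) and pid != old_pid ->
      pid

    _ ->
      Process.sleep(10)
      wait.(wait)
  end
end

new_pid = wait_for_restart.(wait_for_restart)
IO.inspect({down_reason, old_pid, new_pid}, label: "restart")
```

The name stays the same across the restart, but anything that stored the old PID or its monitor reference is left with a dead handle, which matches the two-entry producers map above.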
I’ve thought about handling “subscriptions” in the Producer.init function, since both restarted and newly started producers would then subscribe themselves to the consumer automatically. Is this the only sane way to handle automatic re-subscription, or are there better ways to automate it?
Example code:
defmodule Producer do
  use GenStage

  def start_link(identifier) do
    GenStage.start_link(__MODULE__, identifier,
      name: {:via, Registry, {Registry.Producers, {__MODULE__, identifier}}}
    )
  end

  def kill(identifier) do
    GenServer.stop({:via, Registry, {Registry.Producers, {__MODULE__, identifier}}})
  end

  def init(identifier) do
    # Subscribe here, so every (re)started producer registers with the consumer.
    Consumer.subscribe(identifier)
    {:producer, :ok}
  end

  def handle_demand(_demand, :ok) do
    {:noreply, [], :ok}
  end
end
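One thing to watch with this approach: as far as I know, GenStage.sync_subscribe is a synchronous call into the consumer process, so Producer.init would exit if Consumer is not running yet. That is the case with the startup order in "Executing code", where the producers are started before the consumer. The sketch below only illustrates that failure mode with a plain GenServer.call from the standard library; NotStartedConsumer is a hypothetical name that no process is registered under, standing in for a Consumer that has not been started.

```elixir
# Stand-alone illustration with the standard library only (no GenStage).
# NotStartedConsumer is a hypothetical name with no process registered under it.
result =
  try do
    GenServer.call(NotStartedConsumer, :subscribe)
  catch
    # Calling a name that is not registered exits with a :noproc reason.
    :exit, {:noproc, _call} -> :consumer_not_running
  end

IO.inspect(result, label: "call to missing consumer")
```

If the real sync_subscribe call behaves the same way, starting Consumer before any producer (or supervising both so that the consumer comes up first) would avoid the failed init.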
GitHub gist: https://gist.github.com/nkyian/64049dd19b844864d42e965328308727
Elixir version: 1.7.4
OTP version: 21
Mix deps: [{:gen_stage, "~> 0.14"}]