Genstage: Dynamic Producers

Genstage: Dynamic Producers

When one or more supervised gen_stage producers (:producer) crash and get restarted by supervisor while they’re subscribed to consumer (:consumer), they aren’t added back to consumers subscription list. What would be the best way to handle “automatic” re-subscriptions?

Example code

Producer:

defmodule Producer do
  use GenStage

  def start_link(identifier) do
    GenStage.start_link(__MODULE__, :ok,
      name: {:via, Registry, {Registry.Producers, {__MODULE__, identifier}}}
    )
  end

  def kill(identifier) do
    GenServer.stop({:via, Registry, {Registry.Producers, {__MODULE__, identifier}}})
  end

  def init(:ok) do
    {:producer, :ok}
  end

  def handle_demand(demand, :ok) do
    {:noreply, [], :ok}
  end
end

Consumer:

defmodule Consumer do
  use GenStage

  def start_link(_arg) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :ok}
  end

  def subscribe(producer_identifier) do
    GenStage.sync_subscribe(__MODULE__,
      to: {:via, Registry, {Registry.Producers, {Producer, producer_identifier}}},
      cancel: :transient
    )
  end

  def handle_events(events, _from, :ok) do
    {:noreply, [], state}
  end
end

ProducerSupervisor:

defmodule ProducerSupervisor do
  use DynamicSupervisor

  def start_link(_init_arg) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def start_child(identifier) do
    spec = %{id: Producer, start: {Producer, :start_link, [identifier]}}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def list_children() do
    DynamicSupervisor.which_children(__MODULE__)
  end

  def init(:ok) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

Executing code:

Registry.start_link(keys: :unique, name: Registry.Producers) # Start registry

ProducerSupervisor.start_link(:ok) # Start ProducerSupervisor
# Start 3 supervised producers
ProducerSupervisor.start_child("a")
ProducerSupervisor.start_child("b")
ProducerSupervisor.start_child("c")

Consumer.start_link(:ok) # Start consumer
# Subscribe to producers
Consumer.subscribe("a")
Consumer.subscribe("b")
Consumer.subscribe("c")
Listing started producers from supervisor
iex(3)> ProducerSupervisor.list_children()
[
  {:undefined, #PID<0.182.0>, :worker, [Producer]},
  {:undefined, #PID<0.183.0>, :worker, [Producer]},
  {:undefined, #PID<0.184.0>, :worker, [Producer]}
]
Listing consumer producers (subscriptions)
iex(5)> :sys.get_state(Consumer).producers
%{
  #Reference<0.234815945.2133590018.84622> => {#PID<0.182.0>, :transient,
   {1000, 500, 1000}},
  #Reference<0.234815945.2133590018.84625> => {#PID<0.183.0>, :transient,
   {1000, 500, 1000}},
  #Reference<0.234815945.2133590018.84628> => {#PID<0.184.0>, :transient,
   {1000, 500, 1000}}
}

The problem

Killing (restarting) producer
iex(6)> Producer.kill("a") # Kill producer with identifier a
:ok
Listing producers from supervisor (3 producers)
iex(7)> ProducerSupervisor.list_children()
[
  {:undefined, #PID<0.183.0>, :worker, [Producer]},
  {:undefined, #PID<0.184.0>, :worker, [Producer]},
  {:undefined, #PID<0.196.0>, :worker, [Producer]} # Restarted producer (a)
]
Listing subscribed consumer producers (only 2 producers now)
iex(8)> :sys.get_state(Consumer).producers
%{
  #Reference<0.234815945.2133590018.84625> => {#PID<0.183.0>, :transient,
   {1000, 500, 1000}},
  #Reference<0.234815945.2133590018.84628> => {#PID<0.184.0>, :transient,
   {1000, 500, 1000}}
}

I’ve thought about handling “subscriptions” in Producer.init function, since crashed and new producers would start and get automatically subscribed to consumer. Is it the only sane way to handle automatic re-subscription, or are there betters ways to automate it?

Example code:
defmodule Producer do
  use GenStage

  def start_link(identifier) do
    GenStage.start_link(__MODULE__, identifier,
      name: {:via, Registry, {Registry.Producers, {__MODULE__, identifier}}}
    )
  end

  def kill(identifier) do
    GenServer.stop({:via, Registry, {Registry.Producers, {__MODULE__, identifier}}})
  end

  def init(identifier) do
    Consumer.subscribe(identifier) # Subscribe here
    {:producer, :ok}
  end

  def handle_demand(demand, :ok) do
    {:noreply, [], :ok}
  end
end

GitHub gist: https://gist.github.com/nkyian/64049dd19b844864d42e965328308727
Elixir version : 1.7.4
OTP version: 21
Mix deps: [{:gen_stage, "~> 0.14"}]

4 Likes