GenStage ConsumerSupervisor Restart Children

From what I am reading here: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html

all the implementations of the ConsumerSupervisor only start a child (the Printer module in the link above) for each unit of work. Is there a way to restart the child if it dies?

Given the name ‘ConsumerSupervisor’, I would expect it to be able to restart a child that doesn’t shut down normally. Has anyone done this before?

In my implementation, the ConsumerSupervisor starts a child that is actually a GenServer, which performs its work and then shuts itself down. If it crashes abnormally, I want it to be restarted.

Thoughts… I considered implementing a plain consumer that calls out to a DynamicSupervisor, which then starts the children, but that doesn’t account for back-pressure…

Here is how I have it implemented, but I want the children to be restarted if they crash:

Producer:

defmodule Client.Strategy.EventPushing.SendingProducer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, []}
  end

  # public endpoint for events adding
  def add(events), do: GenServer.cast(__MODULE__, {:add, events})

  # just push events to consumers on adding
  def handle_cast({:add, events}, state) when is_list(events) do
    {:noreply, events, state}
  end

  def handle_cast({:add, events}, state), do: {:noreply, [events], state}

  # ignore any demand
  def handle_demand(_, state), do: {:noreply, [], state}
end

ConsumerSupervisor:

defmodule Client.Strategy.EventPushing.SendingConsumer do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # Callbacks

  def init(:ok) do
    children = [
      worker(Client.DynamicClient, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
  end
end

The Client (Children)

defmodule Client.DynamicClient do
  use GenServer
  require Logger

  # Public API
  def start_link(event) do
    Client.Statix.timing("sending", 1)
    GenServer.start_link(
      __MODULE__,
      event,
      name: String.to_atom(event.sequence_number)
    )
  end

  def schedule_work() do
    send(self(), :start_work)
  end

  # Callbacks
  def init(event) do
    schedule_work()
    {:ok, event}
  end

  def handle_info(
        :start_work,
        %{
          :filter => filter,
          :sequence_number => sequence_number,
          :message => message,
          :ip_address => ip,
          :port => port
        } = state
      ) do

    opts = [:binary, active: true]

    case :gen_tcp.connect(ip, port, opts) do
      {:ok, socket} ->
        :gen_tcp.send(socket, filter)
        :gen_tcp.send(socket, message)

      {:error, reason} ->
        Logger.error("Connection failed: #{inspect(reason)}")
        Process.exit(self(), :kill)
    end

    {:noreply, state}
  end

  def handle_info({:tcp, socket, _msg}, state) do
    # :inet.setopts(socket, active: :once)

    :gen_tcp.close(socket)
    Process.exit(self(), :kill)
    {:noreply, state}
  end

  def handle_info({:tcp_closed, _socket}, state) do
    Client.SendingSupervisor.stop_child(self())
    {:noreply, state}
  end
end

Isn’t the restart strategy the problem here?
worker(Client.DynamicClient, [], restart: :temporary)
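
To illustrate why that option matters, here is a small sketch using only the standard library. It uses a plain DynamicSupervisor rather than a ConsumerSupervisor, and the RestartDemo modules are hypothetical names made up for this example. The worker crashes on its first start and exits normally on any later start, and an Agent counts how many times it was started.

```elixir
defmodule RestartDemo.Worker do
  # Hypothetical worker: crashes on its first start, finishes normally afterwards.
  use GenServer

  def start_link(counter), do: GenServer.start_link(__MODULE__, counter)

  @impl true
  def init(counter) do
    # Record this start and remember which attempt we are on.
    attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
    send(self(), :work)
    {:ok, attempt}
  end

  @impl true
  # First attempt: abnormal exit (a crash).
  def handle_info(:work, 1), do: {:stop, :boom, 1}
  # Later attempts: normal exit, so a :transient child is not restarted again.
  def handle_info(:work, attempt), do: {:stop, :normal, attempt}
end

defmodule RestartDemo do
  # Starts one worker with the given restart value and returns how many
  # times the supervisor ended up starting it.
  def run(restart) do
    {:ok, counter} = Agent.start_link(fn -> 0 end)
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

    spec = %{
      id: RestartDemo.Worker,
      start: {RestartDemo.Worker, :start_link, [counter]},
      restart: restart
    }

    {:ok, _pid} = DynamicSupervisor.start_child(sup, spec)
    # Give the supervisor a moment to observe the exit and react.
    Process.sleep(200)

    starts = Agent.get(counter, & &1)
    DynamicSupervisor.stop(sup)
    Agent.stop(counter)
    starts
  end
end

IO.inspect(RestartDemo.run(:temporary)) # prints 1: the crash is not followed by a restart
IO.inspect(RestartDemo.run(:transient)) # prints 2: restarted once after the crash
```

A :temporary child is never restarted, whatever its exit reason. A :transient child is restarted only when it exits with a reason other than :normal, :shutdown or {:shutdown, term}.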

Haven’t tried this myself, but it seems from the code that the ConsumerSupervisor can restart its children:

Thank you! I changed the restart strategy to :transient and it works great now.
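
For anyone landing here later, the change amounts to roughly the following. This is only a sketch and assumes gen_stage 1.x, where child specs are maps passed to ConsumerSupervisor.init/2. On the older version used in the question, the equivalent is replacing restart: :temporary with restart: :transient in the worker/3 call.

```elixir
defmodule Client.Strategy.EventPushing.SendingConsumer do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    children = [
      %{
        id: Client.DynamicClient,
        # Each event is appended as the last argument to start_link.
        start: {Client.DynamicClient, :start_link, []},
        # :transient restarts the child only after an abnormal exit.
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      # Producer name and max_demand kept as in the original post.
      subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
```

One thing to watch with :transient: the worker should stop with reason :normal once its work is done, for example by returning {:stop, :normal, state}. An exit caused by Process.exit(self(), :kill) counts as abnormal, so the child would be started again.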
