From what I am reading here: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html
all the implementations of the ConsumerSupervisor only start a child (the printer module in the link above) for each unit of work. Is there a way to restart the child if it dies?
To me, with the name ‘ConsumerSupervisor’ it would have the ability to restart children if the child doesn’t normally shutdown. Has anyone done this before?
In My implemntation, I have the consumersupervisor starting a child that is actually a GenServer to perform work, then shut its self down…If it crashes abnormally, I want it to be restarted.
Thoughts… I thought about just implementing a consumer that calls out to a dynamicsupervisor then starts children but that doesn’t account for back preasure…
Here is how i have it implemented, but want the children to be restarted if they crash:
Producer:
defmodule Client.Strategy.EventPushing.SendingProducer do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
## Callbacks
def init(:ok) do
{:producer, []}
end
# public endpoint for events adding
def add(events), do: GenServer.cast(__MODULE__, {:add, events})
# just push events to consumers on adding
def handle_cast({:add, events}, state) when is_list(events) do
{:noreply, events, state}
end
def handle_cast({:add, events}, state), do: {:noreply, [events], state}
# ignore any demand
def handle_demand(_, state), do: {:noreply, [], state}
end
ConsumerSupervisor:
defmodule Client.Strategy.EventPushing.SendingConsumer do
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(Client.DynamicClient, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
end
end
The Client (Children)
defmodule Client.DynamicClient do
use GenServer
require Logger
# Public API's
def start_link(event) do
Client.Statix.timing("sending", 1)
GenServer.start_link(
__MODULE__,
event,
name: String.to_atom(event.sequence_number)
)
end
def schedule_work() do
send(self(), :start_work)
end
# Call Backs
def init(event) do
schedule_work()
{:ok, event}
end
def handle_info(
:start_work,
%{
:filter => filter,
:sequence_number => sequence_number,
:message => message,
:ip_address => ip,
:port => port
} = state
) do
opts = [:binary, active: true]
case :gen_tcp.connect(ip, port, opts) do
{:ok, socket} ->
:gen_tcp.send(socket, filter)
:gen_tcp.send(socket, message)
{:error, reason} ->
Logger.error("Error")
Process.exit(self(), :kill)
end
{:noreply,state}
end
def handle_info({:tcp, socket, msg}, state) do
# :inet.setopts(socket, active: :once)
:gen_tcp.close(socket)
Process.exit(self(), :kill)
{:noreply, state}
end
def handle_info({:tcp_closed, _socket,}, state) do
Client.SendingSupervisor.stop_child(self())
end
end