Problem with GenStage stages re-subscribing after crash when stages are unnamed

My company is planning on using GenStage for pipeline processing. In particular, we want to read events from a table, make an HTTP call to an external service for each event, and then load the returned data into a table. We want to set this up as a three-stage pipeline: one producer, multiple producer_consumers, and multiple consumers. I’ve been testing how well GenStage stages re-sync after crashes using this prototype. It works great!

However, in this prototype, each stage process is named with an atom, such as HTTPRequestor1. They are dynamically created. This is less than ideal imo and I wanted to use Registry instead to keep track of the pids of each stage based on a key, like {HTTPRequestor, 1}.

I’ve set this up in a separate branch. Unfortunately, when I randomly crash a stage, the stages no longer reconnect, even though each one acquires the valid pids of the processes it subscribes to on start-up.

I’ve been struggling to understand why the stages are not re-syncing when using pids instead of atoms.

Here’s an example of how I’ve set up the final consumer stage to register its pid on startup and subscribe to the producer_consumer stages in init/1.

  def start_link({id, subs}) do
    {:ok, pid} = GenStage.start_link(__MODULE__, {id, subs})
    {:ok, _} = Registry.register(Registry.Pipeline, {DBLoader, id}, pid)
    {:ok, pid}
  end

  def init({id, subs}) do
    IO.puts(green() <> "{DBLoader, #{id}} subscribed!")
    producers =
      for sub <- 1..subs do
        [{_, pid}] = Registry.lookup(Registry.Pipeline, {HTTPRequestor, sub})
        {pid, max_demand: 3}
      end
    {:consumer, "{DBLoader, #{id}}", subscribe_to: producers}
  end

Contrasted with naming the process explicitly.

  def start_link({id, subs}) do
    name = :"#{__MODULE__}#{id}"
    GenStage.start_link(__MODULE__, {subs, name}, name: name)
  end

  def init({subs, name}) do
    IO.puts(green() <> "#{name} subscribed!")
    producers =
      for id <- 1..subs do
        {:"Elixir.Pipeline.HTTPRequestor#{id}", max_demand: 3}
      end
    {:consumer, name, subscribe_to: producers}
  end
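One detail of the Registry-based version can be checked in isolation: Registry.register/3 registers the calling process under the key, and its third argument is only a value stored next to that entry. The sketch below uses just the standard library. The registry name Demo.OwnerRegistry and the Agent standing in for a stage are assumptions made for illustration, not part of the prototype.

```elixir
# Start a unique-key registry (Demo.OwnerRegistry is a hypothetical name).
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.OwnerRegistry)

# An Agent stands in for a GenStage stage here.
{:ok, stage_pid} = Agent.start_link(fn -> :state end)

# Registry.register/3 registers the *calling* process (self());
# stage_pid is stored only as the value attached to the entry.
{:ok, _owner} = Registry.register(Demo.OwnerRegistry, {DBLoader, 1}, stage_pid)

[{registered_pid, value}] = Registry.lookup(Demo.OwnerRegistry, {DBLoader, 1})

IO.inspect(registered_pid == self(), label: "entry owned by caller")
IO.inspect(value == stage_pid, label: "stage pid stored as value")

# Registering the same key again from the same caller is rejected.
second_attempt = Registry.register(Demo.OwnerRegistry, {DBLoader, 1}, stage_pid)
IO.inspect(second_attempt, label: "second register")
```

If this reading applies to the prototype, the entry created in start_link/1 belongs to whichever process called start_link/1 (typically a supervisor) rather than to the stage, so it would not be cleaned up when the stage itself crashes.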

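@wfgilman: The problem is that you subscribe to the “raw” pid instead of the Registry entry. A pid identifies one particular process, so it goes stale as soon as that process crashes and is restarted under a new pid, whereas a registered name is resolved again on each lookup. You can read more about how to use the so-called “:via tuples” in the Registry documentation.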

Try this:

producers = for sub <- 1..subs,
  do: {:via, Registry, {Registry.Pipeline, {HTTPRequestor, sub}}}
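To see the difference between a raw pid and a :via tuple outside of GenStage, here is a minimal sketch using only the standard library. Demo.Worker and Demo.ViaRegistry are hypothetical names, and a plain GenServer stands in for a stage; the point is only that the pid changes across a restart while the :via tuple keeps resolving to the live process.

```elixir
defmodule Demo.Worker do
  # Hypothetical stand-in for a stage: a GenServer named through Registry.
  use GenServer

  def via(id), do: {:via, Registry, {Demo.ViaRegistry, {__MODULE__, id}}}

  def start_link(id), do: GenServer.start_link(__MODULE__, id, name: via(id))

  @impl true
  def init(id), do: {:ok, id}

  @impl true
  def handle_call(:id, _from, id), do: {:reply, id, id}
end

{:ok, _} = Registry.start_link(keys: :unique, name: Demo.ViaRegistry)

# A supervisor restarts the worker after a crash, as in the pipeline.
{:ok, _sup} = Supervisor.start_link([{Demo.Worker, 1}], strategy: :one_for_one)

old_pid = GenServer.whereis(Demo.Worker.via(1))

# Crash the worker and wait until it is really gone.
ref = Process.monitor(old_pid)
Process.exit(old_pid, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# Poll briefly until the restarted worker has registered itself again.
new_pid =
  Enum.find_value(1..50, fn _attempt ->
    case GenServer.whereis(Demo.Worker.via(1)) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        nil
    end
  end)

IO.inspect(Process.alive?(old_pid), label: "old pid alive")
IO.inspect(new_pid != old_pid, label: "restarted under a new pid")
IO.inspect(GenServer.call(Demo.Worker.via(1), :id), label: "call through :via tuple")
```

The old pid is dead after the crash, but the same :via tuple reaches the restarted process, which is why subscribing by name survives restarts where a pid captured earlier does not.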

That was the problem. I replaced my subscriptions with the following code and it worked. Thank you @wmnnd!

  def start_link({id, subs}) do
    name = {:via, Registry, {Pipeline.Registry, {DBLoader, id}}}
    GenStage.start_link(__MODULE__, {id, subs}, name: name)
  end

  def init({id, subs}) do
    IO.puts(green() <> "{DBLoader, #{id}} subscribed!")
    producers =
      for sub <- 1..subs do
        name = {:via, Registry, {Pipeline.Registry, {HTTPRequestor, sub}}}
        {name, max_demand: 3}
      end
    {:consumer, "{DBLoader, #{id}}", subscribe_to: producers}
  end

Relevant name registration documentation from GenServer:

Name Registration
Both start_link/3 and start/3 support the GenServer to register a name on start via the :name option. Registered names are also automatically cleaned up on termination. The supported values are:

an atom - the GenServer is registered locally with the given name using Process.register/2.
{:global, term} - the GenServer is registered globally with the given term using the functions in the :global module.
{:via, module, term} - the GenServer is registered with the given mechanism and name. The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2. One such example is the :global module which uses these functions for keeping the list of names of processes and their associated PIDs that are available globally for a network of Elixir nodes. Elixir also ships with a local, decentralized and scalable registry called Registry for locally storing names that are generated dynamically.
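The three name forms listed above can be tried side by side. This is a small sketch under assumed names (Demo.LocalName, Demo.NameRegistry, and the {:demo, 1} keys are all made up for illustration); it starts one Agent per name form and resolves each back to a pid with GenServer.whereis/1.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.NameRegistry)

names = [
  # an atom, registered locally
  Demo.LocalName,
  # registered through the :global module
  {:global, {:demo, 1}},
  # registered through Registry as the :via module
  {:via, Registry, {Demo.NameRegistry, {:demo, 1}}}
]

pids =
  for name <- names do
    # Agent accepts the same :name option as GenServer.
    {:ok, pid} = Agent.start_link(fn -> name end, name: name)

    # GenServer.whereis/1 resolves any of the three forms to the pid.
    ^pid = GenServer.whereis(name)
    pid
  end

# Each name can be used anywhere a pid would be accepted.
for name <- names do
  IO.inspect(Agent.get(name, & &1), label: "state reached by name")
end
```

Only the :via form lets the key be an arbitrary term such as {HTTPRequestor, 1} without creating atoms dynamically or involving the global, cluster-wide registry.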