My company is planning on using GenStage for pipeline processing. In particular, we want to read events from a table, make HTTP calls to an external service for each event, and then load the data returned into a table. We want to set this up as a three-stage process: one `producer`, multiple `producer_consumer`s and multiple `consumer`s. I’ve been testing how well GenStage stages re-sync after crashes using this prototype. It works great!
However, in this prototype, each stage process is named with an atom, such as `HTTPRequestor1`. The names are created dynamically, which is less than ideal imo, so I wanted to use `Registry` instead to keep track of the pid of each stage under a key, like `{HTTPRequestor, 1}`.
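For reference, a `Registry` can also act as the process name itself through a `{:via, Registry, {registry, key}}` tuple, which is the `Registry` counterpart of an atom name. Below is a minimal standalone sketch. It assumes `Registry.Pipeline` uses `keys: :unique`, since the post does not show its options, and it uses an `Agent` as a stand-in for a stage. The started process registers itself while it starts, `GenServer.whereis/1` resolves the key to the current pid, and after a stop and restart the same key resolves to the new process.

```elixir
# Sketch: a :unique Registry used as a name server through via tuples.
# Registry.Pipeline comes from the post; the Agent is a stand-in for a stage.
{:ok, _} = Registry.start_link(keys: :unique, name: Registry.Pipeline)

name = {:via, Registry, {Registry.Pipeline, {HTTPRequestor, 1}}}

# The started process registers itself under the key while it starts.
{:ok, old_pid} = Agent.start(fn -> :ok end, name: name)
^old_pid = GenServer.whereis(name)

# Simulate a crash and restart: stop the process, start a new one under the same key.
:ok = Agent.stop(old_pid)
{:ok, new_pid} = Agent.start(fn -> :ok end, name: name)

# The same key now resolves to the new process.
^new_pid = GenServer.whereis(name)
IO.inspect(new_pid != old_pid, label: "key follows the restarted process")
```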
I’ve set this up in a separate branch. Unfortunately, when I randomly crash a stage, the stages no longer reconnect, even though each one acquires valid pids for the processes it subscribes to on start-up.
I’ve been struggling to understand why the stages re-sync when they subscribe by atom name but not when they subscribe by pid.
Here’s an example of how I’ve set up the final `consumer` stage to register its pid on startup and subscribe to the `producer_consumer` stages in `init/1`.
```elixir
def start_link({id, subs}) do
  {:ok, pid} = GenStage.start_link(__MODULE__, {id, subs})
  {:ok, _} = Registry.register(Registry.Pipeline, {DBLoader, id}, pid)
  {:ok, pid}
end

def init({id, subs}) do
  IO.puts(green() <> "{DBLoader, #{id}} subscribed!")

  producers =
    for sub <- 1..subs do
      [{_, pid}] = Registry.lookup(Registry.Pipeline, {HTTPRequestor, sub})
      {pid, max_demand: 3}
    end

  {:consumer, "{DBLoader, #{id}}", subscribe_to: producers}
end
```
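One detail that may be worth ruling out: `Registry.register/3` registers the *calling* process, and `start_link/1` runs in whichever process calls it (normally the supervisor), not in the newly started stage. The standalone check below reproduces that shape with an `Agent` standing in for the stage, assuming a `:unique` registry. The entry is owned by the caller, the stage pid is only the stored value, and stopping the stage leaves the entry in place. A second `Registry.register/3` for the same key from the same caller is rejected rather than returning `{:ok, _}`.

```elixir
# Standalone check with an Agent standing in for the stage.
{:ok, _} = Registry.start_link(keys: :unique, name: Registry.Pipeline)

# Same shape as start_link/1 above: start the process, then register from the caller.
{:ok, stage_pid} = Agent.start(fn -> :ok end)
{:ok, _} = Registry.register(Registry.Pipeline, {DBLoader, 1}, stage_pid)

# The entry's owner is the caller (self()); the stage pid is only the stored value.
[{owner, value}] = Registry.lookup(Registry.Pipeline, {DBLoader, 1})
IO.inspect(owner == self(), label: "entry owned by the caller")
IO.inspect(value == stage_pid, label: "stage pid stored as the value")

# Stopping the stage leaves the entry in place, because its owner is still alive.
:ok = Agent.stop(stage_pid)
[{^owner, ^stage_pid}] = Registry.lookup(Registry.Pipeline, {DBLoader, 1})
IO.inspect(Process.alive?(stage_pid), label: "stored pid alive")

# Registering the same key again from the same caller is rejected by a :unique registry.
result = Registry.register(Registry.Pipeline, {DBLoader, 1}, :replacement)
IO.inspect(result, label: "second register")
```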
For contrast, here is the version that names each process explicitly with an atom:
```elixir
def start_link({id, subs}) do
  name = :"#{__MODULE__}#{id}"
  GenStage.start_link(__MODULE__, {subs, name}, name: name)
end

def init({subs, name}) do
  IO.puts(green() <> "#{name} subscribed!")

  producers =
    for id <- 1..subs do
      {:"Elixir.Pipeline.HTTPRequestor#{id}", max_demand: 3}
    end

  {:consumer, name, subscribe_to: producers}
end
```
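A possible middle ground, sketched under several assumptions, keeps `Registry` keys but passes `{:via, Registry, {Registry.Pipeline, key}}` tuples both as `name:` and in `subscribe_to:`. Each key then resolves to whichever process currently owns it, much like the atom names do. Assumptions: `Registry.Pipeline` is `:unique`, `Pipeline.Names.via/1` is a hypothetical helper, keys use `{__MODULE__, id}`, and `Pipeline.HTTPRequestor` here is a trivial counter producer standing in for the real `producer_consumer`. GenStage is pulled in with `Mix.install/1` (Elixir 1.12+) so the file runs as a script. The restart check at the end relies on the default `cancel: :permanent` subscription option. Under that option the consumer exits when its producer does and subscribes again by name when the supervisor restarts it.

```elixir
# Standalone script sketch; a Mix project would list :gen_stage in mix.exs instead.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Pipeline.Names do
  # Hypothetical helper: the via tuple for a stage key in Registry.Pipeline.
  def via(key), do: {:via, Registry, {Registry.Pipeline, key}}
end

defmodule Pipeline.HTTPRequestor do
  # Stand-in producer for this sketch; the real stage is a producer_consumer.
  use GenStage
  import Pipeline.Names

  def start_link(id), do: GenStage.start_link(__MODULE__, 0, name: via({__MODULE__, id}))

  def init(counter), do: {:producer, counter}

  # Emit consecutive integers to satisfy whatever demand arrives.
  def handle_demand(demand, counter) do
    {:noreply, Enum.to_list(counter..(counter + demand - 1)), counter + demand}
  end
end

defmodule Pipeline.DBLoader do
  use GenStage
  import Pipeline.Names

  def start_link({id, subs}) do
    # The new stage process registers itself under the key while starting,
    # so the entry goes away with it and a restarted stage can claim it again.
    GenStage.start_link(__MODULE__, {id, subs}, name: via({__MODULE__, id}))
  end

  def init({id, subs}) do
    # Producers are referenced by via tuple, not by a pid looked up in advance.
    producers =
      for sub <- 1..subs do
        {via({Pipeline.HTTPRequestor, sub}), max_demand: 3}
      end

    {:consumer, {__MODULE__, id}, subscribe_to: producers}
  end

  def handle_events(_events, _from, state), do: {:noreply, [], state}
end

children = [
  {Registry, keys: :unique, name: Registry.Pipeline},
  {Pipeline.HTTPRequestor, 1},
  {Pipeline.DBLoader, {1, 1}}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

# Kill the producer and give the supervisor a moment to restart both stages.
old_producer = GenServer.whereis(Pipeline.Names.via({Pipeline.HTTPRequestor, 1}))
Process.exit(old_producer, :kill)
Process.sleep(200)

new_producer = GenServer.whereis(Pipeline.Names.via({Pipeline.HTTPRequestor, 1}))
loader = GenServer.whereis(Pipeline.Names.via({Pipeline.DBLoader, 1}))
IO.inspect({old_producer, new_producer, loader}, label: "producer before/after, loader")
```

If a consumer happens to restart before its producer has re-registered, the name will not resolve yet and that subscription attempt fails. This version therefore still leans on the supervisor's restart intensity, just as the atom-named version does.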