What is the best way to run only one GenStage on the distributed system?

I want to make a GenStage on the distributed system that loads sources from DB.
If there are multiple GenStage that load sources from DB, they can load duplicated sources, and it can make not intended results.
If I add GenStage to children of an application and that application is launched on multiple machines, there can be multiple GenStage.
What is the best way to run only one GenStage on the distributed system?

Do you want the GenStage to be running only on one machine or to ensure that only one GenStage is producing data at the same time?

Sounds like you need some kind of lock. The answer to the question above might dictate how long you need to hold the lock.

There’s this library, GitHub - arjan/singleton: Global, supervised singleton processes for Elixir

I’ve just finished implementing something similar, although they are singleton supervision trees. I ended up using the :global name registration along with capturing exits of the children to manage cleanup. The unfortunate part of using a supervisor as the singleton parent is that there is no way to handle the conflict resolution for name collisions when multiple nodes attempt to register the same process under the same name. So it’s :brutal_kill'd.

I thought about moving the parent to a genserver, but something feels wrong about having a genserver manage a supervision tree. I may change my mind though. The basic code for my singleton supervisor looks like this.

defmodule Singleton do
  defmodule Supervisor do
    def start_link(mod, init, opts \\ []) do
      Singleton.do_start_link(Elixir.Supervisor, mod, init, opts)
    end
  end
 
  def do_start_link(otp, mod, init, opts) do
    Logger.info("Starting singleton: #{otp}.#{mod} #{inspect(opts[:name])}")
    case otp.start_link(mod, init, opts) do
      {:ok, pid} -> 
        Logger.info("Started singleton: #{otp}.#{mod} #{inspect(opts[:name])} - #{inspect(pid)}")
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        Logger.info("Already started: #{otp}.#{mod} #{inspect(opts[:name])} - #{inspect(pid)}")
        Process.link(pid)
        {:ok, pid}

      error ->
        Logger.error("Error: #{inspect(error)}")
        :ignore
    end
  end
end

The nice thing about this is as soon as one of the singletons fails or is stopped on one node, it’s immediately started by another node.

To use it, something like this.

defmodule MyModule
  use Supervisor
  def start_link(session: session) do
    Singleton.Supervisor.start_link(__MODULE__, session, name: {:global, __MODULE__})
  end

  def init(session) do
    Supervisor.init(children, opts)
  end
end

The {:global, name} part is important.

In addition, I wrap all the singletons in an additional Supervisor and set the max_restarts to double the length of all singleton processes.

1 Like

I’ve found this article.

1 Like

Have you looked at Flow? Flow has a feature that allows you to partition the data such that each partition can be sent to one process dedicated for that subset of data. Even if you don’t use Flow itself, you might be able to get some ideas from their implementation.

You might also get some ideas from this book: Concurrent Data Processing in Elixir: Fast, Resilient Applications with OTP, GenStage, Flow, and Broadway by Svilen Gospodinov