Distributed Registry/Service Discovery/Worker pools in non-uniform clusters

I’m struggling to understand a problem I’m facing, and I’m not sure if there’s an existing solution.

In the project I’m working on, we need to create embeddings using Bumblebee in conjunction with pgvector. Currently, all nodes in the cluster are running the same release. However, with the new requirements for embeddings generation, I want some nodes to have more powerful GPUs. These nodes will detect the presence of a GPU using an environment variable and will start the Bumblebee Serving.

The core idea is that all nodes can request embeddings generation, but the actual worker processes will only run on the GPU nodes.
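A minimal sketch of that conditional start-up, assuming a hypothetical `GPU_ENABLED` environment variable and hypothetical module and serving names (`MyApp.Children`, `MyApp.EmbeddingServing`). The post does not name the variable. The serving itself is passed in, so the sketch does not depend on how it is built:

```elixir
defmodule MyApp.Children do
  # Hypothetical env var name; the post only says "an environment variable".
  @gpu_env "GPU_ENABLED"

  # True when the given environment map carries the (assumed) GPU flag.
  def gpu_node?(env \\ System.get_env()) do
    Map.get(env, @gpu_env) in ["1", "true"]
  end

  # Returns the extra supervisor children for this node.
  # `serving` is whatever serving was built at boot (e.g. with Bumblebee).
  def children(serving, env \\ System.get_env()) do
    if gpu_node?(env) do
      [{Nx.Serving, serving: serving, name: MyApp.EmbeddingServing, batch_timeout: 100}]
    else
      # Non-GPU nodes run the same release but start no serving.
      []
    end
  end
end
```

The returned list would be appended to the application's usual children, so the release stays identical on every node and only the environment differs.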

I’m looking for a solution that combines a distributed registry with something like Poolboy, which would allow for round-robin or other load balancing on the registered processes. I’ve looked into Horde.Registry, but it seems to be designed for registering single processes only.
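For illustration only, here is a rough sketch of the "registry plus round-robin" shape using OTP's built-in `:pg` process groups (OTP 23+). This is a swapped-in technique and not something proposed in the thread. The scope and group names are hypothetical, and the caller is assumed to supply its own counter:

```elixir
defmodule GpuWorkers do
  # Hypothetical scope and group names.
  @scope :gpu_workers_scope
  @group :embedding_workers

  # Start the :pg scope. Every node that registers or looks up workers
  # needs the scope running, typically under its supervision tree.
  def start_scope do
    case :pg.start_link(@scope) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
    end
  end

  # Called by a worker process on a GPU node to join the group.
  def register(pid \\ self()), do: :pg.join(@scope, @group, pid)

  # Round-robin pick: members are sorted so the same counter value
  # maps to the same worker for as long as membership is unchanged.
  def pick(counter) when is_integer(counter) and counter >= 0 do
    case Enum.sort(:pg.get_members(@scope, @group)) do
      [] -> {:error, :no_workers}
      members -> {:ok, Enum.at(members, rem(counter, length(members)))}
    end
  end
end
```

Group membership in `:pg` is eventually consistent across connected nodes, so a caller may briefly see a stale member list after a worker joins or dies.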

I have some ideas for building a custom solution from scratch, but I have a feeling that my requirements—such as service discovery and load distribution—might be met with an off-the-shelf solution.

Do you have any pointers?

Thanks,

Udo

I’d default to Oban, and run some queues only on selected nodes.

In my opinion, the main limitation is that we need embedding generation to be available as a function call. However, you bring up a valid point—having dedicated queues specifically for the GPU nodes and wrapping the job generation in function calls that wait for job completion could be a solution.
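A minimal sketch of that "function call that waits for the job" wrapper, under stated assumptions: `enqueue` is a hypothetical function that hands the work to some queue and arranges for a `{:job_done, ref, result}` message to reach the caller when the job finishes. How that message travels (PubSub, database notifications, or something else) is left open here:

```elixir
defmodule BlockingCall do
  # Enqueue a job and block the calling process until its completion
  # message arrives or the timeout (in milliseconds) elapses.
  def await_job(enqueue, timeout \\ 5_000) when is_function(enqueue, 2) do
    # A unique reference ties the reply to this particular call.
    ref = make_ref()
    enqueue.(self(), ref)

    receive do
      {:job_done, ^ref, result} -> {:ok, result}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

A late reply that arrives after the timeout stays in the caller's mailbox, so a long-lived caller would need to flush or ignore such messages.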

Check out Relay then

Didn’t know about the Relay plugin. Maybe going with Oban Pro is the way then …

it’s pretty good. Make sure the latency is acceptable for your use case. The performance should be there but sometimes there’s a little bit of latency before the job starts executing.

Horde could also work. I used it for a similar use case, not with Horde.Registry alone but with Horde.DynamicSupervisor for starting supervised processes and Horde.Registry for their name registration and lookup (both Horde v0.9.0).

In my experience the Oban approach will be easier to get working right, without weird bugs.

Regarding Horde.DynamicSupervisor, I initially explored it but decided against using it because all the distribution strategies I encountered aimed to distribute tasks among all nodes. However, I just realized that by implementing my own strategy using the Horde.DistributionStrategy behavior to distribute tasks specifically among GPU nodes, I might achieve what I need. This sounds intriguing—thanks for the suggestion!
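A rough sketch of such a strategy, with several assumptions: that the behaviour's callbacks are `choose_node/2` and `has_quorum?/1`, that each member carries a `:status` and a `:name` of the form `{supervisor_name, node}`, and that GPU nodes follow a hypothetical naming convention (node names starting with `gpu`). Check these against the Horde version in use:

```elixir
defmodule GpuOnlyDistribution do
  # Intended for use as a Horde distribution strategy. The behaviour line
  # is left commented out so the sketch compiles without Horde installed.
  # @behaviour Horde.DistributionStrategy

  # Hypothetical convention: GPU nodes are named like :"gpu-1@host".
  def gpu_node?(node) when is_atom(node) do
    String.starts_with?(Atom.to_string(node), "gpu")
  end

  # Pick a random alive member that lives on a GPU node.
  def choose_node(_child_spec, members) do
    candidates =
      Enum.filter(members, fn
        %{status: :alive, name: {_supervisor, node}} -> gpu_node?(node)
        _other -> false
      end)

    case candidates do
      [] -> {:error, :no_alive_gpu_nodes}
      _ -> {:ok, Enum.random(candidates)}
    end
  end

  # No quorum requirement in this sketch.
  def has_quorum?(_members), do: true
end
```

Because `choose_node/2` is a pure function over the member list, it can be unit-tested with plain maps before it is wired into a supervisor.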

Not to intentionally derail your efforts, but I just came up with another idea. I have not used FLAME, but it kind of seems to fit your use case. I don’t know if there are backends that would suit you, however. Have a look at least.

Feel free to derail as much as you want; my line of thinking was already stuck.

Yes, I did consider FLAME, but I ultimately ruled it out because of the usage pattern. We need to generate embeddings consistently and predictably. My impression of FLAME is that it’s mainly designed for “spot” functions, where the startup time is minimal compared to the overall runtime.

sir, please do not encourage me. I ended up discussing medieval literature and Don Quixote in particular the last time. On this forum.

I think you could get it done with a Ra state machine. The state machine could straightforwardly be the distributed registry. You can add in the load balancer piece via custom logic in the query path.

There’s an Elixir project that I’ve not tried yet, but it’s along this same vein. It does the registry part, but I’m not sure if it currently does the load balancing part.

Similar to serverless frameworks/services there is a minimum that you can set, such that you always have a certain number of nodes running.

I also have not used it in production but used it to do some testing of the viability for running some mildly resource intensive tasks. We didn’t end up using it because there was/is not a GCP backend and we didn’t want to add k8s.

We didn’t end up using Oban Pro’s Relay for our particular use case and instead used a Workflow, and PubSub, and then tracked the completion and returns of the workflow steps in the caller and listened for incremental messages on the subscription. Not super elegant, but got the job done of easily allowing multiple people to track the progress.

There’s Oban.Pro.Plugins.DynamicScaler (Oban Pro v1.6.0-rc.3), which works with the Oban.Pro.Cloud behaviour.
That might be your best approach if you use Oban Pro and the Relay approach.

If you don’t go the Oban Pro route (which I would still suggest. “Boring” architecture is reliable and maintainable architecture) I would steer towards a newer registry approach. I have not used Ra.Registry, but have been following ra for a long time and am very impressed.

I’ve used Horde a great deal, and although it does a great deal very well, once you get into writing your own distribution strategy, you’re going to want a way to reliably test it, and depending on how you do your testing Horde has some disadvantages in that it expects a Horde.DynamicSupervisor’s name to be able to be turned into an atom.

It made my testing a little difficult, and because of that I’ve usually tried something else first. It’s still a phenomenal library though. I tried to figure out a way to fix the issue as well, but I never even got to the point where my fix was working, let alone worthy of a pull request upstream.

I concur, before I’d even try something like Horde I’d go with Oban Pro.

As much as I love fault-tolerant consensus (and you know I do), I don’t think this is really necessary here. If you want exactly-once semantics on the queue you would have to store all of your data in Ra (which will certainly never scale), and process registries are a use-case where I’m not entirely convinced you actually want strong consistency guarantees - because where you really want those guarantees is on the database the processes are writing to. Once the database guarantees correctness the process registry and such only needs good performance (it should be right most of the time), so you don’t really need consensus anymore.

(I have had the misfortune of thinking about this a lot lately.)

But here the data is already stored in Postgres! So if you just put your work queue in Postgres you get transactions over both and you are totally safe correctness-wise (read committed footguns notwithstanding).

Like you could literally just do this:

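# Inside a context module with:
#   import Ecto.Query
#   import Ecto.Changeset, only: [change: 2]

# Insert the row, then enqueue an embedding job for it.
def insert_data(text) do
  %Data{id: id} = Repo.insert!(%Data{text: text})
  Repo.insert!(%EmbedJob{data_id: id, lease: nil})
end

# Lease the oldest job that is unleased or whose lease is over a minute old.
# Repo.one!/1 raises if no such job exists.
def pop_job do
  expired = DateTime.utc_now() |> DateTime.add(-1, :minute)

  Repo.transaction(fn ->
    job =
      Repo.one!(
        from j in EmbedJob,
          where: is_nil(j.lease) or j.lease < ^expired,
          order_by: :inserted_at,
          limit: 1,
          lock: "for update",
          preload: :data
      )

    Repo.update!(change(job, %{lease: DateTime.utc_now()}))
  end)
end

# Compute the embedding outside any transaction, then store the vector and
# delete the job atomically.
def worker do
  {:ok, job} = pop_job()
  vector = embed_text(job.data.text)

  Repo.transaction(fn ->
    # Re-lock the job row; raises if the job has already been deleted.
    %EmbedJob{} = Repo.one!(from j in EmbedJob, where: j.id == ^job.id, lock: "for update")
    Repo.update!(change(job.data, %{vector: vector}))
    Repo.delete!(job)
  end)
end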

And now you don’t need a process registry at all!

Just my two cents, but I came across a number of weird issues with Horde - like processes existing multiple times in the cluster, and many people reporting the same issues… :global ended up being good enough for my use case. Something as foundational as Horde must be rock-solid, otherwise you’ll waste a lot of time.
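For reference, a minimal sketch of what relying on `:global` can look like, using a hypothetical wrapper module (`GlobalSingleton`). Note that `:global` keeps one process per name across the cluster, so this covers the "unique process" case and not a pool of workers:

```elixir
defmodule GlobalSingleton do
  # Try to claim a cluster-wide name for `pid`.
  def claim(name, pid \\ self()) do
    case :global.register_name(name, pid) do
      :yes -> {:ok, pid}
      :no -> {:error, {:already_registered, :global.whereis_name(name)}}
    end
  end

  # Look up the process currently registered under `name`, if any.
  def lookup(name) do
    case :global.whereis_name(name) do
      :undefined -> :error
      pid -> {:ok, pid}
    end
  end
end
```

A process found this way can be called from any connected node with the `{:global, name}` form that `GenServer.call/3` accepts.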

BTW if anyone is actually doing this, it’s best to use FOR UPDATE SKIP LOCKED to improve concurrency. There are some nice notes in this blog post. I know some of the Sequin guys might be lurking around here, so if you’re reading this: you guys have a great blog! Keep it up :)

yep I did that too, worked like a charm. Oban uses that internally too.

That makes sense. To be clear, I believe FOR UPDATE is enough for correctness, SKIP LOCKED would just have better performance because the clients can pop a bit faster since they aren’t blocked by each other for a short time.

I mean, we had issues like that too, but this was probably due to CRDTs being what they are. :global is much more predictable if you have a solid connection between your nodes that you can rely on (we did not). I don’t think Horde is meant to be rock-solid the way you’re expecting it to be; there’s always going to be a chance that the same process will register on two nodes, and the idea is that eventually the conflict will be detected and you can handle it.

We did not remove it because it was not working properly; we removed it because the use case we had for it was no longer there. Also, we built everything new on Oban, and the original use case would be built on Oban too if we were to do it all over again.

The problem was that such conflicts would not be detected, and there is an open bug from 2020, “Removing nodes creates multiple processes out of 1 process” (issue #207 in derekkraan/horde on GitHub), that is still not addressed, nor are any public workarounds available. So it’s a nice idea, but it does not address the common use case of “I want to make sure that all processes are up and (reasonably) unique across the cluster, no matter what happens to cluster topology”.
