What is the proper way to update LiveView from a globally limited process

Hi all

Context
I am learning Elixir, Phoenix and LiveView by making a hobby project on yet another ChatGPT wrapper. I’ve got to a point where I feel comfortable updating LiveView the way I want and now need to figure the how to manage and limit access to OpenAI. I have also used poolboy for resource management on another small project (where poolboy workers were used for contacting a constantly running Python process synchronously).

Where I want to apply limits
OpenAI API has limits for concurrency (at least on some models) and I want to apply limits on how many requests are in parallel and per minute (and also per day/month as it’s a hobby or half-hobby project, so I need a hard limit not to get broke suddenly).

Possible architecture
I am thinking about creating an API module with rate limiter such as ExRated (probably for start it will just immediately reject if we are over one of rate limit).

Then it would grab one of e.g. 5 workers controller by poolboy and return pass LiveView request to it (basically a set of conversation messages between user and bot). A worker would use something like openai_ex to contact open ai and pass stream of updates back to LiveView.

The problem
With my not too high elixir skills I have issues figuring how to do interprocess communication. I am thinking about the following interactions to be possible:

  • A. LiveView shall be able to pass initial request to OpenAI worker
  • B. Worker shall be able to update LiveView many times as chat continuation tokens are received
  • C. LiveView shall be able to cancel the stream, because e.g. user decided he doesn’t want to continue this chat

Point A seems to be easy as I have already made a functional poolboy worker in the past and pretty much along the lines of this guide I can pass to transaction() whichever Map I like, but with points points B and C, I am not sure what’s the proper way to establish communication between a pool worker and a LiveView process.

  • Shall I pass LiveView process pid to worker, so it can send “some more words received from Open AI” messages via GenServer.cast to be handled via LiveView’s handle_cast?
    • And first such message to contain worker’s pid, so LiveView could cancel via same GenServer.cast?
  • Or am I better arrange a Phoenix PubSub communications with topics such as “openai_worker_1_commands” and “openai_worker_1_responses” so worker and LiveView processes would subscribe to them on initialization and then update each other?
  • Or should I use something completely different? Like maybe there are even ready-made libraries for such cases?

As it’s my first real case of asynchronous interprocess elixir communication I am not sure where to start from? :slight_smile:
What would you do?

Have you come across GenStage? It’s useful abstraction for backpressure and rate limiting.

While it’s totally fine to be asking here I’d also encourage not trying to build a good solution, but try to build “a” solution. The latter is much easier to get going with and you’ll bump into more concrete things. The problem described in this thread is very open ended with many possible solutions.

There’s like a handful of pooling libraries, there’s various rate limiting tools, pubsub is not inherently better or worse than other schemes of message passing.

I think the biggest question to answer outside of tooling will be how you model batching of requests to workers and than routing responses back out by conversation again.

Just to let you know (as I guess it’s good to tell where one helps) I figured that I am possibly doing a bit too much upfront thinking indeed, and went more to the doing side (though still asking questions certainly).

Specifically in this case (just in case somebody happens to be curious enough) I went for wrapping up poolboy with our GenServer. This wrapper checks out a worker which is supposed to checkin itself back when it’s not needed anymore. And then wrapper sends messages to itself after some timeout, to reset the workers that happen to be not checkin back in on time.

For messaging I pass final client’s pid (live view pid) to wrapper, so it knows who to notify.