How can I read incoming messages in a non-blocking way?

I’m using the Anthropix library, which can stream the response from the API. When using the stream option, you pass the PID of the process, and it sends all response messages to that process.

I need to display the messages in a LiveVIew app, so I did something like:

# In my LiveView module
def handle_info(:start_generation, socket) do #
  MyApp.Anthropic.generate(self())
  {:noreply, socket}
end

def handle_info({pid, {:data, data}}, socket) when is_pid(pid) do
  text = case data do
    ...
  end
  {:noreply, assign(socket, text: text)}
end

# Needed because Anthropix.chat sends all messages to the process, including :DOWN etc.
def handle_info(_msg, socket), do: {:noreply, socket}
# In the MyApp.Anthropic module
def generate(liveview_pid) do
  client = Anthropix.init()
  Antropix.chat(client, ..., stream: liveview_pid)
end

This works and the message is shown in the LiveView as it gets updated. However, when the message is fully generated, I want to cache it inside of Cachex.

I can create a separate cache function inside of MyApp.Anthropic and call it from the liveview when it receives a message about completed generation, but I would prefer to have as little business logic in the LiveView as possible, so I want to automatically handle caching from the MyApp.Anthropic module.

How can I handle the incoming messages both in the LiveView and in the other module? I can’t use a receive since it’ll block the execution of the MyApp.Anthropic.generate/1 function and the docs say to avoid using receive inside of a GenServer. I also can’t use a Task.async with a receive inside of it, because I get the error <Task> must be queried from the owner but was queried from <PID>.

I’m not familiar with Anthropix, so I might be totally pointing in the wrong direction. However, in general I would say that you need to create a process that handles the request from LiveView. This would probably be a GenServer that handles the request from LiveView and then either receives it from the cache and sends it to LiveView or passes it on to Anthropix. Messages from Anthropix are send to the GenServer and both stored (to be cached) and passed on to LiveView.

This way your LiveView does not have to know anything about Anthropix and can just make requests and receives them either from the API or the cache.