I’m using the Anthropix library, which can stream the response from the API. When using the stream option, you pass the PID of the process, and it sends all response messages to that process.
I need to display the messages in a LiveView app, so I did something like:
# In my LiveView module
def handle_info(:start_generation, socket) do
  MyApp.Anthropic.generate(self())
  {:noreply, socket}
end

def handle_info({pid, {:data, data}}, socket) when is_pid(pid) do
  text = case data do
    ...
  end

  {:noreply, assign(socket, text: text)}
end

# Needed because Anthropix.chat sends all messages to the process, including :DOWN etc.
def handle_info(_msg, socket), do: {:noreply, socket}

# In the MyApp.Anthropic module
def generate(liveview_pid) do
  client = Anthropix.init()
  Anthropix.chat(client, ..., stream: liveview_pid)
end
This works and the message is shown in the LiveView as it gets updated. However, when the message is fully generated, I want to cache it inside of Cachex.
I can create a separate cache function inside of MyApp.Anthropic and call it from the LiveView when it receives a message about completed generation, but I would prefer to have as little business logic in the LiveView as possible, so I want to automatically handle caching from the MyApp.Anthropic module.
How can I handle the incoming messages both in the LiveView and in the other module? I can’t use a receive since it’ll block the execution of the MyApp.Anthropic.generate/1 function, and the docs say to avoid using receive inside of a GenServer. I also can’t use a Task.async with a receive inside of it, because I get the error <Task> must be queried from the owner but was queried from <PID>.
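To make the goal concrete, here is a rough sketch of the kind of forwarding being asked about: an intermediate process receives the stream messages, passes each one on to the LiveView, and hands the collected chunks to a callback when the stream ends. It uses plain processes only and does not call Anthropix or Cachex. The RelaySketch module, the :done marker, the {:generation_done, chunks} message, and the on_done callback are all hypothetical names; how the real stream signals completion is an assumption.

```elixir
defmodule RelaySketch do
  # Hypothetical helper, not part of Anthropix.
  # Spawns a relay process and returns its pid. The idea would be to pass
  # this pid as the stream target instead of the LiveView pid.
  def start(subscriber, on_done) when is_pid(subscriber) and is_function(on_done, 1) do
    spawn(fn -> loop(subscriber, on_done, []) end)
  end

  defp loop(subscriber, on_done, chunks) do
    receive do
      # Same message shape the LiveView already handles: forward it unchanged
      # and keep a copy of the payload.
      {pid, {:data, data}} = msg when is_pid(pid) ->
        send(subscriber, msg)
        loop(subscriber, on_done, [data | chunks])

      # Assumption: :done stands in for whatever marks the end of the stream.
      :done ->
        collected = Enum.reverse(chunks)
        # This is where a caching call could go.
        on_done.(collected)
        send(subscriber, {:generation_done, collected})

      # Ignore everything else (:DOWN and similar).
      _other ->
        loop(subscriber, on_done, chunks)
    end
  end
end

# Usage with a fake producer: the calling process plays the LiveView.
relay = RelaySketch.start(self(), fn chunks -> IO.inspect(chunks, label: "would cache") end)
send(relay, {self(), {:data, "Hel"}})
send(relay, {self(), {:data, "lo"}})
send(relay, :done)
```

The relay here is unsupervised and has no timeout, so it would wait forever if the completion marker never arrived; a real version would probably need one or both.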