I am currently trying to replace action-cable with a small phoenix app. What I need to do is get information from a Redis channel and stream it to an ember client. I have been attempting to use Redix.PuSub and the Phoenix Redis adapter but havn’t been able to fully cover the functionality we currently have.
The current functionality works like this :
Our server receives a request from a user and logs some output to a Redis channel
That channels name is a combination of a string and a key
The Ember client then makes a request to action-cable with the same key
Action-cable then streams the logged information from the Redis channel with the same name
What I need to know is how to start listening to a Redis channel with a given name when the user makes a request & stream that information continuously to the client. I’ve managed to get one or the other using but not both.
I’ve been banging my head of this for over a day now so any help at all is massively appreciated.
I do not use Redis (and I’m not sure why redis should be used in this situation as phoenix pubsub can completely replace it, more efficiently and faster), but do what is the code that you are trying to use to set it up so we can compare it to the docs?
Also, have you thought about not using Redis? It just adds an additional complexity to this that is entirely unneeded for Phoenix servers as it already has better functionality built in.
To be clear, it’s relatively easy to do what you’re asking. You’d just need to have one of the redis clients that supports redis subscriptions. Then you just call MyApp.Endpoint.broadcast! with the channel you want. However I’m confused on why you’re trying to do this right now.
So just real quick redis is currently used as a communication layer between two other services, one of which is the rails app. Since both of those services are going to continue using the Redis server for the time being it makes sense to leave them as is for now.
As for the code I had something similar to the following when I was using redix_pubsub in order to open the correct channel and broadcast the information from it.
defmodule TpPhoenix.LogTailChannel do
use Phoenix.Channel
require Logger
def join(stream_name, _message, socket) do
# Open a link to the redis server
{:ok, pubsub} = Redix.PubSub.start_link()
# Subscribe to the users stream
Redix.PubSub.subscribe(pubsub, stream_name, self())
send(self, :setup_listeners)
{:ok, socket}
end
def handle_info(:setup_listeners, socket) do
receive do
{:redix_pubsub, redix_pid, :message, %{channel: channel, payload: bin_msg}} ->
broadcast! socket, channel, %{"message" => bin_msg}
end
{:noreply, socket}
end
def handle_in(stream_name, %{"message" => message}, socket) do
broadcast! socket, stream_name, %{"message" => message}
{:noreply, socket}
end
def handle_out(stream_name, payload, socket) do
push socket, "live_log_" <> stream_name, payload
{:noreply, socket}
end
end
The issue I had below is the receive do would only fire the one time because, obviously, the user only joins the channel once. Also the handle_info functions don’t actually process any of the Redis information themselves.
In order to make this useful I need to continuously stream information from the redis channel.
So your saying I can declare a set of handle_info functions inside the channel which will match the information being broadcast from Redis?
From what I’ve trie using the Phoenix_PubSub_Redis adapter provides the following functions:
def handle_info({:redix_pubsub, redix_pid, :subscribed, _}, %{redix_pid: redix_pid} = state) do
{:noreply, state}
end
def handle_info({:redix_pubsub, redix_pid, :message, %{payload: bin_msg}}, %{redix_pid: redix_pid} = state) do
{_vsn, remote_node_ref, fastlane, pool_size, from_pid, topic, msg} = :erlang.binary_to_term(bin_msg)
if remote_node_ref == state.node_ref do
Local.broadcast(fastlane, state.server_name, pool_size, from_pid, topic, msg)
else
Local.broadcast(fastlane, state.server_name, pool_size, :none, topic, msg)
end
{:noreply, state}
end
These will catch any information flowing over the selected channel. In this case its phx:Elixir.TpPhoenix.PubSub. However this doesn’t allow me to choose the channel to listen to based on the key the user sent in.
Similarly Redix_PubSub uses the recieve do to match the message format in Redis.
I’m not quite sure if I am correctly understanding what you’re trying to do so please correct me if I’m wrong. You are wanting a user to join a channel with a given topic, phx:Elixir.TpPhoenix.PubSub for example, and want that user to be able to listen to messages being published to Redis PubSub with that same channel/topic name.
Ultimately, the channel code is just a GenServer with some extra sugar on top. Redix PubSub subscribes a PID to a stream of messages and you can handle messages sent to PIDs in a GenServer with handle_info. So the code I gave earlier was only the part of handling the message coming from Redix. You’d do the broadcast to your channel topic inside that method.
Here’s how it should look:
defmodule TpPhoenix.LogTailChannel do
use Phoenix.Channel
require Logger
def join(stream_name, _message, socket) do
# Open a link to the redis server
{:ok, pubsub} = Redix.PubSub.start_link()
# Subscribe to the users stream
Redix.PubSub.subscribe(pubsub, stream_name, self())
{:ok, socket}
end
# Handle the message coming from the Redis PubSub channel
def handle_info({:redix_pubsub, :message, message, redis_channel}, socket) do
# do something with the message
# Push the message back to the user over the channel topic
# This assumes the message is already in a map
broadcast! socket, "live_log_#{redis_channel}", message
{:no_reply, socket}
end
end
Keep in mind that this example only accounts for using PIDs. The user may lose connection at some point and reconnect again with a different PID. In my use-case where I do something similar to what you’re trying to accomplish, I have a separate process managing the subscriptions where I keep the mapping Phoenix Channel to Redis PubSub Channel. That process broadcasts updates to the channel topic rather than relying on socket PIDs.
Thanks @alexgaribay that really helped me get my mind around it. It only took a little tweaking to get that working. I now have a few channels flowing with info from Redis. .
With regards to the PIDs, is that an issue of phoenix not closing the connection or are you just saying we can’t rely on it for tracking? If its tracking thats fine, we don’t need to track their sessions at the moment.
Thanks again for the help its massively appreciated
I was speaking about tracking. For your use-case and how you’re using PIDs, I don’t think it is something you need to worry about. Just keep in mind that if a client loses connection, it’s likely a new process will be spawned and it will have a different PID. You’re subscribing the PID when a client joins, which is something I can’t do for my use-case, so you should be alright.