Streaming from a redis channel through a phoenix channel

Hi,

I am currently trying to replace action-cable with a small phoenix app. What I need to do is get information from a Redis channel and stream it to an ember client. I have been attempting to use Redix.PuSub and the Phoenix Redis adapter but havn’t been able to fully cover the functionality we currently have.

The current functionality works like this :

  • Our server receives a request from a user and logs some output to a Redis channel
  • That channels name is a combination of a string and a key
  • The Ember client then makes a request to action-cable with the same key
  • Action-cable then streams the logged information from the Redis channel with the same name

What I need to know is how to start listening to a Redis channel with a given name when the user makes a request & stream that information continuously to the client. I’ve managed to get one or the other using but not both.

I’ve been banging my head of this for over a day now so any help at all is massively appreciated.

Cheers

I do not use Redis (and I’m not sure why redis should be used in this situation as phoenix pubsub can completely replace it, more efficiently and faster), but do what is the code that you are trying to use to set it up so we can compare it to the docs? :slight_smile:

Also, have you thought about not using Redis? It just adds an additional complexity to this that is entirely unneeded for Phoenix servers as it already has better functionality built in.

I’m confused. If you’re trying to replace action cable entirely why bother with redis at all?

1 Like

To be clear, it’s relatively easy to do what you’re asking. You’d just need to have one of the redis clients that supports redis subscriptions. Then you just call MyApp.Endpoint.broadcast! with the channel you want. However I’m confused on why you’re trying to do this right now.

So just real quick redis is currently used as a communication layer between two other services, one of which is the rails app. Since both of those services are going to continue using the Redis server for the time being it makes sense to leave them as is for now.

As for the code I had something similar to the following when I was using redix_pubsub in order to open the correct channel and broadcast the information from it.

defmodule TpPhoenix.LogTailChannel do
  use Phoenix.Channel
  require Logger

  def join(stream_name, _message, socket) do
    # Open a link to the redis server
    {:ok, pubsub} = Redix.PubSub.start_link()

    # Subscribe to the users stream
    Redix.PubSub.subscribe(pubsub, stream_name, self())

    send(self, :setup_listeners)

    {:ok, socket}
  end

  def handle_info(:setup_listeners, socket) do
    receive do
      {:redix_pubsub, redix_pid, :message, %{channel: channel, payload: bin_msg}} ->
        broadcast! socket, channel, %{"message" => bin_msg}
    end

    {:noreply, socket}
  end

  def handle_in(stream_name, %{"message" => message}, socket) do
    broadcast! socket, stream_name, %{"message" => message}
      {:noreply, socket}
  end

  def handle_out(stream_name, payload, socket) do
    push socket, "live_log_" <> stream_name, payload
    {:noreply, socket}
  end
end

The issue I had below is the receive do would only fire the one time because, obviously, the user only joins the channel once. Also the handle_info functions don’t actually process any of the Redis information themselves.

In order to make this useful I need to continuously stream information from the redis channel.

To handle the message coming from Redis PubSub, you need to implement a handle_info matching what you’d expect coming from the Redix process.

def handle_info({:redix_pubsub, :message, message, redis_channel}, socket) do
  # do something with the message
  {:no_reply, socket}
end

So your saying I can declare a set of handle_info functions inside the channel which will match the information being broadcast from Redis?

From what I’ve trie using the Phoenix_PubSub_Redis adapter provides the following functions:

  def handle_info({:redix_pubsub, redix_pid, :subscribed, _}, %{redix_pid: redix_pid} = state) do
    {:noreply, state}
  end

  def handle_info({:redix_pubsub, redix_pid, :message, %{payload: bin_msg}}, %{redix_pid: redix_pid} = state) do
    {_vsn, remote_node_ref, fastlane, pool_size, from_pid, topic, msg} = :erlang.binary_to_term(bin_msg)

    if remote_node_ref == state.node_ref do
      Local.broadcast(fastlane, state.server_name, pool_size, from_pid, topic, msg)
    else
      Local.broadcast(fastlane, state.server_name, pool_size, :none, topic, msg)
    end

    {:noreply, state}
  end

These will catch any information flowing over the selected channel. In this case its phx:Elixir.TpPhoenix.PubSub. However this doesn’t allow me to choose the channel to listen to based on the key the user sent in.

Similarly Redix_PubSub uses the recieve do to match the message format in Redis.

I’m not quite sure if I am correctly understanding what you’re trying to do so please correct me if I’m wrong. You are wanting a user to join a channel with a given topic, phx:Elixir.TpPhoenix.PubSub for example, and want that user to be able to listen to messages being published to Redis PubSub with that same channel/topic name.

Ultimately, the channel code is just a GenServer with some extra sugar on top. Redix PubSub subscribes a PID to a stream of messages and you can handle messages sent to PIDs in a GenServer with handle_info. So the code I gave earlier was only the part of handling the message coming from Redix. You’d do the broadcast to your channel topic inside that method.

Here’s how it should look:

defmodule TpPhoenix.LogTailChannel do
  use Phoenix.Channel
  require Logger

  def join(stream_name, _message, socket) do
    # Open a link to the redis server
    {:ok, pubsub} = Redix.PubSub.start_link()

    # Subscribe to the users stream
    Redix.PubSub.subscribe(pubsub, stream_name, self())

    {:ok, socket}
  end

  # Handle the message coming from the Redis PubSub channel
  def handle_info({:redix_pubsub, :message, message, redis_channel}, socket) do
    # do something with the message

    # Push the message back to the user over the channel topic
    # This assumes the message is already in a map
    broadcast! socket, "live_log_#{redis_channel}", message
    
    {:no_reply, socket}
  end
end

Keep in mind that this example only accounts for using PIDs. The user may lose connection at some point and reconnect again with a different PID. In my use-case where I do something similar to what you’re trying to accomplish, I have a separate process managing the subscriptions where I keep the mapping Phoenix Channel to Redis PubSub Channel. That process broadcasts updates to the channel topic rather than relying on socket PIDs.

2 Likes

Thanks @alexgaribay that really helped me get my mind around it. It only took a little tweaking to get that working. I now have a few channels flowing with info from Redis. :slight_smile: .

With regards to the PIDs, is that an issue of phoenix not closing the connection or are you just saying we can’t rely on it for tracking? If its tracking thats fine, we don’t need to track their sessions at the moment.

Thanks again for the help its massively appreciated :smile:

Glad to hear you got it working.

I was speaking about tracking. For your use-case and how you’re using PIDs, I don’t think it is something you need to worry about. Just keep in mind that if a client loses connection, it’s likely a new process will be spawned and it will have a different PID. You’re subscribing the PID when a client joins, which is something I can’t do for my use-case, so you should be alright.

1 Like