Stream pagination and subscribe/unsubscribe to individual items

Hi,

I am paginating through a bunch of classes, displaying 10 at a time. Every time I change pages, I need to subscribe to the new classes' count topics and unsubscribe from the previously shown ones. (I want the counts to live-update in the table, so I will be re-inserting the classes into the stream when an update is broadcast.)

The only thing I was able to make work is

Unsubscribe (kind of a workaround):

Sorted.PubSub
|> Registry.keys(self())
|> Enum.each(fn topic ->
  Phoenix.PubSub.unsubscribe(Sorted.PubSub, topic)
end)

and subscribe:

class_ids =
  socket.assigns.streams.classes.inserts
  # Convert the inserts to a list eagerly
  |> Enum.to_list()
  # Extract the IDs from the {dom_id, at, item, limit} tuples
  |> Enum.map(fn {_dom_id, _at, class, _limit} -> class.id end)

Enum.each(class_ids, &Sorted.Pubsub.subscribe_class_count/1)

It works, but it kinda smells to me


I was hoping I could do something like this for both unsubscribe and subscribe

for {_dom_id, class} <- socket.assigns.streams.classes do
  Sorted.Pubsub.subscribe_class(class.id)
end

but this is throwing

** (ArgumentError) streams can only be consumed directly by a for comprehension.
If you are attempting to consume the stream ahead of time, such as with
`Enum.with_index(@streams.classes)`, you need to place the relevant information
within the stream items instead.

On top of that, even with the subscribe code above, I was not able to unsubscribe before changing the page, as the inserts list was always empty by then.


So what’s the proper way to go about this? Thank you very much.

This feels a bit bent over backwards. Why put stuff into a stream and then try to get it back out again? Give the code doing the subscribing the data directly, independently of the stream. I think the unsubscribing workaround actually makes sense, given you don't retain knowledge about the previous set of classes server side.
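That advice could look like the following sketch: subscribe at the point where the page of classes is fetched, before the structs are handed to the stream. (The `Sorted.Classes.list_classes/1` context function is an assumption; `subscribe_class_count/1` is the helper from the original post.)

```elixir
# In handle_params/3, where the page is loaded anyway:
classes = Sorted.Classes.list_classes(page: page, per_page: 10)

# Subscribe using the data directly, not by reading it back out of the stream.
Enum.each(classes, &Sorted.Pubsub.subscribe_class_count(&1.id))

socket = stream(socket, :classes, classes, reset: true)
```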

Yes, it kinda feels like maybe I should not use streams at all in this case. I was just under the impression that streams should basically be the go-to solution for the majority of use cases, shouldn't they?

My rule of thumb for streams is large collections that are ideally more "static" in nature and don't require too much intricate interaction with the server. They're especially great for collections that keep growing over time: logs, users in a chatroom, etc. If you're only displaying 10 at a time, streams may be overkill, since they add some complexity in exchange for reduced memory usage.

That said, bouncing off of what @LostKobrakai said, you could also track the bare minimum necessary for pubsub subscribe/unsubscribe as an assign e.g. current_class_ids and use that instead of reaching into the registry and streams. ¯\_(ツ)_/¯
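Tracking those IDs as an assign could look like this sketch. (The `Sorted.Classes.list_classes/1` context function and the `unsubscribe_class_count/1` helper are assumptions; `subscribe_class_count/1` comes from the original post.)

```elixir
def mount(_params, _session, socket) do
  # Start with no subscriptions; handle_params fills this in.
  {:ok, assign(socket, :current_class_ids, [])}
end

def handle_params(params, _uri, socket) do
  page = String.to_integer(params["page"] || "1")
  # Hypothetical context function returning the 10 classes for this page.
  classes = Sorted.Classes.list_classes(page: page, per_page: 10)
  new_ids = Enum.map(classes, & &1.id)

  # Drop the old subscriptions, then subscribe for the new page;
  # no need to reach into the Registry or the stream.
  Enum.each(socket.assigns.current_class_ids, &Sorted.Pubsub.unsubscribe_class_count/1)
  Enum.each(new_ids, &Sorted.Pubsub.subscribe_class_count/1)

  {:noreply,
   socket
   |> assign(:current_class_ids, new_ids)
   |> stream(:classes, classes, reset: true)}
end
```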


Making your subscriptions too granular can be problematic for several reasons. My rule of thumb for building complex real-time data management with Ecto is to subscribe per table.

I should have made it clear that I use streams, PubSub, counting and updating rows, logs, etc.
