How to use Phoenix 1.7 Streams on LiveComponent preload?

The recent Phoenix 1.7 release came with streams. The example given in the blog post was straightforward and easy to understand.

However, how would you use them when preloading a LiveComponent?

Given the example about preloading from the docs:

def preload(list_of_assigns) do
  list_of_ids = Enum.map(list_of_assigns, & &1.id)

  users =
    from(u in User, where: u.id in ^list_of_ids, select: {u.id, u})
    |> Repo.all()
    |> Map.new()

  Enum.map(list_of_assigns, fn assigns ->
    Map.put(assigns, :user, users[assigns.id])
  end)
end

Would it be possible to do it in the Map.put/3 step? Is it even possible to use streams when preloading?


The preload happens outside of the LiveComponent. You don’t really have access to the socket there. So for streams in live components, you can define the stream on mount, prepare the assigns on preload, and then load them into the stream on update.


@josevalim Is there an example for the scenario you’re describing? Sorry, I know this might seem basic, but I’m pretty new to LiveView and my google-fu has failed me.

The main point is that preload does not know about streams. All the data in preload is copied to the socket in the update/2 callback. So that would be the place to insert data into the stream. If you give it a try, I can gladly review and provide pointers.
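
For reference, here is a minimal sketch of that pattern, reusing the User/Repo query from the docs example quoted above; the component and context module names (DemoWeb.UserRowComponent, Demo.Accounts.User, Demo.Repo) are placeholders, not something from this thread. The stream is declared in mount/1, the data is batch-loaded in preload/1, and moved into the stream in update/2:

defmodule DemoWeb.UserRowComponent do
  use DemoWeb, :live_component

  import Ecto.Query

  # placeholder module names for illustration only
  alias Demo.Accounts.User
  alias Demo.Repo

  @impl true
  def mount(socket) do
    # declare the (empty) stream once, when the component mounts
    {:ok, stream(socket, :users, [])}
  end

  @impl true
  def preload(list_of_assigns) do
    # batch-load all users for every component instance in one query,
    # exactly as in the docs example above; no socket is available here
    list_of_ids = Enum.map(list_of_assigns, & &1.id)

    users =
      from(u in User, where: u.id in ^list_of_ids, select: {u.id, u})
      |> Repo.all()
      |> Map.new()

    Enum.map(list_of_assigns, fn assigns ->
      Map.put(assigns, :user, users[assigns.id])
    end)
  end

  @impl true
  def update(assigns, socket) do
    # the preloaded data arrives here as plain assigns; update/2 (and mount/1)
    # are the only callbacks with a socket, so this is where it goes into the stream
    {:ok,
     socket
     |> assign(:id, assigns.id)
     |> stream_insert(:users, assigns.user)}
  end
end

In this contrived sketch each component instance streams just its own user; the point is only to show which callback each stream call belongs in.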


This preloading example from the docs demonstrates how to avoid an N+1 query, which the stream example from the blog post accomplishes by calling Blog.list_posts(), which likely looks something like def list_posts(), do: Repo.all(MyApp.Post).

So in this case, preloading database resources into socket assigns may well be superfluous, since those resources have already been added to the socket’s streams. And since one of the big advantages of streams is offloading the storage of collections from the server’s memory, I would think twice about preloading and storing those same collections in memory on the server via socket assigns when using streams.

I haven’t tested it, but this could be one way of passing streams into multiple LiveComponents via assigns.

defmodule DemoWeb.PostLive.Index do
  use DemoWeb, :live_view

  alias Demo.Blog
  alias Demo.Blog.Post

  @impl true
  def mount(_params, _session, socket) do
    {:ok, stream(socket, :posts, Blog.list_posts())}
  end

  ...
end
# index.html.heex
<%!-- streamed items are {dom_id, post} tuples and need a parent container with phx-update="stream" --%>
<div id="posts" phx-update="stream">
  <%= for {dom_id, post} <- @streams.posts do %>
    <.live_component module={PostComponent} id={dom_id} post={post} />
  <% end %>
</div>

On another note, the following section of the Phoenix.LiveComponent docs on Preloading and update might help in understanding what @josevalim is saying.

So on first render, the following callbacks will be invoked:

preload(list_of_assigns) -> mount(socket) -> update(assigns, socket) -> render(assigns)

On subsequent renders, these callbacks will be invoked:

preload(list_of_assigns) -> update(assigns, socket) -> render(assigns)

Since you need the socket to pass into the stream, stream_insert, or stream_delete functions, only the mount and update LiveComponent callbacks can interact with streams.


Thanks for clarifying!

@codeanpeace Thank you for elaborating Jose’s point further. I definitely have a better understanding of his explanation now.


I just gave some pointers, @codeanpeace gave the proper and complete answer. 🙂


Apologies for the follow-up question. I had another crack at it, but it seems there are still some things that are not clear to me yet.

Let’s say I have two tables, Post and Tag.

If I want to have a LiveView that displays all of the Posts along with their tags, what would be the best way to manage the state if I want to use streams?

Do I query all posts, load their related tags, and put them in a stream inside the LiveView, like so?

@impl true
def mount(_params, _session, socket) do
  {:ok, stream(socket, :posts, Repo.all(Post) |> Repo.preload([:tags]))}
end

I feel like the code above is a bad idea because I also want to be able to track each post’s tags, so what would be the best approach in this scenario?

Other scenarios that can happen within the same LiveView:

  1. If another user adds a new tag to a post, the new tag should appear under the post via Phoenix.PubSub.
  2. If another user edits an existing tag, the changes should be reflected in the same LiveView, under the post the tag belongs to, also via PubSub.

How would I set up the LiveView to support those scenarios? How many LiveComponents should I make?

Hmm, those are interesting scenarios that may push the current streaming functionality to its limits. I’m also interested in hearing what others have to say about working with streamed nested resources, but here are some off-the-cuff approaches that come to mind…

So from what I see in the docs, the stream_insert function can also update an existing item in the stream based on the resource id. One approach for the first scenario would be to broadcast post change events when a tag gets added/removed and then update the “streamed” post with stream_insert. The Managing State section covers this approach, just with updating assigns rather than streams.

The second scenario is definitely trickier depending on the requirements – if posts have a many-to-many relationship with tags, editing a tag on one post may require other posts that share those tags to be updated. And since, unlike with assigns, we can’t use Kernel.update_in for dealing with nested data, it might just be saner to re-stream the preloaded posts entirely than to try to selectively update nested resources.

defmodule DemoWeb.PostLive.Index do
  use DemoWeb, :live_view
  ...

  def mount(_params, _session, socket) do
    preloaded_posts = Repo.all(Post) |> Repo.preload([:tags])

    # set up the LiveView to listen for changes to its posts and tags,
    # even changes made by other users
    # (assumes the app's pubsub server is named Demo.PubSub)
    if connected?(socket) do
      for %Post{id: id, tags: tags} <- preloaded_posts do
        Phoenix.PubSub.subscribe(Demo.PubSub, "post:#{id}")
        for %Tag{id: tag_id} <- tags, do: Phoenix.PubSub.subscribe(Demo.PubSub, "tag:#{tag_id}")
      end
    end

    {:ok, stream(socket, :posts, preloaded_posts)}
  end

  # message from PostComponent/PostFormComponent about a post with a new tag, sent via
  # Phoenix.PubSub.broadcast(Demo.PubSub, "post:#{id}", {:updated_post, post})
  # rather than `send(self(), {:updated_post, post})` so other users' LiveViews are notified
  def handle_info({:updated_post, post}, socket) do
    {:noreply, stream_insert(socket, :posts, post)}
  end

  # message from TagComponent/TagFormComponent about an updated tag, sent via
  # Phoenix.PubSub.broadcast(Demo.PubSub, "tag:#{id}", {:updated_tag, tag})
  # rather than `send(self(), {:updated_tag, tag})` so other users' LiveViews are notified
  def handle_info({:updated_tag, _tag}, socket) do
    preloaded_posts = Repo.all(Post) |> Repo.preload([:tags])
    {:noreply, stream(socket, :posts, preloaded_posts)}
  end
  ...
end
# index.html.heex
<div id="posts" phx-update="stream">
  <%= for {dom_id, post} <- @streams.posts do %>
    <.live_component module={PostComponent} id={dom_id} post={post} />
  <% end %>
</div>

# post_component.html.heex
<%= for tag <- @post.tags do %>
    <.live_component module={TagComponent} id={tag.id} tag={tag} />
<% end %>

Thank you so much for this! I didn’t realize I was asking too much of streams, but it makes sense since the feature just came out recently.

Yeah, it’s definitely still early days for streams. There’s some interesting discussion in the “Phoenix 1.7 streams and sortable datatables” and “Phoenix LiveView Stream API for inserting many” threads if you want to get a peek at what’s on the horizon.

Turns out calling stream on an existing key likely won’t tear down and re-stream as I had assumed based on how assign works. That said, it does look like emptying a stream will be added soon.
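
For anyone landing here later: newer LiveView releases do add exactly this, via a :reset option on stream/4 that clears the existing items on the client before inserting the fresh ones. A minimal sketch, assuming the Blog.list_posts/0 function from earlier and a hypothetical "refresh" event:

# clear the client-side stream items, then stream the fresh collection
def handle_event("refresh", _params, socket) do
  {:noreply, stream(socket, :posts, Blog.list_posts(), reset: true)}
end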