How to delete a specific stream from a socket?

Trying to drop a specific stream from a socket in LV via.

{_, socket} =
    pop_in(socket.assigns.streams[filter_string])

{_, socket} =
    pop_in(socket.assigns.streams.__configured__[filter_string])

{:noreply, socket}

But get

[error] GenServer #PID<0.1919.0> terminating
** (KeyError) key "test" not found in: %{__changed__: MapSet.new([]), __configured__: %{}, __ref__: 1}
    :erlang.map_get("test", %{__changed__: MapSet.new([]), __configured__: %{}, __ref__: 1})
    (elixir 1.18.3) lib/map.ex:318: Map.update!/3
    (phoenix_live_view 1.0.9) lib/phoenix_component.ex:1447: Phoenix.Component.update/3
    (phoenix_live_view 1.0.9) lib/phoenix_live_view/lifecycle.ex:223: anonymous fn/2 in Phoenix.LiveView.Lifecycle.after_render/1
    (phoenix_live_view 1.0.9) lib/phoenix_live_view/lifecycle.ex:237: Phoenix.LiveView.Lifecycle.reduce_socket/3
    (phoenix_live_view 1.0.9) lib/phoenix_live_view/lifecycle.ex:222: Phoenix.LiveView.Lifecycle.after_render/1
    (phoenix_live_view 1.0.9) lib/phoenix_live_view/channel.ex:992: anonymous fn/4 in Phoenix.LiveView.Channel.render_diff/3
    (telemetry 1.3.0) /home/dead/Dev/gale/deps/telemetry/src/telemetry.erl:324: :telemetry.span/3
    (phoenix_live_view 1.0.9) lib/phoenix_live_view/channel.ex:983: Phoenix.LiveView.Channel.render_diff/3
    (phoenix_live_view 1.0.9) lib/phoenix_live_view/channel.ex:819: Phoenix.LiveView.Channel.handle_changed/4
    (stdlib 6.2.2) gen_server.erl:2345: :gen_server.try_handle_info/3
    (stdlib 6.2.2) gen_server.erl:2433: :gen_server.handle_msg/6
    (stdlib 6.2.2) proc_lib.erl:329: :proc_lib.init_p_do_apply/3
Last message: %Phoenix.Socket.Message{topic: "lv:phx-GDKJG9wvY7sfv3iB", event: "event", payload: %{"event" => "delete_filter", "type" => "click", "value" => %{"filter" => "test"}}, ref: "17", join_ref: "4"}
State: %{socket: #Phoenix.LiveView.Socket<id: "phx-GDKJG9wvY7sfv3iB", endpoint: GaleWeb.Endpoint, view: GaleWeb.HomePageLive, parent_pid: nil, root_pid: #PID<0.1919.0>, router: GaleWeb.Router, assigns: %{filters: ["test"], __changed__: %{}, current_user: #Gale.Users.User<__meta__: #Ecto.Schema.Metadata<:loaded, "users">, id: "f08b5683-2009-4b58-8a09-105e2fddb5d0", email: "asdf@asdf.com", confirmed_at: nil, inserted_at: ~U[2025-04-01 01:47:38Z], updated_at: ~U[2025-04-01 01:47:38Z], ...>, flash: %{}, filter_form: %Phoenix.HTML.Form{source: #Ecto.Changeset<action: nil, changes: %{}, errors: [], data: %{}, valid?: true, ...>, impl: Phoenix.HTML.FormData.Ecto.Changeset, id: "form", name: "form", data: %{}, action: nil, hidden: [], params: %{}, errors: [], options: [method: "post"], index: nil}, streams: %{:__changed__ => MapSet.new([]), :__configured__ => %{"test" => [dom_id: #Function<0.13511000/1 in GaleWeb.HomePageLive.handle_event/3>]}, :__ref__ => 1, "test" => %Phoenix.LiveView.LiveStream{name: "test", dom_id: #Function<0.13511000/1 in GaleWeb.HomePageLive.handle_event/3>, ref: "0", inserts: [], deletes: [], reset?: false, consumable?: false}}, live_action: :home}, transport_pid: #PID<0.1908.0>, ...>, components: {%{}, %{}, 1}, topic: "lv:phx-GDKJG9wvY7sfv3iB", serializer: Phoenix.Socket.V2.JSONSerializer, join_ref: "4", redirect_count: 0, upload_names: %{}, upload_pids: %{}}
[

I assume there is a better way?

1 Like

Maybe like this?

socket
|> stream(:mystream, [], reset: true)

But I don’t really understand what You are doing with your pop_in…

1 Like

This will give you a better idea of what I’m dealing with.

In short I would like to remove the condition from having to check if it already had a prior stream with that key since the prior config will cry if I try to do it twice but also. Removing the stream vs just emptying it.

I don’t think streams are meant to be used dynamically like this. The docs for stream_configure/3 specifically suggest calling it in mount/2 (i.e. statically).

What is the use case here? That is to say, what are the items in the streams?

this is the site https://gale.deadegos.org/
As you can see I build up filters of the bluesky jetstream feed.
I want to add remove them as I want.
Currently this works. though leaves a little cruft left over from “deleted” streams

I guess I could consolidate to one stream and just manage the one.

also I think the only reason it states that in the docs to put the config in mount is because it will error if you try to config twice, thus you will need a condition

I see, I often caution people about over-using streams but this is about as good a use-case as I can think of :slight_smile:

From what I can tell the cause of the error you’re getting is that assigns.streams[name] is only populated when you actually call stream/4, if I had to guess probably only within that render before it’s cleared. While the key in assigns.streams.__configured__, I assume, lasts the duration of the LiveView.

I think (I did not bother to test) that pop_in/1 does not handle the missing key, but pop_in/2 does. There is some language in the docs for pop_in/1 suggesting the “complete path must be visible to the macro”.

Does this work?

pop_in(socket, [:assigns, :streams, filter_string])

I was just using that line from the docs to demonstrate that the streams APIs are not meant to be used dynamically. Obviously you technically can, as you have demonstrated, but there is no API to clean up the dangling streams (as you have discovered).

BTW, even if you remove the stream from the server correctly there will still be some accounting left on the client to handle the streams, and if you re-use a name I would imagine you could break something in a weird way.

Consider making a feature request for better dynamic functionality.

Another approach: you could instead make a “pool” of streams (configured in mount/2) and then dynamically assign filters to them, resetting them when the filter changes. I think this would work fine because obviously there are only so many streams you can display at once on a screen anyway.

Yeah this is what I meant by consolidating to just a single long lived stream.
IE multi dimensional list

What I mean is you can allocate multiple streams, like:

socket
|> stream_configure(:stream_1, ...)
|> stream_configure(:stream_2, ...)
|> assign(:filters, %{"test" => :stream_1}, "foo" => :stream_2})

And then update :filters when filters are added/removed, resetting the streams as needed.

By pooling the streams and re-using them you are working around the lack of delete functionality.

yeah we are saying the same thing.

Though I fear this may has a unexpected side effect

You can close the socket. You can’t remove streams. A multiplex socket is exactly that: a single socket with multiple streams coming through it.