How do you manually send a client a message through their established Websock?

This is an Elixir question, not a Phoenix question. I am not interested in any PubSub or Channels solution, but rather how to do this with basic Elixir.

If you have a user who has established a WebSock connection as shown here in the EchoServer example or as shown in the MyAppWeb.ConnectionTimer example, how can you utilize this websocket to send them a message from a distinct Elixir process/service?

Both the conn and the WebSock behavior are not processes. They have no process ID. Thus they have no capacity to receive messages. They are just looping on state in reaction to user triggered handle_in stimulus.

1) ELIXIR SERVICE TO CLIENT MESSAGING (EASY)

One can envision a way the conn/Websock could be made to send out messages into Elixir in response to incoming triggers easily enough:

  • Create a Genserver on each new connection made and set the PID for this into the state of conn/WebSock on its initialization.
  • When a specific handle_in request is received, you can use Process.send to send that GenServer a message which it will then receive it under handle_info

Thus we can easily allow client → separate Elixir process system communication through the WebSock.

2) ELIXIR SERVICE TO CLIENT MESSAGING (UNCLEAR?)

But what about separate Elixir process → client WebSock communication? Where such communication is not triggered by the user’s initiation, but rather our system’s?

Let’s say for whatever reason that GenServer wants to send the client a message through their websock. How can it do so? How can any process do so?

My current guess based on the fact that in this example he is running:

  defp schedule_alert, do: Process.send_after(self(), :alert, :timer.seconds(15))

This suggests to me each running conn/WebSock while not a process itself is being run by and is associated with a parent process (direct Supervisor?) that is uniquely associated with it (one-to-one). So somewhere here there is a possible solution.

EXPERIMENT

I just tested debugging out the PID of the main supervisor that starts my router in Application.ex and also debugging out self() in the router itself under get / for simple localhost:port/ accesses. On every reload, a new PID is shown there, which is not the PID of the main application supervisor that started the router.

This suggests a new process is initiated on every new web request to the router. But what is this process, then, and how do we control it? Presumably it has the conn/Websock within its own state (or the conn/WebSock functions are augmenting or overriding its base functions) when the connection is upgraded so if we can control that or override its functions we can perhaps accomplish the goal?

Simpler: If we assume all conn/WebSock’s have their own unique one-to-one Supervisor PID running them given by self() within the WebSock code (which seems to be true), then we can message that PID just like in this timer websock to trigger the handle_info of the WebSock. Then based on the handle_info manually run inside that handle_in which has the capacity to message the user directly through the websock.

I am thinking that is the solution now as I work it through. I will try that next.

GOAL

I am looking for the basic Elixir solution to this. To pre-empt, yes I have tried to solve this already but can’t. I have four books on Elixir sitting on the couch next to me but none explain this. ChatGPT can’t seem to answer without telling me about Phoenix (which I’m not interested in and is not magic in any case and must follow the same laws as base Elixir).

There is obviously some way to do this that systems like Phoenix are utilizing. That’s what I want to understand.

Thanks for any help.

When the websocket process starts, have it register its PID with the key for the user id in a Registry that you’ve started. From another process, retrieve the PID from the Registry by looking up the user id and send a message to it.

https://hexdocs.pm/elixir/Registry.html#module-using-as-a-pubsub

Yes, I can do that.

But a few points:

  • As per the other thread, the WebSock/conn are not technically processes themselves.
  • Thus it is perhaps a Supervisor process or something that is running the process that encompasses them (technicality perhaps but worth understanding whatever the reality here is).
  • One can get the pid of this running process on init of the WebSock implementation and store or debug it.
  • For testing, one can in simple manner add IO.inspect (self()) to the WebSock somewhere to see what it is, and then in iex:
custom_pid = pid(0,88,0) #or whatever the values were outputted by the debug)
Process.send(custom_pid, "message", [])
  • This will successfully trigger handle_info of the WebSock implementation.
  • But it still doesn’t have any way to send the user a message through the websocket.

My theory was to then from the handle_info run handle_in with end result there of {:reply, :ok, {:text, to_string(message_in)}, state} with the hope this would push a message through the connected websocket.

But I have just tested this and can’t get anything to show up in the browser console from this method by testing. By contrast I can get reply messages back from handle_in when I send a message initiated by the client.

ie. From the client end (Javascript) if I do sock.send("ping") I can receive this in handle_in of WebSock and reply with whatever that shows back into the console where I have a listener set for messages. But I still cannot send a message from an Elixir process that will do this (ie. Elixir Process → client message).

I can’t seem to “force” a “reply” (ie. push a message to the client) from Elixir directly.

Should I be able to? Any further thoughts appreciated. Thanks.

TEST CONFIG

A test config can be easily made by copying and pasting the EchoServer configuration here, running the project locally, and then in F12 of FireFox or Chrome or whatever, running:

 sock = new WebSocket("ws://localhost:port/websocket");
        sock.addEventListener("message", (event) => console.log(event)); 
        sock.addEventListener("message", (event) => { console.log("Message from server: ", event.data); });
        sock.addEventListener("open", () => sock.send("ping")); //The open event is fired when a connection with a WebSocket is opened.

        // Send a keep-alive message every 30 seconds
        setInterval(() => { if (sock.readyState === WebSocket.OPEN) { sock.send("ping"); }}, 30000); 

If you configure handle_in to reply to “ping” you will get it back in the console. But the method I described above to try to force a message to the client from elsewhere in Elixir does not seem to work.

I still don’t know how to send a message directly to the client through the websocket from Elixir itself.

This is not correct as stated. conn is a datastructure and indeed therefore not a process.

But code on the beam cannot run outside a process. There’s always a process running the code. It could be that that you want a distinct process to the one you start out on in certain places for reasons like failure isolation and more, but you cannot run code without there being a process.

You can find the handle_info callback for the WebSock behaviour here: WebSock — WebSock v0.5.3

You can return {:push, messages(), state()} from handle_info to send a message to the websocket client in response to having handled a message. WebSock — WebSock v0.5.3

I’d strongly suggest looking at Registry to register the websock processes under a more descriptive key, which you can then use to send messages to it. This is also what is used within phoenix pubsub and further their channels implementation as described here: Matrjoschka of phoenix communication | Benjamin Milde

6 Likes

I also want to address this as well: Before phoenix 1.7 (the latest release at this point) websocket handling wasn’t abstracted, but handled directly through the lower level integration with cowboy. @mtrudel having built bandit was nice enough to build out all the new WebSock abstractions to make it possible to switch all of phoenix to bandit vs using cowboy as well as allowing for any future webserver implemenation to provide adapters for both http and websockets.

Hence probably all books you have would have been written before those abstractions were in place and also generally the lack of more hands on guides around it. People have been using channels for ages vs the WebSock abstraction being very new.

2 Likes

Yeah to elaborate on what @LostKobrakai said, the loop I described in the other thread is happening in a process, and that process is NOT the supervisor. It’s the process spawned by the supervisor but it is it’s own process. When you do self() you get the process ID of that specific process.

Notably you’re still using an abstraction here. If you want to actually do this with 100% built in Elixir you’d want something like Playing with Sockets and Processes in Elixir which is using a manual loop and direct management of the :gen_tcp library.

And this is where the history that @LostKobrakai provided I think should explain a lot of the educational material you’re finding. Phoenix was first published in 2013, built on the more “lower level” websocket support provided by the erlang library cowboy. As essentially “first to the field” for websocket support in Elixir, Phoenix has been the way to do websockets for a decade now. If people needed a more raw websocket handling experience Phoenix simply grew to support more control within its framework.

More recently as more modern and Elixir influenced alternatives to cowboy have been developed there’s also been an effort to create some more “mid level” websocket abstractions, but these are actually the new comers to the field.

Process Registration and PubSub

Switching topics, the main architectural piece that you’re missing here is the idea of a process registry, and the pattern of using this registry to “publish” messages out to clients. At a high level, your websocket process should be using eg Registry — Elixir v1.17.3 to give your process a “name” like a user or client ID. Then from some other part of the system if you want to send that user or client a message, you use the registry to lookup the PID of any processes associated with that user / client ID and then send the message to those processes.

If you are not sure about how to “name” your processes I want to note that this is a problem you’d face no matter what system or language you’re using. If you have 10 websocket clients connected and you want to send one of them a message from the command line or from one client to another, you need SOME way to identify and therefore distinguish one client from another.

4 Likes

It really seems as though you are still conflating modules and processes. Modules are literally just static bags of functions (this isn’t strictly true as they can also carry metadata in the form of persisted attributes, but these are also static). use GenServer doesn’t turn a module into a process, it just adds some callbacks that the GenServer module is aware of. self() returns the pid of whatever process a function is being run in, it has nothing to do with the module itself.

This might make it a bit clearer (if not, sorry I’m trying my best, lol):

# m.ex

defmodule A do
  def hi, do: self()
end

defmodule B do
  def hi, do: self()
end

defmodule Server do
  def start_link, do: spawn_link(&loop/0)

  defp loop do
    receive do
      mod -> apply(mod, :hi, []) |> IO.inspect()
    end

    loop()
  end
end

Here we have a Server that doesn’t hold any state. It receives a module name as a message and assumes it has a function called hi/0 and calls it. A and B both implement hi/0 which just returns self(). Both A.hi() and B.hi() will return the same result when called in the process running the Server.start_link function. Notably, A and B don’t implement any sort of “process behaviour.”

iex(1)> c "m.ex"
[A, B, Server]
iex(2)> pid = Server.start_link()
#PID<0.122.0>
iex(3)> send(pid, A)
#PID<0.122.0>
A
iex(4)> send(pid, B)
#PID<0.122.0>
B

As you can see, all three pids are identical despite being returned from functions in different modules.

EDIT: This is a reply to @mikejm to be clear… it sort looks like I’m responding to @benwilson512 :sweat_smile:

3 Likes

This is great! and to take this illustration further:

iex(6)> pid_x = Server.start_link
#PID<0.130.0>
iex(7)> pid_y = Server.start_link
#PID<0.131.0>
iex(8)> send(pid_x, A)
#PID<0.130.0>
A
iex(9)> send(pid_y, A)
#PID<0.131.0>
A
iex(10)> send(pid_x, B)
#PID<0.130.0>
B
iex(11)> send(pid_y, B)
#PID<0.131.0>
B
iex(12)> self()
#PID<0.114.0>

You can see there’s 3 pids here. You’ve got the shell pid, and then two other pids, each running their own Server loop.

1 Like

I realize too that this is poor wording. It’s really more “a Server that starts a process.” It’s confusing because people do indeed talk about modules as if they themselves are processes, but it’d be more proper to say they implement functions to be run in a process as a server.

It could also be written like this:

defmodule Server do
  def loop do
    receive do
      mod -> apply(mod, :hi, []) |> IO.inspect()
    end

    loop()
  end
end
iex(2)> pid = spawn_link(&Server.loop/0)
#PID<0.122.0>

This better shows that spawn_link isn’t connected to Server in any way. spawn_link, self, etc all come from Kernel, they aren’t “methods” on each module. The compiler essentially rewrites these calls to Kernel.spawn_link etc.

2 Likes

My hero. :smiley: Thank you. That is what I have been missing.

Yes, my plan is to save self() of the init in the WebSock into Syn with username identifier. That way I can look up a given user’s WebSock this way and message them as needed through it. I just couldn’t find a way to actually push a message to them which you explained.

I was just trying to ask the question as simply as possible.

Yes that makes sense. Thank you for the explanation.

So conceptually I am gathering each “process” (PID) is spawned by a supervisor to run a certain loop or function based on whatever the module’s behavior is described by.

So every “Websock Instance” is a unique process running under a supervisor with a PID, looping on the behaviors defined by that module type with a state contained in the loop being passed into itself over and over recursively as its functions are called.

I think that is fair and comprehensible. Thanks.

I think I have enough to do everything I need to at this point, or at least try.

That is another good way of putting it which makes sense along these lines. Thanks as well.

1 Like

Yes! This is exactly right.

1 Like

Okay, easier said than done. Here is the simple test configuration if you are willing to try. I am still unable to send a message to the client. The :push method is giving errors. I am not sure why.

1) CREATE ECHOSERVER AND ROUTER

Copy and paste the code here but make EchoServer:

defmodule EchoServer do
  def init(options) do
    IO.puts("WEBSOCK INITIATED | Self: " <> inspect(self()))
    {:ok, options}
  end

  def handle_in({"ping", [opcode: :text]}, state) do
    IO.puts("PING RECEIVED")
    {:reply, :ok, {:text, "pong"}, state}
  end

  def handle_info(msg, state) do
    IO.puts("HANDLE INFO RECEIVED: " <> inspect(msg))
    {:push, :ok, {:text, "Push This To Client"}, state}
  end

  def terminate(:timeout, state) do
    {:ok, state}
  end
end

2) START WITH COWBOY

Start up code should be also instead Cowboy by my current preference (Bandit has given me more glitches):

  webserver = {Plug.Cowboy, plug: Router, scheme: :http, port: 4000}
  {:ok, _} = Supervisor.start_link([webserver], strategy: :one_for_one)

Run the project.

3) CONNECT WITH WEB BROWSER

Open a web browser like Firefox and F12, copy and paste in Console to run the following:

sock = new WebSocket("ws://localhost:4000/websocket"); 
sock.addEventListener("message", (event) => console.log(event)); 
sock.addEventListener("message", (event) => { console.log("Message from server: ", event.data); }); 
sock.addEventListener("open", () => sock.send("ping")); 

// Send a keep-alive message every 30 seconds
setInterval(() => { if (sock.readyState === WebSocket.OPEN) { sock.send("ping"); }}, 30000); 

4) MESSAGE THE WEBSOCK

Look at Elixir to see the PID of the created process, then type in iex:

sock_pid = pid(0,xxx,0) #whatever is given from "WEBSOCK INITIATED | Self: "
Process.send(sock_pid, "MESSAGE", [])

You will get out HANDLE INFO RECEIVED: proving this code is being hit in Elixir. But I am then getting an error and the connection breaks:

21:51:22.572 [error] Ranch listener My.WebSocketRouter.HTTP had connection process started with :cowboy_clear:start_link/4 at #PID<0.388.0> exit with reason: {{:case_clause, {:crash, :error, :function_clause}}, [{My.UserWebSock, :terminate, 2, [file: ~c"lib/web_socket_user.ex", line: 66]}, {:cowboy_websocket, :handler_call, 6,

So the message is clearly being received but the :push command is still failing. Should this be working? What is remaining failure here? Or why is it not working? Any thoughts?

Any working prototype code or fix to the above to prove this is possible? Thanks again.

1 Like

You’re not returning an acceptable value from handle_info etc.

See the accepted return types:

https://hexdocs.pm/websock/WebSock.html#t:handle_result/0

Ah got it!

Working

  def handle_info(msg, state) do
   {:push,  {:text, "HELLO"}, state}
end

Thanks! And thanks to all in this thread! That’s the end. :joy:
:beers:

1 Like