Phoenix.Socket.Transport error handling - sending error reason to the client

Hi,

I’m trying to implement a raw websocket endpoint to push data to clients that may be written in any language (JavaScript, Java…). For that purpose I used the Phoenix.Socket.Transport behaviour:

defmodule MyApp.WebsocketEndpoint do
  @behaviour Phoenix.Socket.Transport

  require Logger

  def child_spec(_opts) do
    # We won't spawn any process, so let's return a dummy task
    %{id: __MODULE__, start: {Task, :start_link, [fn -> :ok end]}, restart: :transient}
  end
  
  def connect(%{params: params} = _state) do
    Logger.info("Websocket connection established")
    {:ok, params}
  end

  # Now we are effectively inside the process that maintains the socket.
  def init(%{"queryName"=> queryName} = params) do

    Logger.info("Websocket init with query_name : #{queryName} and params : #{inspect(params)}")

    try do
        validate_param_names(params, ControlsException)
        
        state =
          with {:ok, params} <- Tarams.cast(params, params_schema) do

            {:ok, query} = Queries.create_query(params, queryName)
            Queries.schedule_job(query, {:websocket, self()})
            query
          else
            {:error, errors} ->
              send(self(), {:error, errors.message})
          end
        {:ok, state}
     rescue
      e in ControlsException ->
        send(self(), {:error, e.message})
    end

  end

  def handle_info({:send_events, message}, state) do
    message = Jason.encode!(message)
   {:push, {:text, message}, state}
  end

  def handle_info({:error, reason}, state) do
    {:stop, reason, state}
  end

  def handle_in({text, _opts}, state) do
    {:reply, :ok, {:text, text}, state}
  end

  def terminate(reason, state) do
    Logger.warn("Websocket connection terminated with reason : #{inspect(reason)}")
    :ok
  end
end

On the endpoint side:

socket "/ws/query/:queryName", MyApp.WebsocketEndpoint, websocket: [path: "", timeout: :infinity]

This works well with a test client (JavaScript) and I’m able to push messages correctly:

var WebSocketClient = require('websocket').client;

var client = new WebSocketClient();

client.on('connectFailed', function(error) {
    console.log('Connect Error: ' + error.toString());
});

client.on('connect', function(connection) {
    console.log('WebSocket Client Connected');

    connection.on('error', function(error) {
        console.log("Connection Error: " + error.toString());
    });

    connection.on('close', function() {
        console.log('echo-protocol Connection Closed');
    });

    connection.on('message', function(message) {
        if (message.type === 'utf8') {
            console.log(JSON.stringify(JSON.parse(message.utf8Data), null, 2))
        }
    });

});

client.connect('ws://localhost:5000/ws/query/:MyQuery?send=true&&initialTime=2022-01-19T08%3A15%3A30.285639Z');

However, when an error occurs (a validation error, for example), the connection is immediately closed and the terminate callback is called. The desired behavior would be to send the error reason to the client (so that it reaches console.log('Connect Error: ' + error.toString())) and then close the connection. I tried sending an error message to self() and implementing an appropriate handle_info function that returns the ‘:stop’ reply, but it doesn’t work.

I was also wondering whether setting the timeout to ‘:infinity’ is good practice, because without it the connection times out and is closed. Thanks for your help.


Hi @didus,

I’m not sure, but it looks like you don’t return the state when you handle errors:

def init(%{"queryName" => queryName} = params) do
  Logger.info("Websocket init with query_name : #{queryName} and params : #{inspect(params)}")

  try do
    validate_param_names(params, ControlsException)

    state =
      with {:ok, params} <- Tarams.cast(params, params_schema),
           {:ok, query} = Queries.create_query(params, queryName) do
        Queries.schedule_job(query, {:websocket, self()})
        query
      else
        {:error, errors} ->
          send(self(), {:error, errors.message})
          # here you need to return state
      end

    {:ok, state}
  rescue
    e in ControlsException ->
      send(self(), {:error, e.message})
      # here you need to return {:ok, state}
  end
end
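
To make the point concrete, here is a minimal sketch, not the original module. send/2 returns the message it sent, so an error branch that ends with send/2 makes that message the value of the branch. The module name SendReturnSketch and the simplified queryName check are assumptions for illustration only.

```elixir
defmodule SendReturnSketch do
  # Hypothetical module: it only illustrates what init/1 ends up returning.

  # Mirrors the shape of the code in the question: the error branch ends with send/2.
  def broken_init(params) do
    state =
      case params do
        %{"queryName" => name} -> name
        # send/2 returns the message, so `state` becomes {:error, "..."}
        _ -> send(self(), {:error, "queryName is missing"})
      end

    {:ok, state}
  end

  # Every branch ends with {:ok, state}, so the socket state stays well defined.
  def fixed_init(params) do
    case params do
      %{"queryName" => _name} ->
        {:ok, params}

      _ ->
        send(self(), {:error, "queryName is missing"})
        {:ok, params}
    end
  end
end
```

With an empty params map, broken_init/1 stores the error tuple as the state, while fixed_init/1 keeps the params as the state and still queues the error message for handle_info/2.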

Hi @stefan_z

Thanks for your reply. I have changed the implementation by moving the logic to a handle_info function. The only way I have found so far is to push the error reason and rely on the client to close the connection. It would be better if the server could pass the error to the client itself before terminating the connection, but I don’t know if that is possible. Here is the code:

def init(%{"queryName"=> queryName} = params) do
    Logger.info("Websocket init with query_name : #{queryName} and params : #{inspect(params)}")
    send(self(), :process_request)
end

def handle_info(:process_request, %{params: params} = state) do
    try do
        validate_param_names(params, ControlsException)
        
            with {:ok, params} <- Tarams.cast(params, params_schema) do

            {:ok, query} = Queries.create_query(params, queryName)
            Queries.schedule_job(query, {:websocket, self()})
            query
          else
            {:error, errors} ->
              {:push, {:text, "error : #{error}"}, state}
          end
        {:ok, state}
     rescue
      e in ControlsException ->
        {:push, {:text, "error : #{error}"}, state}
    end
 end
    

On the JavaScript side:

socket.onmessage = function(event) {
    
    if(event.data.startsWith("error :")){
        console.log(event.data)
        socket.close();
    }
    else
        console.log(JSON.stringify(JSON.parse(event.data), null, 2))
};

Hi @didus,

What do you think about this approach? Let the client connect and let the socket state initialise. Once the client is connected, have it send a message to the server. On the server side, handle_in/2 will be called, and there you can validate that message the way you do in init/1. Based on the validation result, you send the appropriate reply.
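
A minimal sketch of that idea, assuming a hypothetical plain-text request format "subscribe:<queryName>". The module name, the request format and the reply texts are all made up for illustration, and the real validation would go where parse_request/1 is. It only shows the shape of the handle_in/2 return values.

```elixir
defmodule ValidateOnFirstMessageSketch do
  # Hypothetical module. In a real transport it would also declare
  # `@behaviour Phoenix.Socket.Transport` and the other callbacks.

  # Called for every frame the client sends; replies with :ok or :error.
  def handle_in({text, _opts}, state) do
    case parse_request(text) do
      {:ok, query_name} ->
        new_state = Map.put(state, :query_name, query_name)
        {:reply, :ok, {:text, "subscribed to " <> query_name}, new_state}

      {:error, reason} ->
        {:reply, :error, {:text, "error : " <> reason}, state}
    end
  end

  # Assumed request format: "subscribe:<queryName>" with a non-empty name.
  defp parse_request("subscribe:" <> query_name) when query_name != "", do: {:ok, query_name}
  defp parse_request(_other), do: {:error, "expected \"subscribe:<queryName>\""}
end
```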

Hi @stefan_z

That would be a good solution. Unfortunately, I have to conform to an API specification which states that the client initiates the websocket connection by providing params (cron-like params that schedule a backend job which periodically pushes data to clients), and these must be validated on connection. Thanks a lot for your help.

Hi, no problem at all :slight_smile:

But there are a few mistakes in your last code example. The first is that your init/1 function is still missing the {:ok, state} return value. The second is that your handle_info/2 must return one of:

{:ok, state} - continues the socket with no reply
{:push, reply, state} - continues the socket with reply
{:stop, reason, state} - stops the socket

but in the error case your code evaluates both {:push, reply, state} and {:ok, state}. Only the last expression is returned, so the push is discarded.

So the corrected code, with some comments, is:

def init(%{"queryName"=> queryName} = params) do
    Logger.info("Websocket init with query_name : #{queryName} and params : #{inspect(params)}")
    send(self(), :process_request)
    # You were missing this return, I've added it
    {:ok, params} 
end

def handle_info(:process_request, %{params: params} = state) do
    try do
      validate_param_names(params, ControlsException)

      with {:ok, params} <- Tarams.cast(params, params_schema) do
        {:ok, query} = Queries.create_query(params, queryName)
        Queries.schedule_job(query, {:websocket, self()})
        # What is query variable here, should it be assigned in state or just skipped?
        # query
        {:ok, state}
      else
        # There is one more mistake here: you were interpolating a variable that is not defined.
        # The pattern binds `errors`, but the string interpolates `error`:
        # {:error, errors} ->
          # {:push, {:text, "error : #{inspect error}"}, state}
        # You need to be careful when you interpolate.
        # The docs say: "In case the value you want to interpolate cannot be converted to a string, because it doesn't have an human textual representation, a protocol error will be raised."
        # So if your error is a string, it is fine to interpolate it as you did; if it is not, you need to inspect the value:
        # {:push, {:text, "error : #{inspect error}"}, state}
        # Please check the format of `error` and use `inspect/1` or not accordingly.
        {:error, error} ->
          {:push, {:text, "error : #{error}"}, state}
      end
      # Since the `do` body already returns `{:ok, state}` and the `else` body returns `{:push, {:text, "error : #{error}"}, state}`,
      # this is unnecessary, so I have commented it out:
      # {:ok, state}
    rescue
      # This is the same interpolation mistake I explained above.
      error in ControlsException ->
        {:push, {:text, "error : #{error}"}, state}
    end
  end

Maybe there are errors in the terminal that would say more about this problem…

Hi @stefan_z

Actually, I was trying to simplify the original code to make it more readable, but I made many typos, and I’m sorry about that! Here is the original code:

def init(%{params: %{"queryName"=> queryName}} = state) do
  Logger.info("Websocket init with query_name : #{queryName}")
  send(self(), :process_request)
  {:ok, state}
end

def handle_info(:process_request, %{params: params} = state) do
  try do
    validate_param_names(params,subscription_param_names(), ControlsException)
    scheduler = build_scheduler_structure(params)

      with {:ok, subscription_params} <- Tarams.cast(scheduler, %ControllerHelpers{}.subscription_params_schema) do

        {:ok, subscription} = Subsrciptions.create_subsrciption(subscription_params, params["queryName"])
        JobScheduler.schedule_job(subscription, {:websocket, self()})
        {:ok, subscription}
      else
        {:error, error} ->
          # The error is in the format : %{param: ["is invalid"]}
          Logger.error("Error occured in WebsocketEndpoint. Reason: #{inspect(error)}")
          {:push, {:text, "error : #{Poison.encode!(error)}"}, state}
      end
  rescue
    e ->
      #e.message = The parameter 'param' is not a valid key
      Logger.error("Error occured in WebsocketEndpoint. Reason: #{e.message}")
      {:push, {:text, "error : #{e.message}"}, state}
  end
end

This works well, provided the client closes the connection after receiving the error reason. It would be ideal if the server could close it once the reason has been sent, but I can’t find how to do that. Thanks.

Hey, that’s good news! :slight_smile: If the client calls socket.close, I think the connection will be closed and the terminate/2 callback will be called with reason :closed :thinking: . If you want to close the connection on the server side after pushing a message on the socket, you can schedule the close with Process.send_after/3 and return {:stop, reason, state} from the matching handle_info/2 clause, I guess :slight_smile:
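
Something along these lines, as a rough sketch rather than a tested transport. The module name, the validate/1 helper, the 100 ms delay and the :normal stop reason are all assumptions; only the callbacks involved in closing are shown, and a real module would also declare the Phoenix.Socket.Transport behaviour.

```elixir
defmodule PushThenCloseSketch do
  # Hypothetical module: shows "push the error first, stop a bit later".

  # Assumed delay that gives the pushed frame time to go out before stopping.
  @close_delay_ms 100

  def init(state) do
    send(self(), :process_request)
    {:ok, state}
  end

  def handle_info(:process_request, state) do
    case validate(state.params) do
      :ok ->
        {:ok, state}

      {:error, reason} ->
        # Schedule the close, then push the reason to the client right away.
        Process.send_after(self(), {:close, reason}, @close_delay_ms)
        {:push, {:text, "error : " <> reason}, state}
    end
  end

  # Arrives after the delay; stopping here ends the socket from the server side.
  def handle_info({:close, _reason}, state) do
    {:stop, :normal, state}
  end

  # Stand-in for the real validation (validate_param_names/Tarams.cast in the thread).
  defp validate(%{"queryName" => name}) when is_binary(name) and name != "", do: :ok
  defp validate(_params), do: {:error, "queryName is missing"}
end
```

The client then receives the "error : ..." text frame first and sees the connection close shortly afterwards, without having to call socket.close itself.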


Wonderful! Many thanks.

:tada: :tada: :tada: You’re welcome! :+1: