Getting GenServer timeouts due to presumably non-idiomatic Elixir code

I have been writing a library, DeribitApi, published on hex.pm. It is forked from the original Deribit v2 HTTP client by @gabrielpra1 (on GitHub, referred to below as GP1). I added the WebSockets module under Deribit.API, the code needed to keep the persistent socket connection alive, and the listener that handles the stream of inbound messages. I changed the library namespace because the :deribit package name was already taken, so I used :deribit_api. You can find it on hex.pm with the link provided above.

Anyways, long story short. I originally just wrote this WebSocket functionality to quickly display prices a few years ago, so it had hardcoded IO.inspect statements (still does! :joy:), etc.

Recently, when I took another look, I remembered that I hadn’t really created the infrastructure needed to use this in projects (I still haven’t, but I am working on it, as you will find over the course of this post).

I added a parameter to the existing implementations of get_public and get_private, which are the functions used during setup of the code (this uses metaprogramming, and allows you to dynamically generate all the needed API endpoints without writing them all out explicitly - this implementation existed as part of the initial Deribit v2 HTTP client release by GP1).
This additional parameter takes the form of an anonymous function with a single parameter representing the inbound new message payloads that one would receive over the persistent WebSocket connection, for example:

This, for example, would continuously print out any updates that happen on the BTC PERPETUAL future contract.

The needed setup for the DeribitApi.API.WebSockets module is basically: you start_link it, which kicks off the listener, then you authenticate. This “listener” that I wrote does not seem really idiomatic. I have now noticed problems when I try to accumulate all the instruments for each currency and then call get_order_book for each instrument (the order book data actually includes additional attributes for some instruments compared to others, e.g. options, which include greeks, implied volatility, etc.).

It would be nice if this could just happen in the background, but I haven’t explored this yet. I am currently working on adding a Supervisor to the module so that as a persistent connection approaches its expiration TTL, it will use the refresh_token (also stored in the DeribitApi.API.WebSockets module along with the access_token) to refresh the session and continue to do whatever it was doing, seamlessly.

When you call the “special” subscribe function with a map param that has a key in it called “channels” to subscribe to some set of instruments, what happens is a listener gets kicked off that has been parametrized based on the initial call to subscribe.

The WebSockets persistence part actually works fine (I will bring this home at the end of the post), and so I haven’t really looked at ways to improve the Listener code, but I noticed yesterday when I was writing some needed code to build up a map including each instrument every 22 minutes (I calculated that I could go through all 1300 instruments every 22 minutes while respecting their API limits, but maybe that limit is just for writes and not reads) that I am getting GenServer call timeouts.

What is strange is that in the first loop through currencies (currencies = ["BTC", "ETH"] for the purposes of simplicity), this works:


{:ok, deribit} = DeribitApi.API.WebSockets.start_link
DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id, DeribitApi.client_secret)

{:ok, instruments} = Agent.start_link(fn -> [] end)
{:ok, orderbooks} = Agent.start_link(fn -> %{} end)

for currency <- currencies do # currencies == Currency.all 
  DeribitApi.get_instruments(%{"currency" => currency}, fn(data) ->
    Agent.update(instruments, fn state ->
      state ++ data["result"]
    end)
  end)
end

This actually works: if I check the instruments Agent state with Agent.get(instruments, fn x -> x end), I see that it has the expected number of instruments. However, I get a red message in iex and I want to understand why:

%{"currency" => "ETH"}
** (exit) exited in: GenServer.call(DeribitApi.API.WebSockets, {:get, #Function<3.122630315/1 in DeribitApi.API.WebSockets.get_public/3>}, 5000)
    ** (EXIT) time out
    (elixir 1.14.3) lib/gen_server.ex:1038: GenServer.call/3
    iex:19: (file)
    iex:18: (file)

Proof that it seems to work:

iex(20)> Agent.get(instruments, fn x -> x end) |> Enum.map(fn(x) -> x["instrument_name"] end)
["BTC-31JAN23-18000-C", "BTC-31JAN23-18000-P", "BTC-31JAN23-19000-C",
 "BTC-31JAN23-19000-P", "BTC-31JAN23-20000-C", "BTC-31JAN23-20000-P",
 "BTC-31JAN23-20500-C", "BTC-31JAN23-20500-P", "BTC-31JAN23-21000-C",
 "BTC-31JAN23-21000-P", "BTC-31JAN23-21500-C", "BTC-31JAN23-21500-P",
 "BTC-31JAN23-21750-C", "BTC-31JAN23-21750-P", "BTC-31JAN23-22000-C",
 "BTC-31JAN23-22000-P", "BTC-31JAN23-22250-C", "BTC-31JAN23-22250-P",
 "BTC-31JAN23-22500-C", "BTC-31JAN23-22500-P", "BTC-31JAN23-22750-C",
 "BTC-31JAN23-22750-P", "BTC-31JAN23-23000-C", "BTC-31JAN23-23000-P",
 "BTC-31JAN23-23250-C", "BTC-31JAN23-23250-P", "BTC-31JAN23-23500-C",
 "BTC-31JAN23-23500-P", "BTC-31JAN23-23750-C", "BTC-31JAN23-23750-P",
 "BTC-31JAN23-24000-C", "BTC-31JAN23-24000-P", "BTC-31JAN23-24250-C",
 "BTC-31JAN23-24250-P", "BTC-31JAN23-24500-C", "BTC-31JAN23-24500-P",
 "BTC-31JAN23-25000-C", "BTC-31JAN23-25000-P", "BTC-31JAN23-25500-C",
 "BTC-31JAN23-25500-P", "BTC-31JAN23-26000-C", "BTC-31JAN23-26000-P",
 "BTC-31JAN23-27000-C", "BTC-31JAN23-27000-P",
 "BTC-CDIAG-29DEC23_31JAN23-40000_25000", "BTC-STRD-31JAN23-23000",
 "BTC-CS-31JAN23-23000_23250", "BTC-PS-31JAN23-23250_23000",
 "BTC-1FEB23-19000-C", "BTC-1FEB23-19000-P", ...]

iex(21)> Agent.get(instruments, fn x -> x end) |> Enum.map(fn(x) -> x["instrument_name"] end) |> Enum.reverse
["ETH-PERPETUAL", "ETH-29DEC23-5500-P", "ETH-29DEC23-5500-C",
 "ETH-29DEC23-5000-P", "ETH-29DEC23-5000-C", "ETH-29DEC23-4500-P",
 "ETH-29DEC23-4500-C", "ETH-29DEC23-4000-P", "ETH-29DEC23-4000-C",
 "ETH-29DEC23-3500-P", "ETH-29DEC23-3500-C", "ETH-29DEC23-3000-P",
 "ETH-29DEC23-3000-C", "ETH-29DEC23-2500-P", "ETH-29DEC23-2500-C",
 "ETH-29DEC23-2200-P", "ETH-29DEC23-2200-C", "ETH-29DEC23-2000-P",
 "ETH-29DEC23-2000-C", "ETH-29DEC23-1900-P", "ETH-29DEC23-1900-C",
 "ETH-29DEC23-1800-P", "ETH-29DEC23-1800-C", "ETH-29DEC23-1700-P",
 "ETH-29DEC23-1700-C", "ETH-29DEC23-1600-P", "ETH-29DEC23-1600-C",
 "ETH-29DEC23-1500-P", "ETH-29DEC23-1500-C", "ETH-29DEC23-1400-P",
 "ETH-29DEC23-1400-C", "ETH-29DEC23-1300-P", "ETH-29DEC23-1300-C",
 "ETH-29DEC23-1200-P", "ETH-29DEC23-1200-C", "ETH-29DEC23-1100-P",
 "ETH-29DEC23-1100-C", "ETH-29DEC23-1000-P", "ETH-29DEC23-1000-C",
 "ETH-29DEC23-800-P", "ETH-29DEC23-800-C", "ETH-29DEC23-600-P",
 "ETH-29DEC23-600-C", "ETH-29DEC23-400-P", "ETH-29DEC23-400-C",
 "ETH-29DEC23-200-P", "ETH-29DEC23-200-C", "ETH-29DEC23", "ETH-FS-29SEP23_PERP",
 "ETH-29SEP23-5500-P", ...]

However, I think something is not working as expected, because when I try to iterate through these instruments and get the orderbook plus other enriched data for each of them (with a Process.sleep(1000) so that I don’t keep hitting their API endlessly during development), the orderbooks Agent does not fill up with data and I get the same red error message as in the “working” set of API calls above.

This is the code that leads to the same error as before, this time actually not working. Eventually the session gets so wonky and problematic that I just have to kill the process entirely:

iex(19)> idata = fn -> Agent.get(instruments, fn x -> Enum.map(x, fn(z) -> z["instrument_name"] end) end) end
iex(20)> for instrument <- idata.() do
...(20)>   IO.puts instrument
...(20)>   DeribitApi.get_order_book(%{"instrument_name" => instrument}, fn data ->
...(20)>     IO.inspect data
...(20)>     Agent.update(orderbooks, fn state ->
...(20)>       IO.inspect state
...(20)>       Map.put(state, instrument, data["result"])
...(20)>     end)
...(20)>   end)
...(20)> end
BTC-31JAN23-18000-C
** (exit) exited in: GenServer.call(DeribitApi.API.WebSockets, {:get, #Function<3.122630315/1 in DeribitApi.API.WebSockets.get_public/3>}, 5000)
    ** (EXIT) time out
    (elixir 1.14.3) lib/gen_server.ex:1038: GenServer.call/3
    iex:22: (file)
    iex:20: (file)

As you can see, it fails with the same error.
Also, when I check what is up with the DeribitApi.API.WebSockets process, it looks like it is already started, but now I can’t even authenticate:

iex(37)> DeribitApi.API.WebSockets.start_link
{:error, {:already_started, #PID<0.631.0>}}

iex(38)> DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id, DeribitApi.client_secret)
** (exit) exited in: GenServer.call(DeribitApi.API.WebSockets, {:get, #Function<0.122630315/1 in DeribitApi.API.WebSockets.authenticate/2>}, 5000)
    ** (EXIT) time out
    (elixir 1.14.3) lib/gen_server.ex:1038: GenServer.call/3
    (deribit_api 0.4.2) lib/deribit_ws.ex:82: DeribitApi.API.WebSockets.authenticate/2

I didn’t give much thought to just modifying the signatures and then being able to do “whatever I want” in my new magic fn parameter, but I’m wondering if the timeouts have something to do with the sequencing of the various processes and I’m not sure how I can investigate. I noticed that if I use these “classic” Deribit HTTP functions now with the new parameter I added, it kind of breaks my WebSockets API listener.

Could the community please suggest any tips for debugging this, or take a look at some of the relevant code mentioned above and give me some ideas?
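One way to investigate a process that has stopped answering calls is to ask the VM what that process is currently doing. The following is a minimal sketch rather than a diagnosis of this library: it uses a throwaway Agent kept busy with Process.sleep/1 as a stand-in for the stuck process and inspects it with Process.info/2. For the real process, the pid could presumably be looked up with Process.whereis(DeribitApi.API.WebSockets), assuming it is registered under that name as the error message suggests.

```elixir
# Throwaway Agent standing in for the stuck process (assumption: the real
# one would be found via Process.whereis(DeribitApi.API.WebSockets)).
{:ok, agent} = Agent.start_link(fn -> nil end)

# Keep the Agent busy for a while, as a blocking socket read would.
Agent.cast(agent, fn state ->
  Process.sleep(500)
  state
end)

# Give the Agent a moment to pick up the cast before inspecting it.
Process.sleep(50)

# :current_stacktrace shows where the process is blocked;
# :message_queue_len shows how many requests are piling up behind it.
info = Process.info(agent, [:status, :message_queue_len, :current_stacktrace])
IO.inspect(info, label: "busy process")
```

If the stack trace of the real process points at a blocking socket read, that would suggest the process is waiting on the network rather than serving calls.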

The library that I mention with the WebSockets listener is located here: GitHub - arthurcolle/deribit-websockets-api-elixir: Deribit Websockets API client for Elixir and Phoenix

Here are the relevant links:

deribit_ws.ex (at master in arthurcolle/deribit-websockets-api-elixir on GitHub): the DeribitApi.API.WebSockets module. We start_link this to create a process that opens a socket to the relevant endpoint.

When we call DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id, DeribitApi.client_secret), it then kicks off a listener process. Basically, in writing DeribitApi.API.WebSockets, I had to kind of follow the existing code and basically write a module similar to Deribit.API.HTML, so I followed the style. For example, get_public/3 is similar to @gabrielpra1’s get_public/2 but with an additional parameter (the anonymous function, which is mostly just needed for the subscription functionality).

Here is the code for get_public/3:


  @spec get_public(any, any, any) :: any
  def get_public(url, params \\ %{}, f) do
    Agent.get(__MODULE__, fn data ->
      IO.inspect(data)
      websocket = Map.get(data, "websocket")
      IO.inspect(params)

      case Socket.Web.send!(websocket, {:text, M.enc!(base_method(:public, url, params))}) do
        :ok ->
          case Socket.Web.recv!(websocket) do
            {:text, text} ->
              parsed_text = M.dec!(text)
              f.(parsed_text)
              {:ok, spawn(DeribitApi.Listener, :listen, [websocket, data, f])}
            {:ping, _} ->
              Socket.Web.send!(websocket, {:pong, ""})
            other ->
              IO.inspect(other)
          end
      end
    end)
  end

As you can see, in the WebSockets implementation of all the public functions, I get the websocket connection from the module’s state (held in an Agent that stores a map with the socket connection details) and send an encoded request with the url details and the parameters. If I get an :ok back, the code blocks on the socket connection (the Socket.Web.recv!(websocket) call in the inner case above). If I get text, I parse it and call my new fn parameter (maybe I just want to print something to the console and not do anything else; that’s fine, this allows that).
Then, I spawn a Listener and pass in some data and my function, since I want that function to execute every time I get a new message over the WebSocket connection. So that’s basically how the module works.
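The timeouts shown earlier are consistent with how Agent.get behaves: the function passed to it runs inside the Agent process, and the caller waits at most 5000 ms by default. If that function blocks (here, presumably on Socket.Web.recv!), the caller exits with a timeout while the Agent stays busy, so later calls time out as well. A minimal sketch of that behavior, using Process.sleep/1 as an assumed stand-in for the blocking read and shortened timeouts:

```elixir
# Stand-in for the module's Agent; the real one holds the websocket.
{:ok, agent} = Agent.start_link(fn -> %{} end)

# Assumption: simulates a socket read that blocks longer than the timeout.
slow_read = fn state ->
  Process.sleep(300)
  state
end

# Wraps Agent.get/3 so that a call timeout becomes a plain value.
call = fn fun, timeout ->
  try do
    Agent.get(agent, fun, timeout)
  catch
    :exit, {:timeout, _} -> :timed_out
  end
end

# The caller gives up after 100 ms, but the Agent keeps running slow_read.
first = call.(slow_read, 100)

# Even a trivial read times out while the Agent is still busy.
second = call.(fn state -> state end, 50)

# Once the blocking function has returned, calls succeed again.
Process.sleep(300)
third = call.(fn state -> state end, 50)

IO.inspect({first, second, third})
```

Under these assumptions, a read that never returns would leave the Agent unresponsive indefinitely, which would match authenticate timing out afterwards.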

Finally, here is the listener code. Pretty straightforward:

defmodule DeribitApi.Listener do
  require Logger

  @spec listen(Socket.Web.t(), any, any) :: any
  def listen(websocket, data, f) do

    case Socket.Web.recv!(websocket) do
      {:text, text} ->
        parsed_text = M.dec!(text)
        case f do
          nil -> nil
          _ -> f.(parsed_text)
        end
        listen(websocket, Map.put(data, Time.utc_now(), parsed_text), f)
      {:ping, _} ->
        Socket.Web.send!(websocket, {:pong, ""})

      _msg ->
        Logger.debug("I received a msg.\n#{_msg}")
    end

    receive do
      {:data} -> IO.inspect data
      {:time} ->
        Time.utc_now()

      {:kill, reason} ->
        Logger.debug("received kill command.")
        Logger.debug("1) closing socket")
        Socket.Web.close(websocket)
        Logger.debug("2) killing receiver proc")
        Process.exit(self, reason)

      {:ping, _x} ->
        {:pong, Time.utc_now()}
    end
  end
end

# DeribitApi.API.WebSockets.start_link(); DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id(), DeribitApi.client_secret())
# pid = DeribitApi.subscribe(%{"channels" => ["book.BTC-31JAN23-23000-P.raw"]})

The last thing I want to call out, which I think is particularly non-idiomatic, is my crazy authenticate function:

  def authenticate(client_id, client_secret) do
    # The response contains 'result' if all is well, and you can use the
    # access_token and refresh_token to authenticate subsequent requests.
    # we store the entire deserialized 'result' in this module's Agent.
    #
    # So far, the Agent is keeping track of just 2 things:
    #     1) the underlying tcp socket used to maintain the websocket connection
    #
    #     2) the 'result' payload from the initial response from '/public/auth'
    #        this includes the following:
    #          access_token
    #          refresh_token
    #          token_type          "bearer"
    #          scope               account permissions
    #          expires_in          basically a TTL until access_token expires
    #
    # Now for every request until we need to re-authenticate (we know how long
    # we can use our access_token, from the expires_in TTL.

    new_m =
      Agent.get(__MODULE__, fn data ->
        websocket = Map.get(data, "websocket")

        case Socket.Web.send!(
               websocket,
               {:text, M.enc!(payload_for_auth(client_id, client_secret))}
             ) do
          :ok ->
            case Socket.Web.recv!(websocket) do
              {:text, text} ->
                dec_text = M.dec!(text)
                IO.inspect(dec_text)
                {:ok, Map.put(data, "result", obtain_result(text))}
              {:ping, _} ->
                {Socket.Web.send!(websocket, {:pong, ""}), data}
              abc ->
                IO.puts "This is abc..."
                IO.inspect(abc)
            end

          :error ->
            raise "Error!"

          x ->
            IO.puts "This is x..."
            IO.inspect(x)
        end
      end)

    {status, x} = new_m

    case status do
      :ok -> Agent.update(__MODULE__, fn m -> x end)
      _y -> raise "no new map: #{_y}"
    end
  end

(Bringing the point home from earlier…)

I know that the WebSockets module piece, by itself, is actually very functional. In fact, I can call subscribe on a particular instrument, (or multiple instruments!) receive updates printed to the console, then call unsubscribe in iex, on one particular symbol (of several), and then only the remaining symbol updates will be printed. Then if I call unsubscribe again on the remaining symbol, all printing will stop. So I think the overall architecture pretty much makes sense: store the websocket connection, kick off a listener process, wait for updates, then do something with the new data. I just think that somehow, my changes have caused some of the initial base functionality to get lost and I wanted to collect feedback on some approaches to analyze this and improve it for my own project goals (keeping this websocket connection open so that I can keep ingesting the inbound data and then iterate through all these instruments and keep a realtime map of this data) as well as having another useful Elixir library in the wild.

Thank you for reading, and I appreciate everyone for taking the time to be a part of this nice little forum. Reminds me of the “before times” of the early WWW :face_with_hand_over_mouth:


A note about naming: having a package define a module named just M is generally discouraged. If you want a shorthand helper module, use a more-specific name and alias that to M where needed.

The machinery around DeribitApi.Listener has some features that seem odd to me:

  • a {:ping, _} message from the server before the first packet of expected data in get_public will make get_public return :ok
  • a {:ping, _} message from the server after the first packet of expected data will cause the Listener to never call Socket.Web.recv! again
  • spawn is not commonly used by itself like this, as the resulting process doesn’t fit into the OTP ecosystem for dealing with crashes. For instance, if the process that started a DeribitApi.Listener exits the listener should likely also exit.
  • the second call to get_public could result in two processes doing a Socket.Web.recv! on the same websocket. I’m not certain what happens with two blocking receives on the same port, but I strongly doubt it’s a good thing…
    • the Listener started by the first call
    • the caller of get_public
  • subscription messages could appear anywhere in the stream, so a call to get_public might accidentally recv one!
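The ping-related points above can be illustrated with a receive loop that recurses on every kind of frame. This is a minimal sketch, not the library's code: recv, pong and on_text are injected functions standing in for Socket.Web.recv!/1, the pong reply and the user callback, and the :closed value is a hypothetical end-of-stream marker used only so the example terminates.

```elixir
defmodule ListenLoopSketch do
  # All three arguments are zero- or one-arity functions supplied by the caller.
  def listen(recv, pong, on_text) do
    case recv.() do
      {:text, text} ->
        on_text.(text)
        listen(recv, pong, on_text)

      {:ping, _cookie} ->
        # Answer the ping, then keep reading instead of falling through.
        pong.()
        listen(recv, pong, on_text)

      :closed ->
        :closed
    end
  end
end

# A fake socket: a queue of frames with a ping before the data.
{:ok, frames} =
  Agent.start_link(fn -> [{:ping, ""}, {:text, "a"}, {:text, "b"}, :closed] end)

recv = fn -> Agent.get_and_update(frames, fn [head | tail] -> {head, tail} end) end
parent = self()
pong = fn -> send(parent, :pong_sent) end
on_text = fn text -> send(parent, {:text, text}) end

loop_result = ListenLoopSketch.listen(recv, pong, on_text)
IO.inspect(loop_result)
```

Because every branch either recurses or returns deliberately, a ping arriving before or after the data no longer ends the reading.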

I recommend an approach closer to what websockex provides:

  • initiate the connection in start_link
  • the listener tracks the current access_token etc
  • making a one-off request like get_instruments uses send to dispatch to the listener’s handle_info:
    • generates a request ID
    • sends the request to the API with that ID
    • records the caller’s PID etc and the request ID for when the reply arrives
  • calls like subscribe also dispatch to the listener’s handle_info:
    • sends the subscribe request to the API
    • records the caller’s PID and the requested channels
    • (optional) record whatever’s needed to reply to the caller of subscribe, if anything
  • the listener’s handle_frame deals with all replies coming from the server:
    • one-off functions will have an ID; look up which process is expecting a reply and send it along
    • notifications are routed to subscribers, either via send or PubSub
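The routing described above could be sketched roughly as follows. To stay self-contained this uses a plain GenServer with GenServer.call and GenServer.reply/2 rather than websockex itself, and every name in it (RouterSketch, send_fun, the {:frame, map} message) is hypothetical. It also assumes replies carry the request's "id" and notifications arrive as "method" => "subscription" with a "channel" in "params", and it leaves out JSON encoding and actually sending the subscribe request.

```elixir
defmodule RouterSketch do
  use GenServer

  # send_fun is a stand-in for writing a frame to the websocket.
  def start_link(send_fun), do: GenServer.start_link(__MODULE__, send_fun)

  # One-off request: the caller waits, the connection process does not.
  def request(pid, method, params \\ %{}) do
    GenServer.call(pid, {:request, method, params})
  end

  def subscribe(pid, channel), do: GenServer.call(pid, {:subscribe, channel, self()})

  @impl true
  def init(send_fun) do
    {:ok, %{send_fun: send_fun, next_id: 1, pending: %{}, subs: %{}}}
  end

  @impl true
  def handle_call({:request, method, params}, from, state) do
    id = state.next_id
    state.send_fun.(%{"jsonrpc" => "2.0", "id" => id, "method" => method, "params" => params})
    # No reply yet: remember who asked, answer when the frame with this id arrives.
    {:noreply, %{state | next_id: id + 1, pending: Map.put(state.pending, id, from)}}
  end

  def handle_call({:subscribe, channel, pid}, _from, state) do
    subs = Map.update(state.subs, channel, [pid], &[pid | &1])
    {:reply, :ok, %{state | subs: subs}}
  end

  @impl true
  def handle_info({:frame, %{"id" => id} = msg}, state) do
    # Reply to a one-off request: look up the waiting caller by id.
    {from, pending} = Map.pop(state.pending, id)
    if from, do: GenServer.reply(from, {:ok, msg["result"]})
    {:noreply, %{state | pending: pending}}
  end

  def handle_info({:frame, %{"method" => "subscription", "params" => params}}, state) do
    # Notification: fan out to every process subscribed to the channel.
    for pid <- Map.get(state.subs, params["channel"], []) do
      send(pid, {:update, params["channel"], params["data"]})
    end

    {:noreply, state}
  end
end

# Fake transport: immediately feeds a matching reply back to the router.
fake_send = fn req ->
  send(self(), {:frame, %{"id" => req["id"], "result" => [req["method"]]}})
end

{:ok, router} = RouterSketch.start_link(fake_send)
reply = RouterSketch.request(router, "public/get_instruments", %{"currency" => "BTC"})
IO.inspect(reply)
```

With this shape only one process ever reads from the socket, so a subscription message cannot be consumed by a one-off request by accident.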