I have been writing a library, DeribitApi, published on hex.pm. It is forked from the original Deribit v2 HTTP client by @gabrielpra1 (on GitHub; I will refer to them as GP1). I added the WebSockets module under Deribit.API, the code needed to keep the persistent socket connection alive, and the listener that handles the stream of inbound messages. I changed the library namespaces because the :deribit package name was already taken, so I used :deribit_api. You can find it on hex.pm.
Anyway, long story short: I originally wrote this WebSocket functionality a few years ago just to quickly display prices, so it had hardcoded IO.inspect statements (it still does!), etc.
Recently, when I took another look, I remembered that I hadn’t really created the infrastructure needed to use this in projects (it still doesn’t have it, but I am working on it, as you will find over the course of this post).
I added a parameter to the existing implementations of get_public and get_private, which are the functions used during setup of the code (this uses metaprogramming and lets you dynamically generate all the needed API endpoints without writing them all out explicitly; this implementation existed as part of the initial Deribit v2 HTTP client release by GP1).
This additional parameter takes the form of an anonymous function with a single parameter representing the new inbound message payloads that one would receive over the persistent WebSocket connection.
A function that simply prints its argument, for example, would continuously print out any updates that happen on the BTC PERPETUAL future contract once you are subscribed to it.
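A minimal sketch of such a callback. The two-argument call shape in the comment is an assumption that mirrors the get_instruments calls later in this post, and the channel name is inferred from the book.<instrument>.raw pattern used in the subscribe example further down; the sample payload is made up.

```elixir
# Hypothetical callback: prints every decoded message it is handed and returns it.
print_update = fn message ->
  IO.inspect(message, label: "update")
end

# Assumed call shape (needs the deribit_api package and a started,
# authenticated connection, so it is left as a comment here):
# DeribitApi.subscribe(%{"channels" => ["book.BTC-PERPETUAL.raw"]}, print_update)

# The callback can be tried on its own with a made-up payload:
sample = %{"params" => %{"channel" => "book.BTC-PERPETUAL.raw", "data" => %{}}}
returned = print_update.(sample)
```

Because IO.inspect returns its argument, the callback hands the message back unchanged, which makes it easy to chain or test.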
The needed setup for the DeribitApi.API.WebSockets module is basically: you start_link it, which kicks off the listener, then you authenticate. The “listener” that I wrote does not seem very idiomatic. I noticed this when I tried to accumulate all the instruments from each currency and then call get_order_book for each instrument (the order books carry additional attributes for some instruments vs. others, e.g. options, which include greeks, implied volatility, etc.).
It would be nice if this could just happen in the background, but I haven’t explored this yet. I am currently working on adding a Supervisor to the module so that, as a persistent connection approaches the end of its TTL, it will use the refresh_token (also stored in the DeribitApi.API.WebSockets module along with the access_token) to refresh the session and seamlessly continue whatever it was doing.
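A rough sketch of the timing part of that idea. It uses a plain GenServer timer rather than a Supervisor, the module name RefreshTimerSketch and its options are hypothetical, refresh_fun stands in for whatever call actually exchanges the refresh_token, and expires_in is assumed to be in seconds.

```elixir
defmodule RefreshTimerSketch do
  use GenServer

  # opts:
  #   :expires_in  - TTL of the current access_token (assumed to be seconds)
  #   :refresh_fun - zero-arity function that refreshes the session and
  #                  returns {:ok, new_expires_in}
  #   :margin_ms   - how long before expiry to refresh (default 60 s)
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      refresh_fun: Keyword.fetch!(opts, :refresh_fun),
      margin_ms: Keyword.get(opts, :margin_ms, 60_000)
    }

    schedule(Keyword.fetch!(opts, :expires_in), state.margin_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:refresh, state) do
    # Refresh, then schedule the next refresh from the new TTL.
    {:ok, expires_in} = state.refresh_fun.()
    schedule(expires_in, state.margin_ms)
    {:noreply, state}
  end

  # Sends :refresh to this process shortly before the token would expire.
  defp schedule(expires_in_s, margin_ms) do
    delay_ms = max(expires_in_s * 1000 - margin_ms, 0)
    Process.send_after(self(), :refresh, delay_ms)
  end
end
```

Such a process could itself be placed under a Supervisor so that the timer is restarted if it crashes.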
When you call the “special” subscribe function with a map param that has a key in it called “channels” to subscribe to some set of instruments, what happens is that a listener gets kicked off that has been parametrized based on the initial call to subscribe.
The WebSockets persistence part actually works fine (I will bring this home at the end of the post), so I haven’t really looked at ways to improve the Listener code. Yesterday, however, I was writing some code to build up a map of every instrument every 22 minutes, and I noticed that I am getting GenServer call timeouts. (I calculated that I could go through all 1300 instruments every 22 minutes while respecting their API limits, but maybe that limit applies only to writes and not reads.)
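As a quick check of that pacing, 1300 requests spread evenly over 22 minutes works out to roughly one request per second. A small sketch of the arithmetic (the variable names are only for illustration, and it says nothing about what Deribit's actual limits are):

```elixir
instrument_count = 1300
window_ms = 22 * 60 * 1000

# Integer milliseconds to wait between requests so that all instruments
# fit into one 22-minute window.
interval_ms = div(window_ms, instrument_count)

# Average request rate over the window.
requests_per_second = instrument_count / (window_ms / 1000)

IO.puts("interval: #{interval_ms} ms, rate: #{Float.round(requests_per_second, 3)} req/s")
```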
What is strange is that the first loop through the currencies (currencies = ["BTC", "ETH"] for the purposes of simplicity) works:
{:ok, deribit} = DeribitApi.API.WebSockets.start_link
DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id, DeribitApi.client_secret)
{:ok, instruments} = Agent.start_link(fn -> [] end)
{:ok, orderbooks} = Agent.start_link(fn -> %{} end)
for currency <- currencies do # currencies == Currency.all
  DeribitApi.get_instruments(%{"currency" => currency}, fn(data) ->
    Agent.update(instruments, fn state ->
      state ++ data["result"]
    end)
  end)
end
This actually works: if I check the instruments Agent state with Agent.get(instruments, fn x -> x end), I see that it has the expected number of instruments. But I get a red message in iex, and I want to understand why:
%{"currency" => "ETH"}
** (exit) exited in: GenServer.call(DeribitApi.API.WebSockets, {:get, #Function<3.122630315/1 in DeribitApi.API.WebSockets.get_public/3>}, 5000)
** (EXIT) time out
(elixir 1.14.3) lib/gen_server.ex:1038: GenServer.call/3
iex:19: (file)
iex:18: (file)
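The GenServer.call(..., {:get, fun}, 5000) in that exit is what Agent.get/3 does under the hood, with its default timeout of 5000 ms. A minimal, self-contained sketch of how a callback that blocks longer than the timeout makes the caller exit while the callback's side effects still happen. The sleep is only a stand-in for a blocking socket read, which is an assumption about the cause here, not a confirmed diagnosis:

```elixir
# Stand-in agent; in the library the Agent holds the websocket map instead.
{:ok, agent} = Agent.start_link(fn -> %{} end)
caller = self()

result =
  try do
    # The third argument is the timeout in ms (default 5000); 50 keeps the demo fast.
    Agent.get(
      agent,
      fn state ->
        # Stand-in for a blocking read inside the callback.
        Process.sleep(200)
        # Side effect performed inside the agent process.
        send(caller, :callback_finished)
        state
      end,
      50
    )
  catch
    :exit, {:timeout, _call} -> :timed_out
  end

# The caller has already given up, but the callback keeps running in the agent.
side_effect =
  receive do
    :callback_finished -> :ran_anyway
  after
    1_000 -> :never_ran
  end

IO.inspect({result, side_effect})
```

This would be consistent with seeing both the timeout and a filled instruments Agent, since the timeout only affects the caller and does not cancel the work inside the agent.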
Proof that it seems to work:
iex(20)> Agent.get(instruments, fn x -> x end) |> Enum.map(fn(x) -> x["instrument_name"] end)
["BTC-31JAN23-18000-C", "BTC-31JAN23-18000-P", "BTC-31JAN23-19000-C",
"BTC-31JAN23-19000-P", "BTC-31JAN23-20000-C", "BTC-31JAN23-20000-P",
"BTC-31JAN23-20500-C", "BTC-31JAN23-20500-P", "BTC-31JAN23-21000-C",
"BTC-31JAN23-21000-P", "BTC-31JAN23-21500-C", "BTC-31JAN23-21500-P",
"BTC-31JAN23-21750-C", "BTC-31JAN23-21750-P", "BTC-31JAN23-22000-C",
"BTC-31JAN23-22000-P", "BTC-31JAN23-22250-C", "BTC-31JAN23-22250-P",
"BTC-31JAN23-22500-C", "BTC-31JAN23-22500-P", "BTC-31JAN23-22750-C",
"BTC-31JAN23-22750-P", "BTC-31JAN23-23000-C", "BTC-31JAN23-23000-P",
"BTC-31JAN23-23250-C", "BTC-31JAN23-23250-P", "BTC-31JAN23-23500-C",
"BTC-31JAN23-23500-P", "BTC-31JAN23-23750-C", "BTC-31JAN23-23750-P",
"BTC-31JAN23-24000-C", "BTC-31JAN23-24000-P", "BTC-31JAN23-24250-C",
"BTC-31JAN23-24250-P", "BTC-31JAN23-24500-C", "BTC-31JAN23-24500-P",
"BTC-31JAN23-25000-C", "BTC-31JAN23-25000-P", "BTC-31JAN23-25500-C",
"BTC-31JAN23-25500-P", "BTC-31JAN23-26000-C", "BTC-31JAN23-26000-P",
"BTC-31JAN23-27000-C", "BTC-31JAN23-27000-P",
"BTC-CDIAG-29DEC23_31JAN23-40000_25000", "BTC-STRD-31JAN23-23000",
"BTC-CS-31JAN23-23000_23250", "BTC-PS-31JAN23-23250_23000",
"BTC-1FEB23-19000-C", "BTC-1FEB23-19000-P", ...]
iex(21)> Agent.get(instruments, fn x -> x end) |> Enum.map(fn(x) -> x["instrument_name"] end) |> Enum.reverse
["ETH-PERPETUAL", "ETH-29DEC23-5500-P", "ETH-29DEC23-5500-C",
"ETH-29DEC23-5000-P", "ETH-29DEC23-5000-C", "ETH-29DEC23-4500-P",
"ETH-29DEC23-4500-C", "ETH-29DEC23-4000-P", "ETH-29DEC23-4000-C",
"ETH-29DEC23-3500-P", "ETH-29DEC23-3500-C", "ETH-29DEC23-3000-P",
"ETH-29DEC23-3000-C", "ETH-29DEC23-2500-P", "ETH-29DEC23-2500-C",
"ETH-29DEC23-2200-P", "ETH-29DEC23-2200-C", "ETH-29DEC23-2000-P",
"ETH-29DEC23-2000-C", "ETH-29DEC23-1900-P", "ETH-29DEC23-1900-C",
"ETH-29DEC23-1800-P", "ETH-29DEC23-1800-C", "ETH-29DEC23-1700-P",
"ETH-29DEC23-1700-C", "ETH-29DEC23-1600-P", "ETH-29DEC23-1600-C",
"ETH-29DEC23-1500-P", "ETH-29DEC23-1500-C", "ETH-29DEC23-1400-P",
"ETH-29DEC23-1400-C", "ETH-29DEC23-1300-P", "ETH-29DEC23-1300-C",
"ETH-29DEC23-1200-P", "ETH-29DEC23-1200-C", "ETH-29DEC23-1100-P",
"ETH-29DEC23-1100-C", "ETH-29DEC23-1000-P", "ETH-29DEC23-1000-C",
"ETH-29DEC23-800-P", "ETH-29DEC23-800-C", "ETH-29DEC23-600-P",
"ETH-29DEC23-600-C", "ETH-29DEC23-400-P", "ETH-29DEC23-400-C",
"ETH-29DEC23-200-P", "ETH-29DEC23-200-C", "ETH-29DEC23", "ETH-FS-29SEP23_PERP",
"ETH-29SEP23-5500-P", ...]
However, I think something is not working as expected. When I try to iterate through these instruments and get the orderbook plus other enriched data for each of them (with a Process.sleep(1000) so that I don't keep hitting their API endlessly during development), the map does not fill up with data, and I get the same red error message as in the “working” set of API calls above.
Eventually the session gets so wonky and problematic that I just have to kill the process entirely. This is the code, followed by the same error as before, this time with the call actually not working:
iex(19)> idata = fn -> Agent.get(instruments, fn x -> Enum.map(x, fn(z) -> z["instrument_name"] end) end) end
iex(20)> for instrument <- idata.() do
...(20)>   IO.puts instrument
...(20)>   DeribitApi.get_order_book(%{"instrument_name" => instrument}, fn data ->
...(20)>     IO.inspect data
...(20)>     Agent.update(orderbooks, fn state ->
...(20)>       IO.inspect state
...(20)>       Map.put(state, instrument, data["result"])
...(20)>     end)
...(20)>   end)
...(20)> end
BTC-31JAN23-18000-C
** (exit) exited in: GenServer.call(DeribitApi.API.WebSockets, {:get, #Function<3.122630315/1 in DeribitApi.API.WebSockets.get_public/3>}, 5000)
** (EXIT) time out
(elixir 1.14.3) lib/gen_server.ex:1038: GenServer.call/3
iex:22: (file)
iex:20: (file)
As you can see, it fails with the same error.
Also, when I try to check what’s up with the DeribitApi.API.WebSockets process, it looks like it is already started, but now I can’t even authenticate:
iex(37)> DeribitApi.API.WebSockets.start_link
{:error, {:already_started, #PID<0.631.0>}}
iex(38)> DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id, DeribitApi.client_secret)
** (exit) exited in: GenServer.call(DeribitApi.API.WebSockets, {:get, #Function<0.122630315/1 in DeribitApi.API.WebSockets.authenticate/2>}, 5000)
** (EXIT) time out
(elixir 1.14.3) lib/gen_server.ex:1038: GenServer.call/3
(deribit_api 0.4.2) lib/deribit_ws.ex:82: DeribitApi.API.WebSockets.authenticate/2
I didn’t give much thought to just modifying the signatures and then being able to do “whatever I want” in my new magic fn parameter, but I’m wondering if the timeouts have something to do with the sequencing of the various processes and I’m not sure how I can investigate. I noticed that if I use these “classic” Deribit HTTP functions now with the new parameter I added, it kind of breaks my WebSockets API listener.
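One generic way to investigate a process that has stopped answering is Process.info/2, which reports where the process is currently blocked and how many messages are waiting in its mailbox. A small sketch with a stand-in Agent whose callback sleeps (the sleep again stands in for a blocking socket read, which is an assumption):

```elixir
{:ok, agent} = Agent.start_link(fn -> 0 end)

# Stand-in for a callback that blocks inside the agent.
Agent.cast(agent, fn state ->
  Process.sleep(500)
  state
end)

# Two more requests that queue up behind the blocked callback.
Agent.cast(agent, fn state -> state + 1 end)
Agent.cast(agent, fn state -> state + 1 end)

# Give the agent a moment to enter the blocking callback.
Process.sleep(50)

info = Process.info(agent, [:status, :message_queue_len, :current_stacktrace])

IO.inspect(info[:status], label: "status")
IO.inspect(info[:message_queue_len], label: "queued messages")
IO.inspect(Enum.take(info[:current_stacktrace], 3), label: "blocked in")
```

For a process registered under a name, the pid can be looked up first with Process.whereis/1, e.g. Process.whereis(DeribitApi.API.WebSockets). A growing mailbox and a stack trace that sits in a socket receive would suggest that callers are queuing behind a blocked callback.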
Could the community please suggest any tips for debugging this, or take a look at some of the relevant code mentioned above and give me some ideas?
The library that I mention with the WebSockets listener is located here: GitHub - arthurcolle/deribit-websockets-api-elixir: Deribit Websockets API client for Elixir and Phoenix
Here are the relevant links:
deribit-websockets-api-elixir/deribit_ws.ex at master · arthurcolle/deribit-websockets-api-elixir · GitHub: the DeribitApi.API.WebSockets module. We start_link this to create a process that will create a socket to the relevant endpoint.
When we call DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id, DeribitApi.client_secret), it then kicks off a listener process. In writing DeribitApi.API.WebSockets, I had to follow the existing code and write a module similar to Deribit.API.HTML, so I followed its style. For example, get_public/3 is similar to @gabrielpra1’s get_public/2 but with an additional parameter (the anonymous function, which is mostly just needed for the subscription functionality).
Here is the code for get_public/3:
@spec get_public(any, any, any) :: any
def get_public(url, params \\ %{}, f) do
  Agent.get(__MODULE__, fn data ->
    IO.inspect(data)
    websocket = Map.get(data, "websocket")
    IO.inspect(params)
    case Socket.Web.send!(websocket, {:text, M.enc!(base_method(:public, url, params))}) do
      :ok ->
        case Socket.Web.recv!(websocket) do
          {:text, text} ->
            parsed_text = M.dec!(text)
            f.(parsed_text)
            {:ok, spawn(DeribitApi.Listener, :listen, [websocket, data, f])}
          {:ping, _} ->
            Socket.Web.send!(websocket, {:pong, ""})
          other ->
            IO.inspect(other)
        end
    end
  end)
end
As you can see, in the WebSockets implementation of all the public functions, I get the websocket connection from the module’s state (held in an Agent that stores a map with the socket connection details) and send an encoded request with the url details and the parameters. If I get an :ok back, the code blocks on the socket connection (the Socket.Web.recv!(websocket) call in the inner case expression above). If I get text, I parse it and call my new fn parameter (maybe I just want to print something to the console and not do anything else; that’s fine, this allows that).
Then, I spawn a Listener and pass in some data and my function, since I want that function to execute every time I get a new message over the WebSocket connection. So that’s basically how the module works.
Finally, here is the listener code. Pretty straightforward:
defmodule DeribitApi.Listener do
  require Logger
  @spec listen(Socket.Web.t(), any, any) :: any
  def listen(websocket, data, f) do
    case Socket.Web.recv!(websocket) do
      {:text, text} ->
        parsed_text = M.dec!(text)
        case f do
          nil -> nil
          _ -> f.(parsed_text)
        end
        listen(websocket, Map.put(data, Time.utc_now(), parsed_text), f)
      {:ping, _} ->
        Socket.Web.send!(websocket, {:pong, ""})
      _msg ->
        Logger.debug("I received a msg.\n#{_msg}")
    end
    receive do
      {:data} -> IO.inspect data
      {:time} ->
        Time.utc_now()
      {:kill, reason} ->
        Logger.debug("received kill command.")
        Logger.debug("1) closing socket")
        Socket.Web.close(websocket)
        Logger.debug("2) killing receiver proc")
        Process.exit(self, reason)
      {:ping, _x} ->
        {:pong, Time.utc_now()}
    end
  end
end
# DeribitApi.API.WebSockets.start_link(); DeribitApi.API.WebSockets.authenticate(DeribitApi.client_id(), DeribitApi.client_secret())
# pid = DeribitApi.subscribe(%{"channels" => ["book.BTC-31JAN23-23000-P.raw"]})
The last thing I want to call out, which I think is particularly non-idiomatic, is my crazy authenticate function:
def authenticate(client_id, client_secret) do
  # The response contains 'result' if all is well, and you can use the
  # access_token and refresh_token to authenticate subsequent requests.
  # we store the entire deserialized 'result' in this module's Agent.
  #
  # So far, the Agent is keeping track of just 2 things:
  # 1) the underlying tcp socket used to maintain the websocket connection
  #
  # 2) the 'result' payload from the initial response from '/public/auth'
  #    this includes the following:
  #      access_token
  #      refresh_token
  #      token_type     "bearer"
  #      scope          account permissions
  #      expires_in     basically a TTL until access_token expires
  #
  # Now for every request until we need to re-authenticate (we know how long
  # we can use our access_token, from the expires_in TTL).
  new_m =
    Agent.get(__MODULE__, fn data ->
      websocket = Map.get(data, "websocket")
      case Socket.Web.send!(
             websocket,
             {:text, M.enc!(payload_for_auth(client_id, client_secret))}
           ) do
        :ok ->
          case Socket.Web.recv!(websocket) do
            {:text, text} ->
              dec_text = M.dec!(text)
              IO.inspect(dec_text)
              {:ok, Map.put(data, "result", obtain_result(text))}
            {:ping, _} ->
              {Socket.Web.send!(websocket, {:pong, ""}), data}
            abc ->
              IO.puts "This is abc..."
              IO.inspect(abc)
          end
        :error ->
          raise "Error!"
        x ->
          IO.puts "This is x..."
          IO.inspect(x)
      end
    end)
  {status, x} = new_m
  case status do
    :ok -> Agent.update(__MODULE__, fn m -> x end)
    _y -> raise "no new map: #{_y}"
  end
end
(Bringing the point home from earlier…)
I know that the WebSockets module piece, by itself, is actually very functional. In fact, I can call subscribe on a particular instrument (or multiple instruments!), receive updates printed to the console, and then call unsubscribe in iex on one particular symbol (of several), after which only the remaining symbol’s updates are printed. If I then call unsubscribe again on the remaining symbol, all printing stops. So I think the overall architecture pretty much makes sense: store the websocket connection, kick off a listener process, wait for updates, then do something with the new data. I just think that my changes have somehow caused some of the initial base functionality to get lost, and I wanted to collect feedback on some approaches to analyze this and improve it, both for my own project goals (keeping this websocket connection open so that I can keep ingesting the inbound data, iterate through all these instruments, and keep a realtime map of this data) and to have another useful Elixir library in the wild.
Thank you for reading, and I appreciate everyone for taking the time to be a part of this nice little forum. It reminds me of the “before times” of the early WWW.