ETS for TCP connections

Hello.

I have seen many posts and implementations where TCP connections are managed through a GenServer or gen_statem, which is great for message passing, reactiveness, etc…
However, it carries a real risk of bottlenecks and mailbox overload.

I ran some benchmarks, and tested an implementation of a TCP client using ETS to manage the socket.
Even though it does not use message passing and reads the socket synchronously, it is still much faster, especially under load or with parallel callers, so I don’t hit timeouts easily.
It also seems like a win/win, since there is little or no need for process pooling, which reduces overhead.

What do you think about managing TCP connections with ETS?

Thanks

Hello

I’m not sure I understand what/how/why you use ETS for TCP connections, but have you taken a look at Registry? I’m pretty sure it could help you a lot :wink:

I don’t know how you manage the TCP state in ETS (what happens if a TCP tunnel is closed, etc.). With Registry, you follow the Erlang mindset of wrapping everything in a process and letting it crash.
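For reference, here is a minimal Registry sketch (all the names here, like `MyConnRegistry`, are invented for illustration): each connection lives in its own process, registered under its endpoint, so a crash simply removes the entry and a supervisor can restart it.

```elixir
# Start a unique registry (in a real app this goes under a supervisor):
{:ok, _} = Registry.start_link(keys: :unique, name: MyConnRegistry)

# A connection process registers itself under its endpoint via a :via tuple.
# An Agent stands in for the real connection process here.
name = {:via, Registry, {MyConnRegistry, {:conn, "example.com", 443}}}
{:ok, _pid} = Agent.start_link(fn -> :connected end, name: name)

# Callers look the process up by key instead of holding on to a pid:
[{pid, _value}] = Registry.lookup(MyConnRegistry, {:conn, "example.com", 443})
```

If the connection process dies, the Registry entry disappears automatically, which is the “let it crash” part.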

Edit: I’d be interested to see some code if you can ^^


The reason is the overload of the GenServer’s mailbox. I also tried GenStage, but it was no better than GenServer.

For closed connections and the like, I ended up checking the returned tuple on each message to determine whether I need to reconnect. It’s more on-demand.

A better version could be to use a set of sockets (like a pool, but without a server) and switch to another one if a socket is closed, while reconnecting in the background.
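That idea could be sketched roughly like this (the module, table name, and pool size are invented for illustration, and error handling is deliberately minimal):

```elixir
defmodule SocketPool do
  # Sketch only: N pre-opened passive sockets stored in a public ETS table.
  # Callers pick one at random; on a send error the dead socket is replaced.

  @table :socket_pool

  def start(host, port, size \\ 4) do
    :ets.new(@table, [:named_table, :public, read_concurrency: true])

    for slot <- 1..size do
      {:ok, socket} = :gen_tcp.connect(host, port, [:binary, {:packet, 4}, {:active, false}])
      :ets.insert(@table, {slot, host, port, socket})
    end

    :ok
  end

  def request(payload) do
    slot = :rand.uniform(:ets.info(@table, :size))
    [{^slot, host, port, socket}] = :ets.lookup(@table, slot)

    case :gen_tcp.send(socket, payload) do
      :ok ->
        :gen_tcp.recv(socket, 0)

      {:error, _reason} ->
        # Replace the dead socket in its slot and retry once.
        {:ok, fresh} = :gen_tcp.connect(host, port, [:binary, {:packet, 4}, {:active, false}])
        :ets.insert(@table, {slot, host, port, fresh})
        :gen_tcp.send(fresh, payload)
        :gen_tcp.recv(fresh, 0)
    end
  end
end
```

Note that with passive sockets, two callers hitting the same slot concurrently can still steal each other’s replies; a real version would need per-slot checkout or request ids.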

Is it about connections from your server or to your server?

If it’s from your server, GenServer is not a good solution (and Registry will not help you). GenStage is not a big game changer either (it will only help you manage data flow).

But you can spawn processes and monitor them from a GenServer, so the GenServer will be aware when a process goes down or whatever (take a look at spawn_link, spawn, monitor, Process, etc.).
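A minimal sketch of that spawn-and-monitor pattern (the module and its API are made up): the GenServer keeps a monitor ref for each worker it spawns and receives a `:DOWN` message when one dies, where it could reconnect or clean up.

```elixir
defmodule ConnWatcher do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, :ok)

  def init(:ok), do: {:ok, %{conns: %{}}}

  # Spawn a worker (here just an arbitrary function standing in for a
  # connection loop) and monitor it.
  def handle_call({:open, fun}, _from, state) do
    pid = spawn(fun)
    ref = Process.monitor(pid)
    {:reply, pid, put_in(state.conns[ref], pid)}
  end

  def handle_call(:count, _from, state), do: {:reply, map_size(state.conns), state}

  # Delivered automatically when a monitored worker exits;
  # this is where a reconnect could be triggered.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, %{state | conns: Map.delete(state.conns, ref)}}
  end
end
```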

I suppose you create x connections to x endpoints; one connection per endpoint should be enough. A library like gun could also be a good resource.

It is to my server, acting as a client. In fact, I used a GenServer for the connection acceptor on the server side.

I did use a GenServer for the client with active: :once and {:noreply, …}, keeping track of the requesters in the meantime, but at some point there are bottlenecks or mailbox overload (the issue of an unbounded queue).

So I tried multiple solutions: semaphores, ETS, …

Have you looked into how Ecto manages database connection pools? DBConnection — db_connection v2.4.1

Each connection is a process, but once you check out a connection, you can use the socket directly, without going through the process that owns it. I’d be curious whether this library works for your use case or whether you have different constraints.


Indeed, it uses ETS to store the socket and hands it to the client for a given time interval.

How about using N GenServers and sharding connections between them?
To find a process’s pid, use a Registry.

Have you tried using {:active,:once} to limit when incoming messages are sent to your TCP process?

Yes, I tried that on both the client and the server, but the issue I’m facing comes not from the TCP stack but from the clients wanting to send a message and wait for a response on this socket (typically via GenServer.call, not the handle_info callback), for example to run hundreds of parallel requests at the same time.

I tried this approach as well, but it was no better than ETS as the socket holder.

…clients wanting to send a message and wait for a response on this socket.
…for example to run hundreds of parallel requests at the same time.

ranch + gen_statem?

Could you elaborate, please?

Here is some code I ran with Livebook:

For the server side:

defmodule Listener do
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end

  def init(arg) do
    port = Keyword.get(arg, :port)

    {:ok, listen_socket} =
      :gen_tcp.listen(port, [
        :binary,
        {:packet, 4},
        # Accepted sockets start passive; the acceptor enables
        # {:active, :once} only after handing the socket over, so no
        # message can arrive before controlling_process/2 runs.
        {:active, false},
        {:reuseaddr, true},
        {:backlog, 500}
      ])

    Enum.each(1..100, fn _ ->
      spawn(fn -> accept_loop(listen_socket) end)
    end)

    {:ok, %{listen_socket: listen_socket}}
  end

  def accept_loop(listen_socket) do
    case :gen_tcp.accept(listen_socket) do
      {:ok, socket} ->
        {:ok, pid} = AcceptedConnection.start_link(socket: socket)
        :ok = :gen_tcp.controlling_process(socket, pid)
        # Only now enable active mode, so TCP messages go to the new owner.
        :inet.setopts(socket, active: :once)
        accept_loop(listen_socket)

      {:error, _} = e ->
        e
    end
  end
end

defmodule AcceptedConnection do
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end

  def init(arg) do
    socket = Keyword.get(arg, :socket)
    {:ok, %{socket: socket}}
  end

  def handle_info({:tcp, socket, data}, state) do
    :inet.setopts(socket, active: :once)
    # Reply back the message
    :gen_tcp.send(socket, data)
    {:noreply, state}
  end

  def handle_info({:tcp_closed, _}, state) do
    {:stop, :normal, state}
  end
end

For the client:

defmodule Connection do
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg)
  end

  def send_message(pid, msg) do
    GenServer.call(pid, {:send_message, msg})
  end

  def init(arg) do
    ip = Keyword.get(arg, :ip)
    port = Keyword.get(arg, :port)
    Registry.register(ConnectionRegistry, :connections, _value = nil)

    {:ok, socket} = :gen_tcp.connect(ip, port, [:binary, {:packet, 4}, {:active, :once}])

    {:ok, %{socket: socket, requests: %{}, next_id: 1}}
  end

  def handle_call({:send_message, data}, from, state = %{socket: socket, next_id: next_id}) do
    :gen_tcp.send(socket, <<next_id::32, data::binary>>)

    new_state =
      state
      |> Map.update!(:next_id, &(&1 + 1))
      |> Map.update!(:requests, &Map.put(&1, next_id, from))

    {:noreply, new_state}
  end

  def handle_info(
        {:tcp, _, <<id::32, data::binary>>},
        state = %{socket: socket, requests: requests}
      ) do
    :inet.setopts(socket, active: :once)
    {from, requests} = Map.pop(requests, id)
    GenServer.reply(from, {:ok, data})
    {:noreply, %{state | requests: requests}}
  end

  def handle_info({:tcp_closed, _}, state) do
    Registry.unregister(ConnectionRegistry, :connections)
    {:stop, :normal, state}
  end
end

Benchmark suite:

port = 10_000

{:ok, pid} = Listener.start_link(port: port)
%{listen_socket: listen_socket} = :sys.get_state(pid)

Registry.start_link(name: ConnectionRegistry, keys: :duplicate)

{:ok, conn1} = Connection.start_link(ip: {127, 0, 0, 1}, port: port)
{:ok, conn2} = Connection.start_link(ip: {127, 0, 0, 1}, port: port)
{:ok, conn3} = Connection.start_link(ip: {127, 0, 0, 1}, port: port)

{:ok, socket1} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, {:packet, 4}, {:active, false}])
{:ok, socket2} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, {:packet, 4}, {:active, false}])
{:ok, socket3} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, {:packet, 4}, {:active, false}])

if :ets.info(:connections) == :undefined do
  # :bag, so several sockets can be stored under the same endpoint key
  :ets.new(:connections, [:named_table, :public, :bag, read_concurrency: true])
end

:ets.insert(:connections, {{{127, 0, 0, 1}, port}, socket1})
:ets.insert(:connections, {{{127, 0, 0, 1}, port}, socket2})
:ets.insert(:connections, {{{127, 0, 0, 1}, port}, socket3})

Benchee.run(
  %{
    "single genserver" => fn ->
      Connection.send_message(conn1, "hello")
    end,
    "pool of genserver" => fn ->
      connections = Registry.lookup(ConnectionRegistry, :connections)
      {pid, _} = Enum.random(connections)
      Connection.send_message(pid, "hello")
    end,
    "ets" => fn ->
      {_, socket_ets} = :ets.lookup(:connections, {{127, 0, 0, 1}, port}) |> Enum.random()
      :gen_tcp.send(socket_ets, "hello")
      :gen_tcp.recv(socket_ets, 0)
    end
  },
  parallel: 4
)

:ets.delete(:connections)
:gen_tcp.close(listen_socket)

Benchmarks results

Operating System: macOS
CPU Information: Intel(R) Core(TM) i5-7500 CPU @ 3.40GHz
Number of Available Cores: 4
Available memory: 16 GB
Elixir 1.12.3
Erlang 24.1.2

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
parallel: 4
inputs: none specified
Estimated total run time: 21 s

Benchmarking ets...
Benchmarking pool of genserver...
Benchmarking single genserver...

Name                        ips        average  deviation         median         99th %
ets                     22.47 K       44.50 μs   ±311.73%          29 μs         211 μs
single genserver         9.67 K      103.43 μs    ±58.76%          93 μs         268 μs
pool of genserver        8.43 K      118.64 μs   ±130.49%          98 μs         323 μs

Comparison: 
ets                     22.47 K
single genserver         9.67 K - 2.32x slower +58.93 μs
pool of genserver        8.43 K - 2.67x slower +74.14 μs

However, as I increase the parallel execution count, the latency gap between ETS and the pool of GenServers shrinks, although the ETS approach needs a few connections instead of a single one.

Also, if I increase the latency on the server side, for example:

def handle_info({:tcp, socket, data}, state) do
  :inet.setopts(socket, active: :once)

  spawn(fn ->
    # Reply back the message after a simulated delay
    Process.sleep(200)
    :gen_tcp.send(socket, data)
  end)

  {:noreply, state}
end

then the ETS solution shines, and pool vs. single GenServer remains the same:

Operating System: macOS
CPU Information: Intel(R) Core(TM) i5-7500 CPU @ 3.40GHz
Number of Available Cores: 4
Available memory: 16 GB
Elixir 1.12.3
Erlang 24.1.2

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
parallel: 100
inputs: none specified
Estimated total run time: 21 s

Benchmarking ets...
Benchmarking pool of genserver...
Benchmarking single genserver...

Name                        ips        average  deviation         median         99th %
ets                     1173.44        0.85 ms   ±578.99%        0.42 ms        7.64 ms
single genserver           4.97      201.02 ms     ±0.10%      201.01 ms      201.63 ms
pool of genserver          4.97      201.20 ms     ±0.37%      201.02 ms      205.29 ms

Comparison: 
ets                     1173.44
single genserver           4.97 - 235.89x slower +200.17 ms
pool of genserver          4.97 - 236.09x slower +200.34 ms

Finally, I was able to reach pretty good performance with GenServer by implementing a few things:

  • Backlog management: like a semaphore for the GenServer, limiting message sending from the clients (an ETS table and an atomic counter)
  • Asynchronous calls (GenServer.cast) with queued requests (an ETS table), avoiding the timeouts and limitations of GenServer.call
  • active: :once, to handle messages reactively while getting backpressure from the TCP backlog
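The first bullet could look roughly like this (a sketch only: the table name and the limit are invented, and the real implementation surely differs). A counter in a public ETS table caps how many requests may be in flight, so callers get `:busy` instead of piling up in the GenServer’s mailbox:

```elixir
defmodule Backlog do
  # Sketch of a semaphore built on :ets.update_counter/3, which
  # increments atomically and returns the new value.

  @table :backlog
  # Deliberately tiny for illustration; a real limit would be much higher.
  @max 2

  def setup do
    :ets.new(@table, [:named_table, :public, write_concurrency: true])
    :ets.insert(@table, {:in_flight, 0})
    :ok
  end

  # Try to take a slot; callers receiving :busy can retry or shed load.
  def acquire do
    if :ets.update_counter(@table, :in_flight, {2, 1}) <= @max do
      :ok
    else
      # Over the limit: undo the increment and reject.
      :ets.update_counter(@table, :in_flight, {2, -1})
      :busy
    end
  end

  def release, do: :ets.update_counter(@table, :in_flight, {2, -1})
end
```

A caller would wrap each request in `acquire`/`release`, releasing in the reply path (or on timeout) so slots are never leaked.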

Even with a single GenServer I got pretty good performance.

I was inspired by Shackle.