Proper architecture for Dirty NIF ZMQ and Elixir processes for broker implementation

Hello everyone,

I’m implementing broker-like logic in Elixir that uses ZMQ to connect clients and services (workers), which are Golang/Python/Node programs. I had rather limited success with the pure-Erlang chumak library, and I am now trying to move to a Dirty NIF based implementation like this one: GitHub - lukaszsamson/erlzmq: Erlang binding for 0MQ (v2)

My main struggle is how to organize a proper, reliable structure of processes around the Dirty NIF calls on the broker side. As an example, here’s the snippet I use to create and bind the socket:

    {:ok, ctx} = :erlzmq.context()
    {:ok, socket} = :erlzmq.socket(ctx, :router)
    :erlzmq.setsockopt(socket, :sndhwm, 0)
    :erlzmq.setsockopt(socket, :rcvhwm, 0)
    :erlzmq.setsockopt(socket, :router_handover, 1)
    :erlzmq.setsockopt(socket, :identity, String.to_charlist(UUID.uuid4()))
    :ok = :erlzmq.bind(socket, endpoint)

Now I would like to have a “loop” to receive messages from 2 “mailboxes”:

  1. ZMQ incoming messages, calling :erlzmq.recv_multipart(socket) (optionally with :dontwait flag for non-blocking), and handling these messages from clients/services
  2. Process mailbox, for messages to be written to the socket using :erlzmq.send_multipart(socket, [...])

My first implementation looked like this:

  defp loop(socket, handler) do
    case :erlzmq.recv_multipart(socket, [:dontwait]) do
      {:ok, data} ->
        Logger.debug("received data: #{inspect(data)}")

      {:error, :eagain} ->
        # No data, try again later (c)
        nil

      {:error, reason} ->
        Logger.error("error in receive: #{inspect(reason)}")
    end

    receive do
      {:message_type, _value} ->
        Logger.debug("received message from parent")
    after
      0 -> nil
    end

    loop(socket, handler)
  end

But the part with after 0 looks sloppy to me, and I’m not sure it’s idiomatic in Erlang/Elixir (it is really Golang’s for/select loop over channels transplanted here). The observer shows a huge, steadily growing number of Reds (reductions) for the process running this loop.

QUESTION 1: Is the approach above acceptable? Does it have any impact on VM health and performance?

Another option I can imagine is running one GenServer and 2 tasks: one task to read from ZMQ and another to process the internal mailbox. But I’m not sure a ZMQ socket can be “shared” across processes (definitely a no-go in Golang/Python and other stacks).

QUESTION 2: Is it safe to share a ZMQ socket resource and Dirty NIF calls across processes?
I read through the erl_nif documentation, but I’m not sure the paragraph on concurrency is applicable here.

Thank you in advance!

QUESTION 1: Is the approach above acceptable? Does it have any impact on VM health and performance?

Yes. This is similar to the designs I come up with. I don’t have much experience with router sockets, but I’ve been using RPC over dealer sockets and pub/sub subscribers over sub sockets. Sub is definitely simpler, as it is receive only. I start a process that uses a blocking call to recv_multipart and pumps messages to downstream consumers. Most of the time the OTP process just waits on a mutex in the dirty NIF, so it is rather lightweight.

  def recieve_loop(server, socket, pid) do
    res =
      case :erlzmq.recv_multipart(socket) do
        {:ok, [envelope, message]} ->
          send(pid, {:message, envelope, message})

          # backpressure - wait until the consumer can process a new message before receiving
          receive do
            :continue ->
              :ok

            {:DOWN, _ref, :process, ^pid, _reason} ->
              close_socket(socket, server)
              :stop
          end

        {:error, :eagain} ->
          :ok

        {:error, reason} ->
          Logger.error("Receive from #{server} failed for reason: #{inspect(reason)}")
          if reason in [:eterm, :enotsock], do: :stop, else: :ok
      end

    if res == :ok do
      recieve_loop(server, socket, pid)
    end
  end

Dealer (and router) has to support both outgoing and incoming messages. Here I start an OTP process for each client of my RPC. Then, in a loop, I switch back and forth between receiving control messages and blocking ZMQ polling with a timeout. If the poll indicates incoming messages, I receive with dontwait. If there are queued messages to send and the ZMQ high water mark has not been reached (pollout flag returned), I try to send. This way the process does not spin too much and is able to react to both incoming OTP and ZMQ messages.

  def recieve_loop(socket, pid, server, type, parent_ref, to_send, to_receive) do
    current_time_monotonic = System.monotonic_time(:millisecond)
    current_time = System.system_time(:millisecond)

    to_send =
      drop_expired_ttl_to_send(to_send, current_time, current_time_monotonic, pid, server, type)

    to_receive =
      drop_expired_ttl_to_receive(
        to_receive,
        current_time,
        current_time_monotonic,
        pid,
        server,
        type
      )

    after_timeout =
      if to_send == [] and to_receive == %{} do
        @wait_for_message_timeout
      else
        0
      end

    receive do
      {:DOWN, ^parent_ref, :process, _pid, _reason} ->
        Logger.debug("#{server}_#{type}_#{inspect(pid)} closing due to parent exit")
        close_socket(socket, server)

      {:DOWN, _ref, :process, ^pid, _reason} ->
        Logger.debug("#{server}_#{type}_#{inspect(pid)} closing due to client exit")
        close_socket(socket, server)

      {:send, messages} ->
        recieve_loop(socket, pid, server, type, parent_ref, to_send ++ messages, to_receive)
    after
      after_timeout ->
        # always try to receive, send only when there is something to send
        flags =
          case to_send do
            [] -> [:pollin]
            _ -> [:pollin, :pollout]
          end

        case :erlzmq.poll(socket, flags, @poll_timeout) do
          {:ok, rflags} ->
            process_poll_flags(rflags, socket, pid, server, type, parent_ref, to_send, to_receive)

          {:error, reason} ->
            Logger.error("#{server}_#{type}_#{inspect(pid)}: poll returned #{inspect(reason)}")
            recieve_loop(socket, pid, server, type, parent_ref, to_send, to_receive)
        end
    end
  end

Some more care needs to be taken to ensure that sockets are correctly closed, that pids are not left dangling after a client process dies, and so on, but that is app specific.
Also note that if you combine dontwait with a receive that uses after 0, your process will spin and burn far too many reductions.
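To illustrate the point about spinning, here is a minimal sketch of a loop in which both waits have a non-zero timeout, so an idle process sleeps instead of burning reductions. It is not the code from the post above: `PollLoopSketch`, `poll_fun`, and the 50 ms timeout are hypothetical, and `poll_fun` merely stands in for a blocking-with-timeout ZMQ call (for example a wrapper around `:erlzmq.poll/3`) so that the example runs without libzmq.

```elixir
defmodule PollLoopSketch do
  # Hypothetical value: how long to block on the process mailbox before
  # giving the ZMQ side a turn.
  @mailbox_timeout 50

  # Runs `rounds` iterations and returns the events seen, oldest first.
  # `poll_fun` is an assumed stand-in for a ZMQ poll that itself blocks
  # with a timeout; it must return {:ok, msg} or :timeout.
  def run(poll_fun, rounds), do: loop(poll_fun, rounds, [])

  defp loop(_poll_fun, 0, acc), do: Enum.reverse(acc)

  defp loop(poll_fun, rounds, acc) do
    receive do
      # A message from another Elixir process that should go out on the socket.
      {:send, msg} ->
        loop(poll_fun, rounds - 1, [{:mailbox, msg} | acc])
    after
      # Mailbox stayed empty for @mailbox_timeout ms: check the ZMQ side.
      @mailbox_timeout ->
        case poll_fun.() do
          {:ok, msg} -> loop(poll_fun, rounds - 1, [{:zmq, msg} | acc])
          :timeout -> loop(poll_fun, rounds - 1, acc)
        end
    end
  end
end
```

With `after 0` and a `dontwait` receive, every iteration would return immediately whether or not there is work, which is the busy loop that shows up as growing Reds in the observer.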

QUESTION 2: Is it safe to share a ZMQ socket resource and Dirty NIF calls across processes?
I read through the erl_nif documentation, but I’m not sure the paragraph on concurrency is applicable here.

Short answer: yes (unless there are undiscovered bugs).

libzmq sockets are not thread safe, and work on thread-safe ones is on hiatus. Moreover, all API calls touching a socket MUST happen from the same thread that created it. The docs state that a full memory barrier (e.g. a mutex) should be enough to transfer a socket between threads. Unfortunately this has not been true since libzmq 4.x, and doing so results in a crash with assertion failed… The libzmq developers chose a lockless implementation, which burdens the user with all the locking.

In the Erlang VM there is no control over which scheduler thread executes an Erlang process (or an ordinary NIF). The same applies to dirty NIFs: there is no control over which dirty scheduler thread executes a given call. Hence erlzmq_dnif starts a dedicated thread for each socket. All :erlzmq API calls that touch a socket actually acquire a mutex, pass a message to the socket thread, signal it, and wait on a condition variable for the response. The socket thread receives these control messages, translates them into libzmq API calls, and sends the response back to the dirty NIF thread.

On the Erlang side the socket is really just a reference that the NIF maps to a C struct holding the mutex, condition variable, and socket pointers. The NIF is designed to be thread safe and should handle multiple processes using the same socket, but there will be contention over mutex access and over blocking libzmq calls.
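As a rough illustration of the "one GenServer plus a reader" layout from the question, the sketch below shares one socket between two processes: a reader that blocks in receive and an owner that sends. Everything here is hypothetical (`BrokerSketch`, `recv_fun`, `send_fun`, the message tuples); the two functions stand in for `:erlzmq.recv_multipart` and `:erlzmq.send_multipart` calls on the same socket and are injected so the example runs without libzmq. Given the contention noted above, a real `recv_fun` would presumably need a receive timeout (rcvtimeo) so that sends are not held up behind a blocked receive.

```elixir
defmodule BrokerSketch do
  use GenServer

  # recv_fun: assumed stand-in for a receive call that returns
  #           {:ok, parts} | {:error, reason}
  # send_fun: assumed stand-in for a send call that takes a list of parts
  def start_link(recv_fun, send_fun, handler_pid) do
    GenServer.start_link(__MODULE__, {recv_fun, send_fun, handler_pid})
  end

  # Ask the owner process to write a multipart message to the socket.
  def send_message(server, parts), do: GenServer.call(server, {:send, parts})

  @impl true
  def init({recv_fun, send_fun, handler_pid}) do
    owner = self()
    # Reader process: blocks in recv_fun and forwards each message to the owner.
    {:ok, _reader} = Task.start_link(fn -> read_loop(recv_fun, owner) end)
    {:ok, %{send_fun: send_fun, handler: handler_pid}}
  end

  @impl true
  def handle_call({:send, parts}, _from, state) do
    {:reply, state.send_fun.(parts), state}
  end

  @impl true
  def handle_info({:zmq, parts}, state) do
    # Hand incoming socket traffic to whoever implements the broker logic.
    send(state.handler, {:incoming, parts})
    {:noreply, state}
  end

  defp read_loop(recv_fun, owner) do
    case recv_fun.() do
      {:ok, parts} ->
        send(owner, {:zmq, parts})
        read_loop(recv_fun, owner)

      # Receive timed out with no data: just try again.
      {:error, :eagain} ->
        read_loop(recv_fun, owner)

      # Any other error ends the reader in this sketch.
      {:error, _reason} ->
        :ok
    end
  end
end
```

The owner never blocks on the socket for reading, so its mailbox stays responsive; whether this beats a single polling loop depends on how much the two processes contend for the socket mutex.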

Some caveats:

  • I had a really bad experience with ZMQ healthchecks (see “Assertion failed: _input_stopped (stream_engine.cpp:467) · Issue #3937 · zeromq/libzmq · GitHub”, and many more). Do not use them in production
  • you may need to increase the number of dirty scheduler threads with the +SDio option
  • tune libzmq context (max_sockets, io_threads)
  • set reasonable linger on sockets
  • remember that blocking calls will not return until there’s a message or error. In most places you want timeouts (sndtimeo, rcvtimeo or dontwait)
  • the OS user needs to be able to open a lot of file descriptors or you will start getting emfile errors

If what I describe turns out to be too problematic for your use case, then implementing your logic directly in C (or Rust/Zig) may be a better option, but that is considerably more effort.

@lukaszsamson thank you for a detailed response and sharing your examples!
Super informative and very helpful!

I will experiment with both approaches and share any discoveries.

For additional context, I’m trying to implement the MDP protocol in ZMQ (7/MDP | ZeroMQ RFC), according to which the whole communication is mainly async:

  • “Clients SHOULD use a REQ socket when implementing a synchronous request-reply pattern.”
  • “Clients MAY use a DEALER (XREQ) socket when implementing an asynchronous pattern”
  • “The broker MUST use a ROUTER socket to accept requests from clients, and connections from workers. The broker MAY use a separate socket for each sub-protocol, or MAY use a single socket for both sub-protocols.”