QUESTION 1: Is approach above acceptable? Any impact of VM health and performance?
Yes. This is similar to the designs I came up with. I don’t have much experience with the router socket, but I’ve been using RPC over dealer sockets and pub/sub subscribers with sub sockets. Sub is definitely simpler as it is receive-only. I start a process that makes a blocking call to recv_multipart and pumps messages to downstream consumers. Most of the time the OTP process just waits on a mutex in the dirty NIF, so it is rather lightweight.
def recieve_loop(server, socket, pid) do
  res =
    case :erlzmq.recv_multipart(socket) do
      {:ok, [envelope, message]} ->
        send(pid, {:message, envelope, message})

        # backpressure - wait until the consumer can process a new message before receiving
        receive do
          :continue ->
            :ok

          {:DOWN, _ref, :process, ^pid, _reason} ->
            close_socket(socket, server)
            :stop
        end

      {:error, :eagain} ->
        :ok

      {:error, reason} ->
        Logger.error("Receive from #{server} failed for reason: #{inspect(reason)}")
        if reason in [:eterm, :enotsock], do: :stop, else: :ok
    end

  if res == :ok do
    recieve_loop(server, socket, pid)
  end
end
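For context, this is roughly how I wire that loop up on the consumer side. Treat it as a minimal sketch rather than a drop-in: it assumes an erlzmq_dnif-style API (:erlzmq.context/0, :erlzmq.socket/2, :erlzmq.connect/2, :erlzmq.setsockopt/3), and handle_message/2 plus the endpoint are hypothetical; check option names and arities against the erlzmq version you actually use.
# Sketch only - API shapes assumed, verify against your erlzmq version.
{:ok, ctx} = :erlzmq.context()
{:ok, socket} = :erlzmq.socket(ctx, :sub)
:ok = :erlzmq.connect(socket, "tcp://127.0.0.1:5556")
:ok = :erlzmq.setsockopt(socket, :subscribe, "")

consumer = self()

loop =
  spawn(fn ->
    # the loop above relies on a monitor to notice the consumer dying
    Process.monitor(consumer)
    recieve_loop("my_server", socket, consumer)
  end)

# Consumer side of the backpressure protocol: handle one message,
# then tell the loop it may call recv_multipart again.
receive do
  {:message, envelope, message} ->
    handle_message(envelope, message)   # hypothetical handler
    send(loop, :continue)
end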
Dealer (and router) sockets have to support both outgoing and incoming messages. Here I’m starting an OTP process for each client of my RPC. Then, in a loop, I switch back and forth between receiving control messages and blocking ZMQ polling with a timeout. If the poll indicates incoming messages, I receive with dontwait. If there are queued messages to send and the ZMQ high water mark is not reached (the pollout flag is returned), I try to send. This way the process is not spinning too much and is able to react both to incoming OTP and ZMQ messages.
def recieve_loop(socket, pid, server, type, parent_ref, to_send, to_receive) do
  current_time_monotonic = System.monotonic_time(:millisecond)
  current_time = System.system_time(:millisecond)

  to_send =
    drop_expired_ttl_to_send(to_send, current_time, current_time_monotonic, pid, server, type)

  to_receive =
    drop_expired_ttl_to_receive(
      to_receive,
      current_time,
      current_time_monotonic,
      pid,
      server,
      type
    )

  after_timeout =
    if to_send == [] and to_receive == %{} do
      @wait_for_message_timeout
    else
      0
    end

  receive do
    {:DOWN, ^parent_ref, :process, _pid, _reason} ->
      Logger.debug("#{server}_#{type}_#{inspect(pid)} closing due to parent exit")
      close_socket(socket, server)

    {:DOWN, _ref, :process, ^pid, _reason} ->
      Logger.debug("#{server}_#{type}_#{inspect(pid)} closing due to client exit")
      close_socket(socket, server)

    {:send, messages} ->
      recieve_loop(socket, pid, server, type, parent_ref, to_send ++ messages, to_receive)
  after
    after_timeout ->
      # always try to receive, send only when there is something to send
      flags =
        case to_send do
          [] -> [:pollin]
          _ -> [:pollin, :pollout]
        end

      case :erlzmq.poll(socket, flags, @poll_timeout) do
        {:ok, rflags} ->
          process_poll_flags(rflags, socket, pid, server, type, parent_ref, to_send, to_receive)

        {:error, reason} ->
          Logger.error("#{server}_#{type}_#{inspect(pid)}: poll returned #{inspect(reason)}")
          recieve_loop(socket, pid, server, type, parent_ref, to_send, to_receive)
      end
  end
end
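process_poll_flags is elided above, so here is a hypothetical sketch of its shape, just to show how the returned flags map onto dontwait receives and sends. It is not my actual implementation: the :erlzmq.recv_multipart/2 and :erlzmq.send_multipart/3 calls taking a [:dontwait] flag list are assumptions about the erlzmq API (verify the exact arities and flag atoms for your version), handle_reply/4 is a made-up helper standing in for app-specific request matching, and the queued message structure is simplified (no TTL bookkeeping).
# Hypothetical sketch of process_poll_flags - not the real implementation.
# Assumes :erlzmq.recv_multipart/2 and :erlzmq.send_multipart/3 accept a
# [:dontwait] flag list; check this against the erlzmq version you use.
defp process_poll_flags(rflags, socket, pid, server, type, parent_ref, to_send, to_receive) do
  # pollin signalled: drain one incoming message without blocking
  to_receive =
    if :pollin in rflags do
      case :erlzmq.recv_multipart(socket, [:dontwait]) do
        {:ok, parts} ->
          # made-up helper: match the reply to a pending request and notify pid
          handle_reply(parts, pid, server, to_receive)

        {:error, :eagain} ->
          to_receive

        {:error, reason} ->
          Logger.error("#{server}_#{type}: recv failed: #{inspect(reason)}")
          to_receive
      end
    else
      to_receive
    end

  # pollout signalled means the high water mark is not reached, so try to send
  to_send =
    case {:pollout in rflags, to_send} do
      {true, [message | rest]} ->
        case :erlzmq.send_multipart(socket, message, [:dontwait]) do
          :ok ->
            rest

          # HWM may have been reached between poll and send - keep it queued
          {:error, :eagain} ->
            to_send

          {:error, reason} ->
            Logger.error("#{server}_#{type}: send failed: #{inspect(reason)}")
            to_send
        end

      _ ->
        to_send
    end

  recieve_loop(socket, pid, server, type, parent_ref, to_send, to_receive)
end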
Some more care needs to be taken to ensure sockets are correctly closed, pids are not left dangling after the client process dies, etc., but that is app-specific.
Also note that if you use dontwait and receive with after 0, your process will spin and use up too many reductions.
QUESTION 2: Is it safe to share ZMQ socket resource and Dirty NIF calls across processes?
I read through the erl_nif documentation, but I’m not sure the paragraph on concurrency is applicable here.
Short answer: yes (unless there are undiscovered bugs).
libzmq sockets are not thread-safe. Progress on the thread-safe ones is on hiatus. Moreover, all API calls touching a socket MUST happen from the same thread that created it. The docs state that a full memory barrier (e.g. a mutex) should be enough to transfer a socket between threads. Unfortunately this has not been true since libzmq 4.x, and doing that results in a crash with assertion failed… The libzmq developers chose a lockless implementation, which burdens the user with all the locking.
In the Erlang VM there is no control over which scheduler thread executes Erlang process code (or ordinary NIFs). The same applies to dirty NIFs: there is no control over which dirty scheduler thread a dirty NIF runs on. Hence erlzmq_dnif starts a dedicated thread for each socket. All :erlzmq API calls that touch a socket actually acquire a mutex, pass a message to the socket thread, signal, and wait on a condition variable for the response. The socket thread receives these control messages, translates them into libzmq API calls, and sends the response back to the dirty NIF thread.
On the Erlang side the socket is really just a reference that the NIF maps to a C struct holding the mutex, condition variable, and socket pointers. The NIF is designed to be thread-safe and should handle multiple processes using the same socket, but there will be contention over the mutex and over blocking libzmq calls.
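So, as an illustration of Question 2, something like the following should be safe (the NIF serializes the calls on the socket’s internal mutex); it just won’t run the sends in parallel. A hedged sketch: it assumes :erlzmq.send/2 and :erlzmq.bind/2 with these arities, and the endpoint is made up.
# Two processes sharing one socket: safe but contended, not parallel.
# Assumes :erlzmq.send/2 and :erlzmq.bind/2 - check your erlzmq version.
{:ok, ctx} = :erlzmq.context()
{:ok, socket} = :erlzmq.socket(ctx, :pub)
:ok = :erlzmq.bind(socket, "tcp://127.0.0.1:5557")

for i <- 1..2 do
  spawn(fn ->
    for n <- 1..100 do
      # PUB never blocks; with no subscribers the messages are simply dropped
      :ok = :erlzmq.send(socket, "worker #{i}: message #{n}")
    end
  end)
end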
Some caveats:
- I had a really bad experience with ZMQ healthchecks (Assertion failed: _input_stopped (stream_engine.cpp:467) · Issue #3937 · zeromq/libzmq · GitHub, and many more). Do not use them in production
- you may need to increase the number of dirty IO scheduler threads with the +SDio option
- tune the libzmq context (max_sockets, io_threads)
- set a reasonable linger on sockets
- remember that blocking calls will not return until there’s a message or an error; in most places you want timeouts (sndtimeo, rcvtimeo or dontwait) - see the sketch after this list for these knobs
- the OS user needs to be able to open a lot of file descriptors or you will start getting emfile errors
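To make those tuning knobs concrete, here is a hedged sketch. +SDio is a standard Erlang VM flag, and the option names follow libzmq (ZMQ_MAX_SOCKETS, ZMQ_IO_THREADS, ZMQ_LINGER, ZMQ_RCVTIMEO, ZMQ_SNDTIMEO), but the exact erlzmq calls and atoms for setting them (the option-list form of context/1, the setsockopt/3 atoms) are assumptions to verify against the version you use.
# vm.args (or --erl on a release): bump the dirty IO schedulers, e.g.
#   +SDio 32

# Context tuning - the option-list form of context/1 is an assumed API shape;
# erlzmq2-style APIs take the io-thread count directly, so check your version.
{:ok, ctx} = :erlzmq.context([{:io_threads, 2}, {:max_sockets, 4096}])

{:ok, socket} = :erlzmq.socket(ctx, :dealer)

# Don't block forever on close or on send/recv - libzmq option names,
# erlzmq atom spellings assumed.
:ok = :erlzmq.setsockopt(socket, :linger, 1_000)    # ZMQ_LINGER, ms
:ok = :erlzmq.setsockopt(socket, :rcvtimeo, 5_000)  # ZMQ_RCVTIMEO, ms
:ok = :erlzmq.setsockopt(socket, :sndtimeo, 5_000)  # ZMQ_SNDTIMEO, ms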
If what I describe turns out to be too problematic for your use case, then implementing your logic directly in C (or Rust/Zig) may be a better option, but that’s considerably more effort.