OTP/GenServer and selective receive?

If I understand correctly, if you use GenServer, you need to forfeit the selective receive functionality of Erlang.

What I mean by that is that since you give up control of the receive loop, there’s no way to say: handle messages tagged :high if they are present in the process mailbox, and if not, handle messages tagged :normal.

I realise that it’s easy to starve the :normal queue if you are not careful, and this is probably why OTP uses the process mailbox as a plain FIFO queue. I hope my assumptions are correct!
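For reference, a minimal sketch of the selective receive pattern described above, written as a plain process loop rather than a GenServer. The module name `PriorityLoop` and the `{:high, payload}` / `{:normal, payload}` message shapes are assumptions made for illustration.

```elixir
defmodule PriorityLoop do
  # Hypothetical helper, not part of OTP: returns the next message,
  # preferring {:high, _} over {:normal, _}.
  def next_message do
    receive do
      # First pass: scan the mailbox for a :high message only.
      {:high, payload} -> {:high, payload}
    after
      0 ->
        # No :high message is waiting, so block for whichever arrives next.
        receive do
          {:high, payload} -> {:high, payload}
          {:normal, payload} -> {:normal, payload}
        end
    end
  end
end

# Usage: the :high message is returned first even though it was sent second.
send(self(), {:normal, 1})
send(self(), {:high, 2})
{:high, 2} = PriorityLoop.next_message()
{:normal, 1} = PriorityLoop.next_message()
```

As long as :high messages keep arriving, the :normal clause is never reached, which is the starvation risk mentioned above.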

In any case, I had a use case where I wanted to serialise access to a shared resource for “normal” requests, but fulfil “priority” requests immediately. The underlying shared resource is actually capable of handling multiple concurrent requests, but I wanted to have some control over the parallelism. Here’s my proof of concept code:

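defmodule Limiter do
  use GenServer

  # Runs normal requests one at a time, in arrival order.
  defmodule Serializer do
    use GenServer

    @impl true
    def init(_) do
      {:ok, nil}
    end

    # Runs the job in this process and replies directly to the original caller.
    @impl true
    def handle_cast({m, f, args, from}, nil) do
      r = apply(m, f, args)
      GenServer.reply(from, r)
      {:noreply, nil}
    end
  end

  @impl true
  def init({module, function}) do
    {:ok, serializer} = GenServer.start_link(Serializer, [])
    {:ok, {serializer, module, function}}
  end

  # Normal requests are handed to the serializer; the caller stays blocked
  # until the serializer calls GenServer.reply/2.
  @impl true
  def handle_call({:normal, args}, from, {serializer, m, f}) do
    GenServer.cast(serializer, {m, f, args, from})
    {:noreply, {serializer, m, f}}
  end

  # Priority requests skip the queue and run in a freshly spawned process.
  def handle_call({:priority, args}, from, {serializer, m, f}) do
    process(from, m, f, args)
    {:noreply, {serializer, m, f}}
  end

  defp process(from, m, f, args) do
    spawn_link(fn() ->
      r = apply(m, f, args)
      GenServer.reply(from, r)
    end)
  end
end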

I would really appreciate it if someone could check my assumptions:

  1. I am using a separate GenServer to act as the serialising queue; since I’m using cast, the normal calls don’t block.
  2. It’s safe to use cast since I’m linking the Serializer process; if it dies, the Limiter will also die. If it’s alive, cast will always succeed.
  3. Similarly, using spawn_link for the priority calls makes the priority calls non-blocking.
  4. Therefore, the Limiter GenServer can accept connections even if previous connections are still processing. Individual callers will still block, as is expected.
  5. Since I’m using links, any downstream failure/exception will take everything down, so this can be used in a supervision tree.

Finally, I’m interested in approaches to testing this. The trickiest thing to test is that normal calls do not block priority calls. My current approach is to use Process.sleep to simulate a long-running job, send a message back to the test process, and then assert that the messages arrive in the expected order. However, I had to add some voodoo sleeps here and there, which I don’t like.

To avoid making this post any longer, my test suite is here: https://gist.github.com/orestis/ed19d0da066d3e65f919da3d76b8b224


Judging by your code, you’ll always handle a priority request in a freshly spawned process, while normal requests are serialized in the serializer GenServer. Given this, I don’t see the need for two GenServers. You could do the same thing if the priority request is handled in the client process. Doing that means you only need to deal with normal requests, so you only need one GenServer.
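A minimal sketch of that single-GenServer idea, assuming callers pass the work as a zero-arity function instead of the module/function pair from the original code. `SimpleLimiter`, `normal/3` and `priority/1` are hypothetical names.

```elixir
defmodule SimpleLimiter do
  use GenServer

  # Public API (hypothetical names, for illustration only).
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, nil, opts)

  # Normal requests go through the GenServer, so they run one at a time.
  def normal(server, fun, timeout \\ 5_000) when is_function(fun, 0) do
    GenServer.call(server, {:normal, fun}, timeout)
  end

  # Priority requests run directly in the calling process and never
  # touch the GenServer, so a busy queue cannot delay them.
  def priority(fun) when is_function(fun, 0), do: fun.()

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_call({:normal, fun}, _from, state) do
    {:reply, fun.(), state}
  end
end
```

Both entry points still look uniform from the caller's side, but only `normal/3` is serialised.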

If however you want to limit concurrency and assign priorities to requests, then you need one GenServer which acts as a queue, and it will spawn separate processes to consume the items. Perhaps GenStage could be used for this, or otherwise plain GenServer together with monitored tasks will definitely work.

A few comments on your assumptions:

The normal calls don’t block GenServer (well, they do, but not for a long time), but they block the client.

Cast will indeed always succeed. However, if new requests arrive much faster than the serializer is able to process them, the serializer mailbox will grow, and you’ll experience some significant latency increases. Also, having a single serializer process means a lot of garbage is generated which adds to GC pressure. I’d rather start a separate task for each request, and use monitor to find out when the task is done. I explained this idea recently in this thread.

You’re basically on the right track. Your limiter GenServer will not wait until requests finish, so it will be able to queue more requests, even though some are currently being processed. The implementation is somewhat messy, so I suggest looking into the thread I linked for some pointers.

Assuming you’re not trapping exits in the limiter process, yes. However, there are some edge cases which might leave your request processes dangling if the parent process terminates. Also, your structure is not friendly to hot code upgrades. Finally, if any request fails, everything will fail. For example, if a single normal request crashes, all pending normal requests are lost, as well as all currently running priority requests.

To systematically address those issues, it’s usually good to keep all processes OTP compliant (aka special processes), and keep all parent processes supervisors. In your case this would mean something along the lines of having a separate task supervisor where you’ll start your request processors. The limiter GenServer would keep track of currently running tasks, as well as pending requests. It would also monitor started tasks, so it would know whenever some task terminates (normally or abnormally). When that happens, the limiter GenServer can start the next task, or even restart the failing task if you want to do that. Keeping all tasks under a supervisor also means you can stop them in a predictable fashion, and ensure proper process cleanup.
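A rough sketch of that structure, under several assumptions: work arrives as zero-arity functions, priority requests bypass the concurrency limit, and the task supervisor is started inside `init/1` only to keep the example self-contained (in a real tree it would be a sibling under a supervisor). `QueueLimiter`, `run/3` and the `:max_normal` option are hypothetical names.

```elixir
defmodule QueueLimiter do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Blocks the caller until the task finishes or crashes.
  def run(server, kind, fun) when kind in [:normal, :priority] and is_function(fun, 0) do
    GenServer.call(server, {:run, kind, fun}, :infinity)
  end

  @impl true
  def init(opts) do
    {:ok, sup} = Task.Supervisor.start_link()
    max = Keyword.get(opts, :max_normal, 1)
    # running: monitor ref => {kind, caller}; active: running normal tasks.
    {:ok, %{sup: sup, max: max, active: 0, running: %{}, pending: :queue.new()}}
  end

  @impl true
  def handle_call({:run, :priority, fun}, from, state) do
    # Priority work starts immediately, regardless of the limit.
    {:noreply, start_task(state, :priority, from, fun)}
  end

  def handle_call({:run, :normal, fun}, from, state) do
    state = %{state | pending: :queue.in({from, fun}, state.pending)}
    {:noreply, dequeue(state)}
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    # The task finished; drop its monitor and any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, finish(state, ref, {:ok, result})}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # The task crashed before producing a result.
    {:noreply, finish(state, ref, {:error, reason})}
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp finish(state, ref, reply) do
    case Map.pop(state.running, ref) do
      {nil, _running} ->
        state

      {{kind, from}, running} ->
        GenServer.reply(from, reply)
        active = if kind == :normal, do: state.active - 1, else: state.active
        dequeue(%{state | running: running, active: active})
    end
  end

  # Starts the next pending normal request if a slot is free.
  defp dequeue(%{active: active, max: max} = state) when active < max do
    case :queue.out(state.pending) do
      {{:value, {from, fun}}, pending} ->
        start_task(%{state | pending: pending, active: active + 1}, :normal, from, fun)

      {:empty, _pending} ->
        state
    end
  end

  defp dequeue(state), do: state

  defp start_task(state, kind, from, fun) do
    # async_nolink monitors the task, so a crash reaches us as :DOWN
    # instead of taking the limiter down.
    %Task{ref: ref} = Task.Supervisor.async_nolink(state.sup, fun)
    %{state | running: Map.put(state.running, ref, {kind, from})}
  end
end
```

With this shape a crashing request only produces an `{:error, reason}` reply for its own caller; pending requests and other running tasks are unaffected.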

You can make such a test predictable by introducing a message passing dance between the request processor and the test process. Basically, you want your normal request to block until you send it some message. Then, you can start a high-priority request and verify that it succeeded. Finally, you send the “unlock” message to the blocked process and assert that it succeeded.
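One way this handshake could look, written as a plain script so it runs without a test framework; in an ExUnit test the `receive ... after` blocks would normally be `assert_receive` calls. `TinyLimiter` is a hypothetical, compact stand-in for the limiter under test (same shape as the original: normal requests go to one worker, priority requests get a fresh process), and `HandshakeCheck` is an illustrative name.

```elixir
defmodule TinyLimiter do
  # Hypothetical stand-in for the limiter under test.
  use GenServer

  @impl true
  def init(nil) do
    {:ok, worker} = Agent.start_link(fn -> nil end)
    {:ok, worker}
  end

  @impl true
  def handle_call({:normal, fun}, from, worker) do
    # Serialised: the single Agent process runs one job at a time.
    Agent.cast(worker, fn s ->
      GenServer.reply(from, fun.())
      s
    end)

    {:noreply, worker}
  end

  def handle_call({:priority, fun}, from, worker) do
    spawn_link(fn -> GenServer.reply(from, fun.()) end)
    {:noreply, worker}
  end
end

defmodule HandshakeCheck do
  # Job body: announce that work has started, then block until unlocked.
  def blocking_job(test_pid, tag) do
    send(test_pid, {:started, tag, self()})

    receive do
      :unlock -> {:done, tag}
    end
  end

  def run do
    test = self()
    {:ok, limiter} = GenServer.start_link(TinyLimiter, nil)

    # 1. Start a normal request and wait until it is definitely running.
    normal =
      Task.async(fn -> GenServer.call(limiter, {:normal, fn -> blocking_job(test, :n) end}) end)

    normal_worker = await_started(:n)

    # 2. While it is still blocked, a priority request must start and finish.
    prio =
      Task.async(fn -> GenServer.call(limiter, {:priority, fn -> blocking_job(test, :p) end}) end)

    prio_worker = await_started(:p)
    send(prio_worker, :unlock)
    {:done, :p} = Task.await(prio)

    # 3. Only now release the normal request.
    send(normal_worker, :unlock)
    {:done, :n} = Task.await(normal)
    :ok
  end

  defp await_started(tag) do
    receive do
      {:started, ^tag, pid} -> pid
    after
      1_000 -> raise "#{tag} job never started"
    end
  end
end
```

The normal job cannot finish before step 3 because nothing else sends it `:unlock`, so the ordering is enforced by messages rather than by sleeps.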


Thanks for the comprehensive reply!

D’oh! I was going this way because I want the API to be uniform, but I never did define a public API :blush:

I would like to use GenServer so I can explore the little intricacies and patterns. Before I make this 100% battle-proof I’d like to actually deploy as a test and see how the system performs.

What I meant is that the handle_call callback returns control immediately so the next call can be processed. The client will indeed block until reply is called.

I do need the replies from the requests, so I’m not sure how the monitored tasks approach would work. Thanks for bringing up back pressure, I was planning to add it on the client side of this, but I’m not sure how to communicate it back to the client (other than use the time taken to service the request).

Thanks for that! I will investigate how messages/monitors can help simplify the code.

I think that for my use case this is correct, but I need to double check.

Come to think of it, I could just call reply from inside the Task, so I can respond to the caller? I need to think more about what will happen if a Task restarts and so on.
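A minimal sketch of replying from inside a task, assuming the work is passed as a zero-arity function; `ReplyFromTask` is a hypothetical module name. The task here is unsupervised and unmonitored, so if it crashes before replying the caller simply times out.

```elixir
defmodule ReplyFromTask do
  use GenServer

  @impl true
  def init(_), do: {:ok, nil}

  @impl true
  def handle_call({:run, fun}, from, state) do
    # Hand `from` to the task; it replies to the caller when the work is done.
    {:ok, _pid} = Task.start(fn -> GenServer.reply(from, fun.()) end)
    # Returning :noreply frees the server to accept the next call right away.
    {:noreply, state}
  end
end

# Usage
{:ok, pid} = GenServer.start_link(ReplyFromTask, nil)
4 = GenServer.call(pid, {:run, fn -> 2 + 2 end})
```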

Brilliant! Thank you very much for your time and effort!

There are some gen_server implementations in Erlang (like this one) that use a priority setup instead of raw messages too. :slight_smile: