Message queue length of a GenServer

How are you sending messages to that process?

I notice that you’re doing this in a handle_call, which is synchronous. So if only one process is doing a GenServer.call to this process, that caller blocks until it gets a response. That means you’ll never see a non-zero queue length, because this handle_call is consuming the only message that has been sent.

In short, what you’re doing is correct, but the way you’re testing it isn’t.
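
To make that concrete, here is a minimal sketch (the module name SingleCallerDemo and the :queue_len message are made up for illustration) in which a single caller asks the server for its own queue length. Because each call blocks until the reply arrives, there is never a second message waiting:

```elixir
defmodule SingleCallerDemo do
  use GenServer

  @impl true
  def init(_args), do: {:ok, nil}

  @impl true
  def handle_call(:queue_len, _from, state) do
    # The message being handled has already been taken out of the mailbox,
    # so it is not counted here.
    {:message_queue_len, len} = Process.info(self(), :message_queue_len)
    {:reply, len, state}
  end
end

{:ok, pid} = GenServer.start_link(SingleCallerDemo, [])

# Each call blocks until the reply arrives, so at most one request is in flight.
lengths = for _ <- 1..5, do: GenServer.call(pid, :queue_len)
IO.inspect(lengths)
```

With only one client, every reported length should be 0.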

Try something like this to test:

1..100
|> Task.async_stream(fn _ -> 
  # Thing that calls your process
end)
|> Stream.run()
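
A filled-in version of the same idea might look like the sketch below. SlowEcho and the :ping message are placeholders for your own process, and the 5 ms sleep is only there to let requests pile up. One thing to keep in mind: Task.async_stream defaults max_concurrency to System.schedulers_online(), so raise it if you want more calls in flight at once.

```elixir
defmodule SlowEcho do
  use GenServer

  @impl true
  def init(_args), do: {:ok, nil}

  @impl true
  def handle_call(:ping, _from, state) do
    {:message_queue_len, len} = Process.info(self(), :message_queue_len)
    # Pretend to do some work so that other callers have time to queue up.
    Process.sleep(5)
    {:reply, len, state}
  end
end

{:ok, pid} = GenServer.start_link(SlowEcho, [])

lengths =
  1..20
  |> Task.async_stream(fn _ -> GenServer.call(pid, :ping) end, max_concurrency: 20)
  |> Enum.map(fn {:ok, len} -> len end)

IO.inspect(Enum.max(lengths), label: "peak queue length")
```

The exact numbers depend on scheduling, so treat the peak as an observation rather than something to assert on.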

Thanks @blatyo

If you want to observe the size and not assert on it, :observer is made specifically for this. :observer.start() will open the Observer WX GUI; go to the Processes tab and you can sort on the MsgQ column. Right-clicking will allow you to get a dump of the current messages. The Processes tab also refreshes itself every so often, so you can use it to watch for message queues that keep growing, which usually means you have a bug with an unmatched message. I have found at least 3 bugs in production using only the MsgQ column and clicking to sort it. It is super useful.
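
If the GUI isn’t available (for example in a remote shell), a rough equivalent of sorting by MsgQ can be sketched with Process.list/0 and Process.info/2. This is only an approximation of what Observer shows, and the cut-off of 5 is arbitrary:

```elixir
queue_lengths =
  for pid <- Process.list(),
      # Process.info/2 returns nil for a process that has exited in the
      # meantime; the pattern in the generator silently skips those.
      {:message_queue_len, len} <- [Process.info(pid, :message_queue_len)],
      do: {pid, len}

top =
  queue_lengths
  |> Enum.sort_by(fn {_pid, len} -> len end, :desc)
  |> Enum.take(5)

IO.inspect(top, label: "largest mailboxes")
```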

@KronicDeth thanks… but I want the GenServer to take actions based on the queue length. Imagine 3 GenServers spun up by a DynamicSupervisor, all sending messages to each other, but they don’t accept new messages if the message count in their respective queues is over 50, and instead send a reply back to the caller, something like {:error, :queue_full}.

Sounds like you want to control the ingestion of data in your system. If that’s the case, rather than fiddling with the queue length, I’d suggest taking a look at GenStage.

That is irrelevant given that the process can only control removal of messages from the mailbox and nothing else - i.e. a process can’t stop accepting messages; it can only stop accepting work which would be tracked inside the process state.

something like {:error, :queue_full}

A response like that happens when a received message is requesting more work but your work backlog indicates that you already have enough - i.e. the response should have nothing to do with the messages remaining in the process mailbox and everything to do with the information you currently have in the process state.
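
A minimal sketch of that idea, assuming a backlog kept in the process state: BoundedBacklog, submit/2, next/1 and the limit of 3 are all invented for this example (the thread talks about 50). The mailbox is never inspected; the decision is made purely from the state.

```elixir
defmodule BoundedBacklog do
  use GenServer

  # Hypothetical limit, kept small so the demo is easy to follow.
  @max_backlog 3

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def submit(pid, job), do: GenServer.call(pid, {:submit, job})
  def next(pid), do: GenServer.call(pid, :next)

  @impl true
  def init(:ok), do: {:ok, %{jobs: :queue.new(), count: 0}}

  @impl true
  def handle_call({:submit, _job}, _from, %{count: count} = state)
      when count >= @max_backlog do
    # The backlog in the state is full, so refuse the work (not the message).
    {:reply, {:error, :queue_full}, state}
  end

  def handle_call({:submit, job}, _from, state) do
    {:reply, :ok, %{state | jobs: :queue.in(job, state.jobs), count: state.count + 1}}
  end

  def handle_call(:next, _from, state) do
    # Stand-in for "a unit of work has been completed".
    case :queue.out(state.jobs) do
      {{:value, job}, rest} ->
        {:reply, {:ok, job}, %{state | jobs: rest, count: state.count - 1}}

      {:empty, _} ->
        {:reply, :empty, state}
    end
  end
end

{:ok, pid} = BoundedBacklog.start_link()
results = for n <- 1..5, do: BoundedBacklog.submit(pid, n)
IO.inspect(results)
```

The first three submissions are accepted and the last two are refused. Once next/1 takes a job off the backlog, a new submission is accepted again.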

Thanks @stefanchrobot @peerreynders
@peerreynders basically I want to know the total remaining messages in the mailbox that haven’t been processed

A better way is to use ETS to track the queue length, then you can tell that the queue is full before messaging the process. See “Bounded Queues” under https://ferd.ca/handling-overload.html
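
One possible shape of this, sketched under my own assumptions rather than taken from the linked article: the client bumps an ETS counter before sending, backs off if the limit is exceeded, and the server releases the slot once the job is done. The module name, table name, and limit of 2 are hypothetical, and :sys.suspend/1 is used only to make the demo deterministic.

```elixir
defmodule BoundedMailbox do
  use GenServer

  # Hypothetical names and limit, chosen for illustration only.
  @table :bounded_mailbox_pending
  @limit 2

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Runs in the caller, so a full queue is detected without
  # sending anything to the server.
  def submit(pid, job) do
    # :ets.update_counter/3 is atomic and returns the new value.
    if :ets.update_counter(@table, :pending, 1) > @limit do
      :ets.update_counter(@table, :pending, -1)
      {:error, :queue_full}
    else
      GenServer.cast(pid, {:job, job})
    end
  end

  def pending, do: :ets.lookup_element(@table, :pending, 2)

  @impl true
  def init(:ok) do
    :ets.new(@table, [:named_table, :public])
    :ets.insert(@table, {:pending, 0})
    {:ok, []}
  end

  @impl true
  def handle_cast({:job, job}, done) do
    # Real work would happen here; release the slot once it is finished.
    :ets.update_counter(@table, :pending, -1)
    {:noreply, [job | done]}
  end
end

{:ok, pid} = BoundedMailbox.start_link()

# Suspend the server (demo only) so that accepted jobs stay in its mailbox.
:ok = :sys.suspend(pid)
results = for n <- 1..4, do: BoundedMailbox.submit(pid, n)
IO.inspect(results)

:ok = :sys.resume(pid)
# :sys.get_state/1 is answered after the queued casts, so the backlog is drained.
done = :sys.get_state(pid)
IO.inspect({Enum.reverse(done), BoundedMailbox.pending()})
```

Jobs 1 and 2 are accepted, 3 and 4 are rejected before any message is sent, and the counter drops back to 0 once the server has caught up. A named table limits this sketch to one server instance; a real version would need one counter per process.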

Edit: just realized you might be beginning with Elixir, so this is probably overkill. What kind of work do your processes do? There’s likely a way to rearrange the problem so you don’t need bounded queues. For instance, using a process per message / task and limiting concurrency via DynamicSupervisor’s max children instead.
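
As a rough sketch of that last suggestion: with max_children set, DynamicSupervisor.start_child/2 returns {:error, :max_children} once the limit is reached, which gives you the "queue full" signal for free. The limit of 2 and the sleeping Task standing in for real work are assumptions for the example.

```elixir
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one, max_children: 2)

# Stand-in for a real unit of work; it never finishes, so the slots stay taken.
worker = {Task, fn -> Process.sleep(:infinity) end}

results = for _ <- 1..3, do: DynamicSupervisor.start_child(sup, worker)
IO.inspect(results)
```

The first two children start and the third attempt is rejected. In a real system the tasks would finish and free their slots, so callers could retry or report the overload.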

You already know the answer - you just aren’t creating the right conditions to see the effects.

defmodule Demo do

  defp do_it(state) do
    {_, num} = Process.info(self(), :message_queue_len)
    IO.puts "queue length: #{inspect num}"

    reply = {:reply, :hello, state}
    cond do
      num > 0 ->
        reply
      true ->
        send(self(), :done)
        reply
    end
  end

  def init(_args) do
    send(self(), :block) # get the first message in the mailbox
    {:ok, []}
  end

  def handle_call(:hi, _from, state),
    do: do_it(state)

  def handle_info(:block, state) do
    Process.sleep(1000) # block for one second
    {:noreply, state}
  end
  def handle_info(:done, state) do
    {:stop, :normal, state}
  end

  def terminate(reason, state) do
    IO.puts "terminate: #{inspect reason} #{inspect state}"
  end

end

{:ok, pid} = GenServer.start_link(Demo,[])
f = fn ->
  GenServer.call(pid, :hi)
  :ok
end

(for _ <- 1..4, do: Task.async(f))
|> Enum.map(&Task.await(&1))
$ elixir demo.exs
queue length: 3
queue length: 2
queue length: 1
queue length: 0
terminate: :normal []
$

@peerreynders thanks once again

If you proceed down this path and wish to return an {:error, :queue_full}, you probably don’t want to do it from the handle_call. I’m assuming that if there are lots of messages, you want to discard the newest ones: the ones at the end of the queue. Putting the condition in handle_call will instead affect the response to the message at the head of the queue.

In sandwich shop terms (because I’m hungry right now), putting the check and error response in handle_call is equivalent to getting in line at a sandwich shop before the lunch rush, waiting a little bit, the lunch rush hitting, finally getting to the counter to order and being told, “I’m sorry. I can’t make you a sandwich; the line is too long.” I think what you want is a bouncer at the door who says, “I’m sorry. The line can’t extend outside.”

If you must implement this yourself rather than using some of the suggestions from other folks, you may wish to put the check in the client function rather than handle_call.

Here’s a quick proof of concept

defmodule OverloadedGenServer do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  def init(_) do
    {:ok, 0}
  end

  def add_num(pid, num) do
    case Process.info(pid, :message_queue_len) do
      {_, len} when len > 5 ->
        {:error, :queue_full}
      _ ->
        GenServer.call(pid, {:add_num, num})
    end
  end

  def handle_call({:add_num, num}, _from, prior_num) do
    new_num = prior_num + num
    Process.sleep(500)
    {:reply, new_num, new_num}
  end
end

{:ok, pid} = OverloadedGenServer.start_link()

for n <- 1..10 do
  Task.async(fn -> OverloadedGenServer.add_num(pid, n) end)
end
|> Enum.map(&Task.await/1)
|> IO.inspect

Process.exit(pid, :kill)

with output

$ elixir queue_test.exs
[
  1,
  3,
  6,
  10,
  15,
  21,
  {:error, :queue_full},
  {:error, :queue_full},
  {:error, :queue_full},
  {:error, :queue_full}
]
** (EXIT from #PID<0.73.0>) killed

Thanks @jeffweiss

@jeffweiss @peerreynders or anybody who knows the answer: why is the message queue length always 0 if I don’t block the init function with at least Process.sleep(50)?

Hi @boilercoding , it would be helpful if you paste your code and your outputs. Are you copy and pasting the exact code above, or is it altered? Are you inspecting the length to verify that it’s 0?

Hello @msimonborg, thank you very much for replying to my message. After inspecting the length I see I was wrong: it is not actually 0. The code that I’m using is the following:

defmodule Demo do

  defp do_it(state) do
    {_, num} = Process.info(self(), :message_queue_len)
    IO.puts "queue length: #{inspect num}"

    {:reply, :hello, state}
  end

  def init(_args) do
    {:ok, []}
  end

  def handle_call(:hi, _from, state),
    do: do_it(state)

end

{:ok, pid} = GenServer.start_link(Demo,[])
f = fn ->
  GenServer.call(pid, :hi)
  :ok
end

(for _ <- 1..14, do: Task.async(f))
|> Enum.map(&Task.await(&1))

with output

queue length: 5
queue length: 12
queue length: 11
queue length: 10
queue length: 9
queue length: 8
queue length: 7
queue length: 6
queue length: 5
queue length: 4
queue length: 3
queue length: 2
queue length: 1
queue length: 0

But if I block as follows:

defmodule Demo do

  defp do_it(state) do
    {_, num} = Process.info(self(), :message_queue_len)
    IO.puts "queue length: #{inspect num}"

    {:reply, :hello, state}
  end

  def init(_args) do
    {:ok, [], {:continue, :block}}
  end

  def handle_call(:hi, _from, state),
    do: do_it(state)

  def handle_continue(:block, state) do
    Process.sleep(1000) # block for one second
    {:noreply, state}
  end

end

{:ok, pid} = GenServer.start_link(Demo,[])
f = fn ->
  GenServer.call(pid, :hi)
  :ok
end

(for _ <- 1..14, do: Task.async(f))
|> Enum.map(&Task.await(&1))

Then it works as expected. So my question is why does blocking the process on initialization make it work as expected?

Thanks!

What do you mean by “as expected”? How do you want it to work?

Thanks for replying @Nicd, the desired output is the following:

queue length: 13
queue length: 12
queue length: 11
queue length: 10
queue length: 9
queue length: 8
queue length: 7
queue length: 6
queue length: 5
queue length: 4
queue length: 3
queue length: 2
queue length: 1
queue length: 0

I was trying to rate limit an application using the message queue length, so I needed to keep track of the number of requests. I now know that the message queue length is not the best approach, but I don’t understand why the request count is not what I would expect when I don’t block in the init function.

You have 16 processes at work, each one with a job to do and each one can do its job asynchronously.

Your first process is the “caller” of your script or iex terminal, the one that is executing your code. It starts your 2nd process, the gen_server, which can then immediately go about its work asynchronously. Its job is to process messages in its queue and put the queue length to the terminal with each message it receives.

Once your gen_server is already doing its job waiting for messages, the caller starts up 14 new processes one by one, then waits until it receives messages from all of them. All 14 of those processes can do asynchronous work but they start their work in order one at a time. Their job is to immediately start sending messages to your gen_server and wait for its reply.

By the time your gen_server is processing its first message there are already 5 in the queue, but not 14, because it can start doing its job at the same time the caller is doing its job. Messages are piling up faster than the gen_server can process them, but they are finite, and once it catches up the length peaks and declines by 1 with each new message it processes.
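
If you want to see the 13-down-to-0 sequence without relying on a sleep, one way to remove the race is to hold the server until every caller has sent its message. The sketch below does that with :sys.suspend/1 and :sys.resume/1; QueueLenRecorder, await_backlog/2 and the :seen message are names invented for this example, and the lengths are collected in the state instead of printed.

```elixir
defmodule QueueLenRecorder do
  use GenServer

  # Hypothetical helper: poll until the server's mailbox holds exactly n messages.
  def await_backlog(pid, n) do
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, ^n} ->
        :ok

      _ ->
        Process.sleep(1)
        await_backlog(pid, n)
    end
  end

  @impl true
  def init(_args), do: {:ok, []}

  @impl true
  def handle_call(:hi, _from, seen) do
    {:message_queue_len, len} = Process.info(self(), :message_queue_len)
    {:reply, :hello, [len | seen]}
  end

  def handle_call(:seen, _from, seen), do: {:reply, Enum.reverse(seen), seen}
end

{:ok, pid} = GenServer.start_link(QueueLenRecorder, [])

# While suspended, the server only handles system messages; calls stay queued.
:ok = :sys.suspend(pid)
tasks = for _ <- 1..14, do: Task.async(fn -> GenServer.call(pid, :hi) end)
:ok = QueueLenRecorder.await_backlog(pid, 14)
:ok = :sys.resume(pid)

Enum.each(tasks, &Task.await/1)
seen = GenServer.call(pid, :seen)
IO.inspect(seen)
```

Because all 14 requests are already in the mailbox when the server resumes, the recorded lengths should count down from 13 to 0. This is a demonstration of the timing only, not a recommendation to suspend processes in production.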

Thanks a lot @msimonborg it makes more sense now.
