GenServer processing

Hello guys, I have made an Elixir HTTP REST API using Maru. My requests are processed through a separate module using a GenServer. Everything works, but I need the requests to be handled concurrently instead of waiting for each other: they have to be able to use the GenServer’s functions at the same time, not one by one. I cannot use cast because I have to return a response. When one request enters GenServer.call, the next one waits and only enters after the first one has finished.

You could do something like

def handle_call(msg, from, state) do
  # Hand the work off to a new process so the server loop is free
  # to accept the next call immediately.
  spawn(fn ->
    reply = do_stuff_with_message(msg)
    # Complete the original call from the worker process.
    GenServer.reply(from, reply)
  end)
  # :noreply defers the answer; the caller keeps waiting until
  # GenServer.reply/2 is called above.
  {:noreply, state}
end

Here is an example.

HTTP Server.

defmodule HttpSrv do

  use Maru.Router
  plug(Plug.Logger)
  require Logger
  plug CORSPlug

  post do
    Logger.info("[Maru] Received a post request.")
  
    ## Start the processing of each request
    resp = Calculator.calc(5, 5)
    Maru.Response.json(conn, %{response: resp})
  end

end

The GenServer module.

defmodule Calculator do

  use GenServer
  require Logger

  ## Client API

  def start_link(_arg) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def calc(a, b) do
    GenServer.call(__MODULE__, {:calc, a, b}, :infinity)
  end

  ## Server Callbacks

  def init(:ok) do
    {:ok, %{}}
  end

  # This runs inside the single Calculator process, so calls are served one at a time.
  def handle_call({:calc, a, b}, _from, state) do
    result = a + b
    {:reply, result, state}
  end

end

You don’t even have to reply in another process. The trick is that handle_call doesn’t have to reply immediately: the pending call can be stored in the GenServer state, and once the reply is ready, GenServer.reply/2 completes it. Just remember that on the client side there is a timeout.
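
A minimal sketch of that idea (how and where the work actually gets done is a placeholder; here the server just sends itself the result):

defmodule DeferredCalc do
  use GenServer

  def init(:ok), do: {:ok, %{}}

  # Don't reply yet; remember who asked, keyed by a unique ref.
  def handle_call({:calc, a, b}, from, pending) do
    ref = make_ref()
    # Placeholder: hand the work off to something that will send the result back.
    send(self(), {:result, ref, a + b})
    {:noreply, Map.put(pending, ref, from)}
  end

  # Once the reply is ready, look up the stored caller and complete the call.
  def handle_info({:result, ref, value}, pending) do
    {from, pending} = Map.pop(pending, ref)
    GenServer.reply(from, value)
    {:noreply, pending}
  end
end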

If they do not need to alter the state of the GenServer, then @voughtdq’s solution works fine (though the calls will still be serialized; but the time they block each other will be minimal, just the time required to spawn the process and return from handle_call).

The caveat in @voughtdq’s approach is that the requests cannot change the state of the GenServer. So as long as the GenServer’s state is immutable with respect to the requests, that will work quite well until you really start hammering it with requests (… and then the handle_call is unlikely to be the bottleneck).

If requests do need to change the GenServer’s state, then you need a different solution, though it will also involve spawning multiple processes so they can service requests in parallel.

Additionally, that approach offers no mechanism for back-pressure or rate-limiting on its own, though it could be added …
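
For instance, one rough sketch (the limit and the {:error, :busy} reply are invented here for illustration) is to count in-flight workers in the state and refuse new work past a cap:

defmodule BoundedCalc do
  use GenServer

  @max_in_flight 10 # arbitrary limit, purely illustrative

  def init(:ok), do: {:ok, 0}

  # At capacity: reply immediately with an error instead of taking on more work.
  def handle_call(_msg, _from, in_flight) when in_flight >= @max_in_flight do
    {:reply, {:error, :busy}, in_flight}
  end

  def handle_call(msg, from, in_flight) do
    server = self()

    spawn(fn ->
      GenServer.reply(from, do_stuff_with_message(msg))
      send(server, :done) # free the slot once the reply has been sent
    end)

    {:noreply, in_flight + 1}
  end

  def handle_info(:done, in_flight), do: {:noreply, in_flight - 1}

  defp do_stuff_with_message(msg), do: msg # stand-in for the real work
end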

I have already tried this, but it waits before entering handle_call. It blocks at the point where I call GenServer.call.

Edit: ignore that; calc is part of the client API, not the server-side processing.

Wait, do you mean that the HTTP server is blocking?

Exactly. It is blocked and doesn’t accept more calls until the previous one has finished.

No, not the HTTP server; the GenServer, as @peerreynders already said.

Building Non Blocking Erlang apps

Also: Why does this simple GenServer timeout?

You could, for example, launch the calculation via Task.async/1, which gives you a %Task{owner: term(), pid: term(), ref: term()}. Store that together with the caller details in the GenServer state.

When the task is done, you’ll get a {ref, result} message via handle_info and you can complete the call with GenServer.reply/2.

For a cleaner result, also take care of the details the way Task.await does, i.e. demonitor the task and handle :DOWN messages.
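
Roughly, that could look like this (names are illustrative; note that Task.async/1 links the task to the server, hence the trap_exit):

defmodule TaskCalc do
  use GenServer

  def init(:ok) do
    # Task.async links the task to us; trap exits so a crashing
    # task doesn't take the server down with it.
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  def handle_call({:calc, a, b}, from, pending) do
    # Run the work in a task; its monitor ref keys the pending caller.
    %Task{ref: ref} = Task.async(fn -> a + b end)
    {:noreply, Map.put(pending, ref, from)}
  end

  # Success: the task sends {ref, result}. Demonitor (flushing the
  # :DOWN message, as Task.await does) and complete the call.
  def handle_info({ref, result}, pending) when is_map_key(pending, ref) do
    Process.demonitor(ref, [:flush])
    {from, pending} = Map.pop(pending, ref)
    GenServer.reply(from, result)
    {:noreply, pending}
  end

  # Failure: the task died before replying; fail the caller instead.
  def handle_info({:DOWN, ref, :process, _pid, reason}, pending) do
    {from, pending} = Map.pop(pending, ref)
    if from, do: GenServer.reply(from, {:error, reason})
    {:noreply, pending}
  end

  # Ignore anything else, including the :EXIT messages trapped links produce.
  def handle_info(_other, pending), do: {:noreply, pending}
end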


Can you talk about your use case for the GenServer? GenServers are single-threaded; that’s just what they are, and there’s no getting around that. There’s probably a way to solve your problem without a single GenServer, but we can’t recommend one without details about what it does.


@benwilson512 I have provided an example above. Let’s say that 5 people make a POST request at the same time. In this example they would receive the response of the calculation immediately, but suppose each calculation takes 5 seconds. Then the first POST request is answered in 5 seconds, the second one takes 10 seconds, and so on. I need each of them to receive its response in 5 seconds. The calculations for each of them have to be made at the same time, not one by one. I am not sure if I have explained it well.

@ayhan.rashidov What I’m trying to understand is why the calculation needs to happen in a GenServer. What is the calculation? Can it just be done in each user’s request process? Can it be a query on an ETS table? We can’t recommend options if we don’t know what the computation is.

This is just an example. Let’s say that the calculation must happen in the GenServer. This is the way I have to do it.

A single GenServer process can only ever work sequentially. If you need concurrency you need to spawn multiple processes.


The point is that each request should already be running in its own process. Therefore each isolated request can make the call independently (and be blocked) and then has to wait for its own call to return. So things are already inherently concurrent.

By introducing a single GenServer that all requests have to go through, you have created a bottleneck.

So the question becomes: what benefit does this bottleneck give you?

“The calculations for each of them have to be made at the same time, not one by one.”

So have each request do its own calculations …


If it has to run in a GenServer, then you could run a pool of that type of GenServer and check one out each time work needs to be run, making it non-blocking up to the number of workers in your pool. Otherwise a GenServer will always be sequential in execution.
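
For example, with the :poolboy dependency (the pool name and sizes are invented here, and the worker module must start_link without a registered name, unlike the Calculator above, so several copies can run at once):

# e.g. in your application's start/2 callback:
pool_spec =
  :poolboy.child_spec(:calc_pool,
    name: {:local, :calc_pool},
    worker_module: Calculator, # must not register a fixed name
    size: 5,                   # up to five calls run in parallel
    max_overflow: 2
  )

{:ok, _sup} = Supervisor.start_link([pool_spec], strategy: :one_for_one)

# At the call site: check a worker out, use it, check it back in.
result =
  :poolboy.transaction(:calc_pool, fn worker ->
    GenServer.call(worker, {:calc, 5, 5}, :infinity)
  end)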

There is a popular misconception that GenServer provides a means to structure programs, hide information, etc. This is not the case. Elixir and Erlang programs are structured as modules containing functions. If you need to “do calculations in one place”, you use a function, not a server. If you have requirements for shared state, then the particulars of those requirements determine the solution.
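
Applied to the example in this thread, that means the Calculator server can collapse to a plain function; each HTTP request already runs in its own process, so calls to it are concurrent with no bottleneck at all:

defmodule Calc do
  # No process involved: every request process can call this
  # at the same time as every other one.
  def calc(a, b), do: a + b
end

The Maru route would then call Calc.calc(5, 5) directly instead of going through GenServer.call.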


Some playground code to explore some of the effects:

defmodule Demo do

  @delay 2000

  defp report(index, base_time, start_time, end_time) do
    offset = :timer.now_diff(start_time, base_time)
    duration = :timer.now_diff(end_time, start_time)
    IO.puts("#{index}: offset: #{offset}μs dur: #{duration}μs")
  end

  def wrap_fun(base_time) do
    fn {fun, index} ->
      fn ->
        start_time = :erlang.timestamp()
        result = fun.()
        report(index, base_time, start_time, :erlang.timestamp())
        result
      end
    end
  end

  def do_it(a,b) do
    Process.sleep(@delay)
    a + b
  end

  def direct_fun({{a,b}, index}) do
    {fn -> do_it(a,b) end, index} # i.e. call function directly
  end

  def sequential(requests) do
    base_time = :erlang.timestamp()
    requests
    |> Enum.map(&direct_fun/1)    # i.e. function will be called directly
    |> Enum.map(wrap_fun(base_time))
    |> Enum.map(&(&1.())) # RUN functions to process requests sequentially one at a time
  end

  def concurrent(requests) do
    base_time = :erlang.timestamp()
    requests
    |> Enum.map(&direct_fun/1)    # i.e. function will be called directly
    |> Enum.map(wrap_fun(base_time))
    |> Enum.map(&Task.async/1)    # LAUNCH tasks to RUN requests concurrently
    |> Enum.map(&(Task.await(&1)))# and block until ALL tasks are finished
  end

  def to_call_fun(pid) do
    fn {data, index} ->
      {
      fn ->
        GenServer.call(pid, data, :infinity) # i.e. have GenServer call function
      end,
      index
      }
    end
  end

  def constrained(requests, pid) do
    base_time = :erlang.timestamp()
    requests
    |> Enum.map(to_call_fun(pid))  # i.e. use GenServer to call do_it function
    |> Enum.map(wrap_fun(base_time))
    |> Enum.map(&Task.async/1)     # LAUNCH tasks to RUN requests concurrently
    |> Enum.map(&(Task.await(&1))) # and block until ALL tasks are finished
  end
end

defmodule DemoSequential do
  use GenServer

  def init(args) do
    {:ok, args}
  end

  def handle_call({a,b}, _from, state),
    do: {:reply, Demo.do_it(a,b), state}

  def handle_cast(:stop, state),
    do: {:stop, :normal, state}

  def terminate(reason, state) do
    IO.puts "#{inspect __MODULE__} terminate: #{inspect reason} #{inspect state}"
  end

end

defmodule DemoConcurrent do
  use GenServer

  def init(args) do
    {:ok, args}
  end

  def launch_task(from, a, b) do
    Task.start(fn ->
      result = Demo.do_it(a,b)
      GenServer.reply(from, result)
    end)
  end

  def handle_call({a,b}, from, state) do
     launch_task(from, a, b)
     {:noreply, state}
  end

  def handle_cast(:stop, state),
    do: {:stop, :normal, state}

  def terminate(reason, state) do
    IO.puts "#{inspect __MODULE__} terminate: #{inspect reason} #{inspect state}"
  end

end

requests =
  1..10
  |> Enum.chunk_every(2, 2, :discard)
  |> Enum.map(&List.to_tuple/1)
  |> Enum.with_index(1)

IO.puts("Each tuple is a {{a,b}, index} request")
IO.inspect(requests)

{:ok, pid_sequential} = GenServer.start_link(DemoSequential,[])
{:ok, pid_concurrent} = GenServer.start_link(DemoConcurrent,[])

IO.puts("Sequential processing")
IO.inspect(Demo.sequential(requests))

IO.puts("Concurrent - requests are processed concurrently (and independently)")
IO.inspect(Demo.concurrent(requests))

IO.puts("Constrained - concurrently running requests served sequentially")
IO.inspect(Demo.constrained(requests, pid_sequential))

IO.puts("Constrained - concurrently running requests served concurrently")
IO.inspect(Demo.constrained(requests, pid_concurrent))

GenServer.cast(pid_concurrent, :stop)
GenServer.cast(pid_sequential, :stop)
Process.sleep(500)
Running the script produces:

$ elixir demo.exs
Each tuple is a {{a,b}, index} request
[{{1, 2}, 1}, {{3, 4}, 2}, {{5, 6}, 3}, {{7, 8}, 4}, {{9, 10}, 5}]
Sequential processing
1: offset: 2μs dur: 2001419μs
2: offset: 2003665μs dur: 2000581μs
3: offset: 4004330μs dur: 2000679μs
4: offset: 6005107μs dur: 2000903μs
5: offset: 8006094μs dur: 2001040μs
[3, 7, 11, 15, 19]
Concurrent - requests are processed concurrently (and independently)
1: offset: 2224μs dur: 2000612μs
2: offset: 2228μs dur: 2000650μs
3: offset: 2230μs dur: 2000656μs
4: offset: 2232μs dur: 2000660μs
5: offset: 2234μs dur: 2000665μs
[3, 7, 11, 15, 19]
Constrained - concurrently running requests served sequentially
1: offset: 46μs dur: 2000500μs
2: offset: 55μs dur: 4001751μs
3: offset: 59μs dur: 6002516μs
4: offset: 62μs dur: 8003581μs
5: offset: 65μs dur: 10004541μs
[3, 7, 11, 15, 19]
Constrained - concurrently running requests served concurrently
1: offset: 53μs dur: 2001087μs
2: offset: 63μs dur: 2001112μs
3: offset: 68μs dur: 2001116μs
4: offset: 71μs dur: 2001120μs
5: offset: 75μs dur: 2001122μs
[3, 7, 11, 15, 19]
DemoConcurrent terminate: :normal []
DemoSequential terminate: :normal []
$