Hello guys, I have made an Elixir HTTP REST API using Maru. My requests are processed through a separate module that uses GenServer. Everything is working fine, but I need the requests to be processed concurrently, without waiting for each other. They have to be able to use the GenServer’s functions at the same time, not one by one. I cannot use cast because I have to return a response. When one of the requests enters GenServer.call, the other one waits and only enters after the first one has finished.
You could do something like
def handle_call(msg, from, state) do
  spawn(fn ->
    reply = do_stuff_with_message(msg)
    GenServer.reply(from, reply)
  end)

  {:noreply, state}
end
Here is an example.
HTTP Server.
defmodule HttpSrv do
  use Maru.Router
  plug(Plug.Logger)
  require Logger
  plug CORSPlug

  post do
    Logger.info("[Maru] Received a post request.")
    ## Start the processing of each request
    resp = Calculator.calc(5, 5)
    Maru.Response.json(conn, %{response: resp})
  end
end
The GenServer module.
defmodule Calculator do
  use GenServer
  require Logger

  ## Client API
  def start_link(_arg) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def calc(a, b) do
    GenServer.call(__MODULE__, {:calc, a, b}, :infinity)
  end

  ## Server Callbacks
  def init(:ok) do
    {:ok, %{}}
  end

  def handle_call({:calc, a, b}, _from, state) do
    result = a + b
    {:reply, result, state}
  end
end
You don’t even have to reply in another process. The trick is that handle_call doesn’t have to reply immediately. So the state of the call can be stored in the GenServer state, and once the reply is ready GenServer.reply/2 can be used to complete the call. Just remember that on the client side there is a timeout.
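A minimal sketch of that deferred reply, with no extra process involved. The module name DeferredReply, the pending map, and the {:ready, key, result} message are made up for illustration, and Process.send_after/3 only stands in for "the reply becomes ready later":

```elixir
defmodule DeferredReply do
  use GenServer

  # Hypothetical client API, not part of the original example.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # The client-side timeout still applies while the reply is pending.
  def calc(server, a, b), do: GenServer.call(server, {:calc, a, b}, 5_000)

  @impl true
  def init(:ok), do: {:ok, %{pending: %{}}}

  @impl true
  def handle_call({:calc, a, b}, from, state) do
    # Do not reply yet: remember the caller under a unique key and return at once,
    # so the server is free to accept the next call.
    key = make_ref()

    # Assumption: the result arrives later as a message. Here it is simulated
    # with a delayed message to ourselves.
    Process.send_after(self(), {:ready, key, a + b}, 100)

    {:noreply, put_in(state.pending[key], from)}
  end

  @impl true
  def handle_info({:ready, key, result}, state) do
    # The reply is ready: look up the stored caller and complete the call.
    {from, pending} = Map.pop(state.pending, key)
    if from, do: GenServer.reply(from, result)
    {:noreply, %{state | pending: pending}}
  end
end
```

Several callers can have their calls pending at the same time, because handle_call returns immediately instead of blocking until the result exists.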
If they do not need to alter the state of the GenServer, then @voughtdq’s solution works fine (though they will still be serialized; but the time they block each other will be minimal, just the time required to spawn the process and return from handle_call).
The caveat in @voughtdq’s approach is that the requests cannot change the state of the GenServer. So as long as the GenServer’s state is immutable with respect to the requests, that will work quite fine until you really start hammering it with requests (… and then, the handle_call is unlikely to be the bottleneck).
If requests do need to change the GenServer’s state, then you need a different solution, though it will also involve spawning multiple processes so they can service requests in parallel.
Additionally, that approach offers no mechanism for back-pressure or rate-limiting on its own, though it could be added …
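One possible shape for the state-changing case, as a sketch only: the slow work still runs in a separate process, but the result is sent back to the GenServer so that the state update and the reply both happen inside the server. CountingCalc, the completed counter, and the {:done, from, result} message are hypothetical names, and the sleep stands in for real work:

```elixir
defmodule CountingCalc do
  use GenServer

  # Hypothetical client API for illustration.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def calc(server, a, b), do: GenServer.call(server, {:calc, a, b})
  def completed(server), do: GenServer.call(server, :completed)

  @impl true
  def init(:ok), do: {:ok, %{completed: 0}}

  @impl true
  def handle_call({:calc, a, b}, from, state) do
    server = self()

    # The slow part runs outside the server process...
    Task.start(fn ->
      Process.sleep(100)
      # ...and hands the result back instead of replying directly.
      send(server, {:done, from, a + b})
    end)

    {:noreply, state}
  end

  def handle_call(:completed, _from, state) do
    {:reply, state.completed, state}
  end

  @impl true
  def handle_info({:done, from, result}, state) do
    # Only the server touches its own state, so updates stay serialized.
    GenServer.reply(from, result)
    {:noreply, %{state | completed: state.completed + 1}}
  end
end
```

Note that if the spawned process crashes, nothing is sent back and the caller only finds out through its call timeout. This sketch also does nothing about back-pressure: every call starts a new process.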
I have already tried this, but it is waiting before entering the handle_call. It is waiting where I call the GenServer.call function.
Edit: ignore. calc is part of the client API, not the server processing.
Wait, do you mean that the HTTP server is blocking?
Exactly. It is blocked and doesn’t accept more calls until the previous one has finished.
No, not the HTTP, the GenServer as @peerreynders already said.
Building Non Blocking Erlang apps
Also: Why does this simple GenServer timeout?
You could, for example, launch the calculation via Task.async/1, which gives you a %Task{owner: term(), pid: term(), ref: term()}. Store that together with the caller details in the GenServer state.
When the task is done, you’ll get a {ref, result} message via handle_info and you can complete the call with GenServer.reply/2.
For a cleaner result, also take care of the details like Task.await does, i.e. demonitor and process :DOWN messages.
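Roughly, that could look like the sketch below. AsyncCalc is a made-up module name, the state is simply a map from task ref to caller, and the sleep stands in for the real calculation:

```elixir
defmodule AsyncCalc do
  use GenServer

  # Hypothetical client API for illustration.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def calc(server, a, b), do: GenServer.call(server, {:calc, a, b})

  # State: %{task_ref => from}
  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:calc, a, b}, from, tasks) do
    task =
      Task.async(fn ->
        Process.sleep(100)
        a + b
      end)

    # Remember who is waiting for this task, then return immediately.
    {:noreply, Map.put(tasks, task.ref, from)}
  end

  @impl true
  def handle_info({ref, result}, tasks) when is_reference(ref) do
    # The task finished: drop its monitor and flush a queued :DOWN message,
    # which is the bookkeeping Task.await would otherwise do for us.
    Process.demonitor(ref, [:flush])
    {from, tasks} = Map.pop(tasks, ref)
    if from, do: GenServer.reply(from, result)
    {:noreply, tasks}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, tasks) do
    # The task went down without delivering a result: forget its caller,
    # who will then run into the call timeout.
    {:noreply, Map.delete(tasks, ref)}
  end
end
```

Keep in mind that Task.async/1 links the task to the GenServer, so a crashing task takes the server down with it unless the server traps exits. Task.Supervisor.async_nolink is the usual way around that, but it needs a Task.Supervisor in your supervision tree.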
Can you talk about your use case for the GenServer? GenServers are single threaded, that’s just what they are, there’s no getting around that. There’s probably a way to solve your problem without using a single GenServer, but we can’t recommend one without details about what it does.
@benwilson512 I have provided an example above. Let’s say that 5 people make a post request at the same time. In this example they will receive the result of the calculation immediately. But let’s say that each calculation takes 5 seconds. In that case the first post request will get its reply after 5 seconds, the second one after 10 seconds, and so on. I need all of them to receive their responses in 5 seconds. The calculations for each of them have to be made at the same time, not one by one. I am not sure if I explained it well.
@ayhan.rashidov What I’m trying to understand is why the calculation needs to happen in a GenServer. What is the calculation, can it just be done in each user’s request process? Can it be a query on an ets table? We can’t recommend options if we don’t know what the computation is.
This is just an example. Let’s say that the calculation must happen in the GenServer. This is the way I have to do it.
A single GenServer process can only ever work sequentially. If you need concurrency you need to spawn multiple processes.
The point is that each request should already be running in its own process. Therefore each isolated request can make the call independently (and be blocked) and then has to wait for its own call to return. So things are already inherently concurrent.
By introducing a single GenServer that all requests have to go through you have created a bottleneck.
So the question becomes, what is the benefit that this bottleneck gives you?
The calculations for each of them have to be made at the same time, not one by one.
So have each request do its own calculations …
If it has to run in a GenServer, then you could run a pool of that type of GenServer and check out one each time work needs to be run. That makes it non-blocking up to the number of workers in your pool. Otherwise a GenServer will always be sequential in execution.
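A very rough sketch of the pool idea without any library. PoolWorker, TinyPool, and the pool_worker_N names are all made up, and picking a worker at random is only a stand-in for a real checkout (a pooling library such as poolboy tracks which workers are actually busy):

```elixir
defmodule PoolWorker do
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, :ok, name: name)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:calc, a, b}, _from, state) do
    # Each worker is still strictly sequential on its own.
    Process.sleep(100)
    {:reply, a + b, state}
  end
end

defmodule TinyPool do
  # Assumed pool size; at most this many calculations run at the same time.
  @size 4

  # Starts the workers linked to the calling process. In a real application
  # they would be children of a supervisor instead.
  def start do
    Enum.each(0..(@size - 1), fn i ->
      {:ok, _pid} = PoolWorker.start_link(name(i))
    end)
  end

  def calc(a, b) do
    # Not a true checkout: a random pick can land on a busy worker
    # while another one sits idle.
    worker = name(:rand.uniform(@size) - 1)
    GenServer.call(worker, {:calc, a, b})
  end

  defp name(i), do: :"pool_worker_#{i}"
end
```

After TinyPool.start(), each request process would call TinyPool.calc(5, 5) instead of going through one single named GenServer.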
There is a popular misconception that GenServer provides a means to structure programs, hide information etc. This is not the case. Elixir and Erlang programs are structured as modules containing functions. If you need to “do calculations in one place”, you use a function, not a server. If you have requirements for shared state then the particulars of those requirements determine the solution.
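As one illustration of how the shared-state requirements drive the design, here is a sketch of the ETS idea mentioned earlier in the thread, under the assumption that the data is read much more often than it is written. SharedTable and the table name are hypothetical. Writes go through the owning GenServer and are serialized, while reads go straight to the table from the calling process and never queue behind the server:

```elixir
defmodule SharedTable do
  use GenServer

  # Hypothetical table name for illustration.
  @table :shared_calc_results

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Writes are serialized through the process that owns the table.
  def put(server, key, value), do: GenServer.call(server, {:put, key, value})

  # Reads bypass the GenServer and run concurrently in the caller's process.
  def get(key) do
    case :ets.lookup(@table, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :error
    end
  end

  @impl true
  def init(:ok) do
    # :protected means only the owner may write, while any process may read.
    table = :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    {:ok, table}
  end

  @impl true
  def handle_call({:put, key, value}, _from, table) do
    :ets.insert(table, {key, value})
    {:reply, :ok, table}
  end
end
```

Whether this fits depends entirely on what the calculation needs. If it needs no shared state at all, a plain function called from each request process is simpler still.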
Some playground code to explore some of the effects:
defmodule Demo do
  @delay 2000

  defp report(index, base_time, start_time, end_time) do
    offset = :timer.now_diff(start_time, base_time)
    duration = :timer.now_diff(end_time, start_time)
    IO.puts("#{index}: offset: #{offset}μs dur: #{duration}μs")
  end

  def wrap_fun(base_time) do
    fn {fun, index} ->
      fn ->
        start_time = :erlang.timestamp()
        result = fun.()
        report(index, base_time, start_time, :erlang.timestamp())
        result
      end
    end
  end

  def do_it(a,b) do
    Process.sleep(@delay)
    a + b
  end

  def direct_fun({{a,b}, index}) do
    {fn -> do_it(a,b) end, index} # i.e. call function directly
  end

  def sequential(requests) do
    base_time = :erlang.timestamp()

    requests
    |> Enum.map(&direct_fun/1)        # i.e. function will be called directly
    |> Enum.map(wrap_fun(base_time))
    |> Enum.map(&(&1.()))             # RUN functions to process requests sequentially one at a time
  end

  def concurrent(requests) do
    base_time = :erlang.timestamp()

    requests
    |> Enum.map(&direct_fun/1)        # i.e. function will be called directly
    |> Enum.map(wrap_fun(base_time))
    |> Enum.map(&Task.async/1)        # LAUNCH tasks to RUN requests concurrently
    |> Enum.map(&(Task.await(&1)))    # and block until ALL tasks are finished
  end

  def to_call_fun(pid) do
    fn {data, index} ->
      {
        fn ->
          GenServer.call(pid, data, :infinity) # i.e. have GenServer call function
        end,
        index
      }
    end
  end

  def constrained(requests, pid) do
    base_time = :erlang.timestamp()

    requests
    |> Enum.map(to_call_fun(pid))     # i.e. use GenServer to call do_it function
    |> Enum.map(wrap_fun(base_time))
    |> Enum.map(&Task.async/1)        # LAUNCH tasks to RUN requests concurrently
    |> Enum.map(&(Task.await(&1)))    # and block until ALL tasks are finished
  end
end

defmodule DemoSequential do
  def init(args) do
    {:ok, args}
  end

  def handle_call({a,b}, _from, state),
    do: {:reply, Demo.do_it(a,b), state}

  def handle_cast(:stop, state),
    do: {:stop, :normal, state}

  def terminate(reason, state) do
    IO.puts "#{inspect __MODULE__} terminate: #{inspect reason} #{inspect state}"
  end
end

defmodule DemoConcurrent do
  def init(args) do
    {:ok, args}
  end

  def launch_task(from, a, b) do
    Task.start(fn ->
      result = Demo.do_it(a,b)
      GenServer.reply(from, result)
    end)
  end

  def handle_call({a,b}, from, state) do
    launch_task(from, a, b)
    {:noreply, state}
  end

  def handle_cast(:stop, state),
    do: {:stop, :normal, state}

  def terminate(reason, state) do
    IO.puts "#{inspect __MODULE__} terminate: #{inspect reason} #{inspect state}"
  end
end

requests =
  1..10
  |> Enum.chunk_every(2, 2, :discard)
  |> Enum.map(&List.to_tuple/1)
  |> Enum.with_index(1)

IO.puts("Each tuple is a {{a,b}, index} request")
IO.inspect(requests)
{:ok, pid_sequential} = GenServer.start_link(DemoSequential,[])
{:ok, pid_concurrent} = GenServer.start_link(DemoConcurrent,[])
IO.puts("Sequential processing")
IO.inspect(Demo.sequential(requests))
IO.puts("Concurrent - requests are processed concurrently (and independently)")
IO.inspect(Demo.concurrent(requests))
IO.puts("Constrained - concurrently running requests served sequentially")
IO.inspect(Demo.constrained(requests, pid_sequential))
IO.puts("Constrained - concurrently running requests served concurrently")
IO.inspect(Demo.constrained(requests, pid_concurrent))
GenServer.cast(pid_concurrent, :stop)
GenServer.cast(pid_sequential, :stop)
Process.sleep(500)
$ elixir demo.exs
Each tuple is a {{a,b}, index} request
[{{1, 2}, 1}, {{3, 4}, 2}, {{5, 6}, 3}, {{7, 8}, 4}, {{9, 10}, 5}]
Sequential processing
1: offset: 2μs dur: 2001419μs
2: offset: 2003665μs dur: 2000581μs
3: offset: 4004330μs dur: 2000679μs
4: offset: 6005107μs dur: 2000903μs
5: offset: 8006094μs dur: 2001040μs
[3, 7, 11, 15, 19]
Concurrent - requests are processed concurrently (and independently)
1: offset: 2224μs dur: 2000612μs
2: offset: 2228μs dur: 2000650μs
3: offset: 2230μs dur: 2000656μs
4: offset: 2232μs dur: 2000660μs
5: offset: 2234μs dur: 2000665μs
[3, 7, 11, 15, 19]
Constrained - concurrently running requests served sequentially
1: offset: 46μs dur: 2000500μs
2: offset: 55μs dur: 4001751μs
3: offset: 59μs dur: 6002516μs
4: offset: 62μs dur: 8003581μs
5: offset: 65μs dur: 10004541μs
[3, 7, 11, 15, 19]
Constrained - concurrently running requests served concurrently
1: offset: 53μs dur: 2001087μs
2: offset: 63μs dur: 2001112μs
3: offset: 68μs dur: 2001116μs
4: offset: 71μs dur: 2001120μs
5: offset: 75μs dur: 2001122μs
[3, 7, 11, 15, 19]
DemoConcurrent terminate: :normal []
DemoSequential terminate: :normal []
$