Why is GenServer communication generally synchronous?

I need a process that:

  • keeps some state,
  • implements some behaviour with a get_foo() function (does some simple computation),
  • periodically updates its state with a potentially time-consuming operation (e.g. waiting for an HTTP response).

The problem here is that I need get_foo() to return really quickly, and if it’s implemented as a GenServer.call then the process might be in the middle of handling a long-running message when the call arrives.

Solving this turns out to be more difficult (inconvenient?) than I expected. Neither HTTPotion nor Tesla offers a simple API that would send the response to the caller as a message. This got me thinking that very few APIs work like that, and that APIs built on top of GenServer are usually synchronous. Why is that?

And back to the initial question: how do I solve the problem I have? I could start a Task, but that’s a lot of “paperwork” (handling different kinds of messages, and probably another supervisor is needed). I’m rather thinking of moving the state out of the process and implementing get_foo() as a pure function that would run in the client process (I can change the definition of get_foo()).

Because you can always farm it out to a new process to make it async. Honestly, I prefer worker pools for things like that, though.

Depending on what you need to do, just using a pool (poolboy or similar) helps a lot: it constrains resource usage while still allowing concurrency.

However, GenServer communication is not generally synchronous; it fully supports both sync and async calls, which is why most of my libraries (a notable one being TaskAfter) support both styles.


Just an idea for solving your problem: If you have a state that is read often and gets mutated from time to time, you can implement this with an ets table that is written to through one process but that is read by any process.
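A minimal sketch of that idea, assuming a single cached value stored under the key :foo. The module name FooCache, the table name :foo_cache and the put_foo/1 function are made up for illustration. Reads run entirely in the calling process, so they never wait on the GenServer's mailbox:

```elixir
defmodule FooCache do
  use GenServer

  # Hypothetical table name, chosen for this sketch.
  @table :foo_cache

  # --- client API

  def start_link(opts \\ []),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Runs in the caller's process; no message is sent to the server.
  def get_foo do
    case :ets.lookup(@table, :foo) do
      [{:foo, value}] -> value
      [] -> nil
    end
  end

  # Writes are serialized through the owner process.
  def put_foo(value),
    do: GenServer.call(__MODULE__, {:put_foo, value})

  # --- server callbacks

  @impl true
  def init(_opts) do
    # :protected means any process may read, only the owner may write.
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    {:ok, nil}
  end

  @impl true
  def handle_call({:put_foo, value}, _from, state) do
    :ets.insert(@table, {:foo, value})
    {:reply, :ok, state}
  end
end
```

In the real thing the periodic update would call :ets.insert/2 from inside the owner process once the slow operation has finished, rather than going through put_foo/1.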


I could start a Task, but that’s a lot of “paperwork”

Seems a bit of an exaggeration.


I believe you’re over-thinking it:

def handle_response(%HTTPoison.Response{} = resp) do
  GenServer.cast(:me, {:handle_response, resp})
end

and, elsewhere:

Task.spawn(fn() -> handle_response(HTTPoison.get!(...)) end)

There are two separate issues here: synchronous calls and rapid notifications.

You make a call synchronous because you want to ensure the server received the request. This does not have to mean the server has a response to the computation behind the request. It only means the server received the request.

Rapid notification usually refers to a quick response from the server to the request. For example, if you use GenServer.cast the notification is immediate. Granted, there are no guarantees the server received the request.

You can have both, it just takes a bit of thought.
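As a rough sketch of having both (AckServer, submit/2 and the {:job_done, result} message are invented names, not from any library): the call blocks only until the server has accepted the job, and the result arrives later as an ordinary message. Note that in this version the job still runs inside the server process, so the server is busy until the job finishes.

```elixir
defmodule AckServer do
  use GenServer

  # `notify` is the pid that should receive the result later.
  def start_link(notify), do: GenServer.start_link(__MODULE__, notify)

  # Blocks only until the server has accepted the job.
  def submit(pid, job), do: GenServer.call(pid, {:submit, job})

  @impl true
  def init(notify), do: {:ok, notify}

  @impl true
  def handle_call({:submit, job}, _from, notify) do
    # Reply first; the job itself runs afterwards in handle_continue/2.
    {:reply, :accepted, notify, {:continue, {:run, job}}}
  end

  @impl true
  def handle_continue({:run, job}, notify) do
    # `job` is a zero-arity function standing in for the real work.
    send(notify, {:job_done, job.()})
    {:noreply, notify}
  end
end
```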

From your description I get the impression that get_foo/0 is important, while updating the server state has a lower priority. Have handle_call for get_foo respond as normal, and to update the state spawn a Task that does the grunt work and only updates the server state with the final result.

The downside is that get_foo/0 might be operating on potentially older state.
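Something along these lines, as a sketch only. FooServer, refresh/1 and fetch_fun are hypothetical names; fetch_fun stands in for the slow HTTP request. Task.start/1 is used for brevity, which leaves the task unsupervised:

```elixir
defmodule FooServer do
  use GenServer

  # fetch_fun is a zero-arity function standing in for the slow request.
  def start_link(fetch_fun),
    do: GenServer.start_link(__MODULE__, fetch_fun)

  def get_foo(pid), do: GenServer.call(pid, :get_foo)
  def refresh(pid), do: GenServer.cast(pid, :refresh)

  @impl true
  def init(fetch_fun), do: {:ok, %{foo: nil, fetch_fun: fetch_fun}}

  # Always answers from the current (possibly stale) state.
  @impl true
  def handle_call(:get_foo, _from, state), do: {:reply, state.foo, state}

  @impl true
  def handle_cast(:refresh, state) do
    server = self()

    # The slow work happens in a separate process; the server only
    # hears about it again when the final result is cast back.
    Task.start(fn -> GenServer.cast(server, {:refreshed, state.fetch_fun.()}) end)
    {:noreply, state}
  end

  def handle_cast({:refreshed, foo}, state), do: {:noreply, %{state | foo: foo}}
end
```

While the task is running, get_foo/1 keeps returning the previous value, which is exactly the staleness trade-off mentioned above.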


Given a client and a server, when the client calls an API of the server, this is in most cases GenServer.call underneath, which is logically a send followed by a receive: a synchronous wrapper around async communication. Wrapping this in another series of messages is wasteful. I’m thinking that all those APIs should be async-first.

Sure, but it seems most libraries are not like that :thinking:

:+1: I think I’ll end up doing that.

Is it? No process should run unsupervised, so I need to spawn a Task.Supervisor and add that Task there. To me that’s quite a lot to make an async HTTP request.

There’s no such function as Task.spawn. Did you mean Task.async? If so, the task must be awaited, so we’re back to square one. Otherwise, I need to run it under Task.Supervisor.

EDIT: I think you meant Task.start, right?

Premature optimization + trying to treat Elixir like Node.js. The greatest thing about Elixir concurrency is writing sequential code as sequential, and letting the runtime take care of juggling I/O blocking, rather than a rat’s nest of callback/async/promise/await… The fact is that just calling a function and getting a return value when it’s ready is far simpler than juggling async updates.

That’s simply not true.

Ask yourself, what purpose would supervision of this task accomplish? What would you have the supervisor do in case of failure? If the answer is “nothing”, then it’s not needed. If the answer is “retry the request”, I’d ask why? It’s a periodic update, if it fails then just let the next one run. If you want to try it twice with a small delay in between, just do that in the task.

(You really don’t want to have it under a supervisor that will continuously retry until success, when you’re launching that supervisor/task pair periodically…)

Yes. Sorry for the confusion, I typed that example from memory and got it wrong.


Sounds like it’s time for a PR then! ^.^


The point of supervisors is not just about restarting, but first and foremost about managing lifecycle of other processes. If processes sit in a supervision tree, then it’s ensured they are properly stopped when invoking e.g. System.stop, Application.stop, or when an ancestor supervisor is taken down. In other words, supervising a process ensures that there will be no dangling processes left behind when parts of the system are terminated, and that’s why I think that every process should always be supervised.
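For reference, running a fire-and-forget task under a Task.Supervisor is not much code. This is a sketch with a made-up supervisor name (Demo.TaskSupervisor) and a placeholder instead of a real HTTP request; in an application the child spec would sit in the existing supervision tree rather than in a supervisor started by hand:

```elixir
# Hypothetical supervisor name, chosen for this sketch.
children = [
  {Task.Supervisor, name: Demo.TaskSupervisor}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

parent = self()

# start_child/2 defaults to restart: :temporary, so a failed task is
# not retried; the supervisor only ties the task to the tree's lifecycle.
{:ok, _task_pid} =
  Task.Supervisor.start_child(Demo.TaskSupervisor, fn ->
    # Placeholder for the real request.
    send(parent, {:http_result, :pretend_response})
  end)

receive do
  {:http_result, result} -> IO.inspect(result, label: "result")
after
  1_000 -> IO.puts("no result within 1s")
end
```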


You could consider starting the task directly from the GenServer. I do this from time to time, and have written a library called Parent, which implements the common boilerplate.

  defp slow_cast({{slow, calls},{delay, count}}) do
    current = self()
    spawn(fn -> send(current, {:slow_result, Slow.slow_call(slow, count)}) end)
    make_state(slow, calls - 1, delay, count)
  end

is all it takes.

# file: demo.exs
defmodule Slow do

  defp make_state(delay, calls),
    do: {delay, calls}

  defp slow_compute(value, {delay, calls}) do
    result = {:reply, {:result, {value,value * value}}, make_state(delay, calls-1)}
    reply = cond do
      calls < 2 ->
        send(self(),:done)
        result
      true ->
        result
    end
    Process.sleep(delay)
    reply
  end

  # ---

  def init(args) do
    delay = Keyword.get(args, :delay, 500)
    calls = Keyword.get(args, :calls, 2)
    {:ok, make_state(delay, calls)}
  end

  def handle_call({:slow, value}, _from, state),
    do: slow_compute(value, state)

  def handle_info(:done, state),
    do: {:stop, :normal, state}

  def terminate(reason, state),
    do: IO.puts "#{inspect self()} - terminate: #{inspect reason} #{inspect state}"

  # --- client API

  def slow_call(pid, value) do
    {:result, result} = GenServer.call(pid, {:slow, value})
    result
  end

end

defmodule Client do

  defp make_state(slow, calls, delay, count),
    do: {{slow, calls}, {delay, count}}

  defp continue_countdown({{slow, calls},{delay, count}}) do
    Process.send_after(self(), {:count, count}, delay)
    {:noreply, make_state(slow, calls, delay, count - 1)}
  end

  defp handle_countdown(value, state) do
    IO.puts "#{inspect self()} - count: #{value}"
    cond do
      value < 1 ->
        {:stop, :normal, state} # this process is done
      true ->
        continue_countdown(state) # continue processing
    end
  end

  defp slow_cast({{slow, calls},{delay, count}}) do
    current = self()
    spawn(fn -> send(current, {:slow_result, Slow.slow_call(slow, count)}) end)
    make_state(slow, calls - 1, delay, count)
  end

  defp handle_slow_result(result, {{_,calls},_} = state) do
    IO.puts "#{inspect self()} - result: #{inspect result}"
    new_state = cond do
      calls < 1 ->
        state
      true ->
        slow_cast(state)
    end
    {:noreply, new_state}
  end
  # ---

  def init(args) do
    slow = Keyword.get(args, :slow, nil)
    calls = Keyword.get(args, :calls, 2)
    delay = Keyword.get(args, :delay, 100)
    count = Keyword.get(args, :count, 10)

    cond do
      is_nil(slow) ->
        {:stop, :noproc} # no Slow process given
      true ->
        send(self(), :init)
        {:ok, make_state(slow, calls, delay, count)}
    end
  end

  def handle_info(:init, state) do
    new_state = slow_cast(state)
    continue_countdown(new_state)
  end
  def handle_info({:count, value}, state) do
    handle_countdown(value, state)
  end
  def handle_info({:slow_result, result}, state) do
    handle_slow_result(result, state)
  end

  def terminate(reason, state),
    do: IO.puts "#{inspect self()} - terminate: #{inspect reason} #{inspect state}"

end

defmodule Demo do

  defp wait_for_msg do
    receive do
      msg ->
        IO.puts "#{inspect self()} got message: #{inspect msg}"
    end
  end

  def run do
    calls = 2

    {:ok, slow_pid} =
      GenServer.start_link(Slow, [delay: 500, calls: calls])
    _ref_slow = Process.monitor(slow_pid)

    {:ok, client_pid} =
      GenServer.start_link(Client, [slow: slow_pid, count: 10, delay: 100, calls: calls])
    _ref_client = Process.monitor(client_pid)

    # wait until processes terminate
    wait_for_msg()
    wait_for_msg()
  end

end

Demo.run()
$ elixir demo.exs
#PID<0.99.0> - count: 10
#PID<0.99.0> - count: 9
#PID<0.99.0> - count: 8
#PID<0.99.0> - count: 7
#PID<0.99.0> - result: {10, 100}
#PID<0.99.0> - count: 6
#PID<0.99.0> - count: 5
#PID<0.99.0> - count: 4
#PID<0.99.0> - count: 3
#PID<0.99.0> - count: 2
#PID<0.99.0> - result: {5, 25}
#PID<0.98.0> - terminate: :normal {500, 0}
#PID<0.89.0> got message: {:DOWN, #Reference<0.1897547781.1499463688.20652>, :process, #PID<0.98.0>, :normal}
#PID<0.99.0> - count: 1
#PID<0.99.0> - count: 0
#PID<0.99.0> - terminate: :normal {{#PID<0.98.0>, 0}, {100, -1}}
#PID<0.89.0> got message: {:DOWN, #Reference<0.1897547781.1499463688.20654>, :process, #PID<0.99.0>, :normal}
$ 

The “paperwork”, as you say, is a result of the guarantees you want despite the fact that communication is asynchronous, guarantees that OTP can give you because it can leverage the inherent constraints of synchronous communication:

do_call(Process, Label, Request, Timeout) when is_atom(Process) =:= false ->
    Mref = erlang:monitor(process, Process),

    %% OTP-21:
    %% Auto-connect is asynchronous. But we still use 'noconnect' to make sure
    %% we send on the monitored connection, and not trigger a new auto-connect.
    %%
    erlang:send(Process, {Label, {self(), Mref}, Request}, [noconnect]),

    receive
        {Mref, Reply} ->
            erlang:demonitor(Mref, [flush]),
            {ok, Reply};
        {'DOWN', Mref, _, _, noconnection} ->
            Node = get_node(Process),
            exit({nodedown, Node});
        {'DOWN', Mref, _, _, Reason} ->
            exit(Reason)
    after Timeout ->
            erlang:demonitor(Mref, [flush]),
            exit(timeout)
    end.

https://github.com/erlang/otp/blob/master/lib/stdlib/src/gen.erl#L160-L181

  • Sending a request to a nonexistent process (or a process on an unreachable node) is detected via monitoring.
  • The server process dying before sending you a reply is detected via monitoring.
  • The server process not responding in due course is handled with a (blocking) receive timeout.

These are all problems that one has to explicitly account for (if necessary) when interacting asynchronously with another process. It doesn’t matter whether one is dealing with a self-spawned process or a process that is accepting asynchronous messages for an entire OTP application.
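To illustrate what that explicit accounting looks like when the send and the receive are split apart, here is a sketch with an invented protocol. AsyncRequest and the {:request, {pid, ref}, payload} message shape are assumptions for this example, not the real gen_server wire format:

```elixir
defmodule AsyncRequest do
  # Send the request without blocking. The monitor reference doubles
  # as the tag that identifies the reply.
  def send_request(server, request) do
    ref = Process.monitor(server)
    send(server, {:request, {self(), ref}, request})
    ref
  end

  # Collect the reply later, handling the three failure cases by hand.
  def await_reply(ref, timeout \\ 5_000) do
    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      # Server was never alive, or died before replying.
      {:DOWN, ^ref, :process, _pid, reason} ->
        {:error, {:down, reason}}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end
end

# A throwaway server that doubles one number and exits.
server =
  spawn(fn ->
    receive do
      {:request, {from, ref}, n} -> send(from, {ref, n * 2})
    end
  end)

ref = AsyncRequest.send_request(server, 21)
# ... the caller is free to do other work here ...
IO.inspect(AsyncRequest.await_reply(ref))
```

The caller can do other work between send_request/2 and await_reply/2, but it now owns the reference, the monitor cleanup and the timeout policy.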


You mention HTTPotion, but you link to HTTPoison. Either way, both seem to have support for async notification. HTTPoison has the :async option in request/5. The docs are somewhat scarce, but I presume it is just forwarded to the underlying hackney async functionality. HTTPotion also supports async.

One possible problem with async is that you lose guarantees about the outcome. If the process performing the operation is taken down, you won’t be able to detect it. If you want to give up after some amount of time, you’ll need to roll your own logic based on Process.send_after or a GenServer timeout option. This is a general property of an async interface. With synchronous calls you get immediate feedback if the server stops, and you also have timeout support. If you care about that, issuing the request in a separate process would help you separate the activity of serving the state from the activity of fetching the new state. If you decide to go down that path, you could use the aforementioned Parent library to simplify managing the task as a direct child.

Alternatively, you could consider the Periodic module from the same library to run a periodical job which fetches the state and then sends it to the GenServer. If you decide to keep your state in ETS, you could still use Periodic to populate the table. If you opt for this approach, I’d recommend running the periodic job next to the GenServer, under a rest_for_one supervisor.


:+1: This was exactly the reason why I wanted to run the Task under supervision, even though @sribe was exactly on point that I don’t intend to restart it upon failure.

Sorry for the confusion with the links. But my impression is that the point of the async API is to deliver the response in chunks, which is different from wanting to get the response in one go but as a message.

Either way, I think I might go with the parent library.

Perhaps, but I guess you could still use it to do the latter, although it would admittedly be somewhat more complicated.

Let me know if you have questions.