When should I start using more advanced Elixir features like OTP, GenServers, etc.?

What @zachallaun and @D4no0 say is correct.

This can be used in various ways, but one is that the same GenServer can be used to block multiple processes on handle_call by not returning a reply (i.e. returning {:noreply, state} from handle_call). These processes then block, waiting for their replies that you can later formulate and send with GenServer.reply/2. You need to remember the from ref of the client that’s waiting on the call, however.
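A minimal sketch of that deferred reply, assuming a hypothetical Gate module (the module name and its wait/1 and open/1 functions are made up for illustration; only the GenServer calls are real API). Callers block in wait/1 until some other process calls open/1:

```elixir
defmodule Gate do
  # Hypothetical module: callers block in wait/1 until open/1 is called.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Blocks the calling process until the gate is opened.
  def wait(server), do: GenServer.call(server, :wait, :infinity)

  # Releases every process currently blocked in wait/1.
  def open(server), do: GenServer.cast(server, :open)

  @impl true
  def init(:ok), do: {:ok, []}

  @impl true
  def handle_call(:wait, from, waiting) do
    # No reply yet: remember `from` and leave the caller blocked.
    {:noreply, [from | waiting]}
  end

  @impl true
  def handle_cast(:open, waiting) do
    # Replying to each remembered `from` is what unblocks the callers.
    Enum.each(waiting, &GenServer.reply(&1, :opened))
    {:noreply, []}
  end
end
```

The state here is just the list of waiting from values, which is the "remember the from ref" part; a real server would probably keep them in a map next to whatever they are waiting for.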

Yeah but it’s an awful practice to rely on several linked sub-jobs to complete before a timeout expires. That’s fragile.

The timeout is configurable and can be set to :infinity.

Yep, that would make it better indeed. Combined with good observability (OpenTelemetry + Honeycomb or OpenObserve) it would make troubleshooting and finding bottlenecks relatively easy.

Wasn’t aware that you can actually choose to respond later to a GenServer call, interesting, thanks for that tidbit (you + @D4no0 + @zachallaun). Definitely overlooked that.

Maybe there’s a place for something in-between Elixir’s OTP helpers and Oban. Your scenario sounds like a good candidate.

This is a very common technique, and it changes how you think about concurrency in Elixir. It serves much the same function as promises in other languages.

If you search GitHub for GenServer.reply you will find examples from all around the ecosystem, including top projects like LiveView, Phoenix, Ecto, various HTTP clients, database connectors, etc. This technique is all over the place.

What’s super cool about it is that the process that responds to the call doesn’t have to be the GenServer process at all.

A GenServer can be a “dispatcher” of calls to other processes, and they can reply directly to the waiting caller process.
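A rough sketch of the dispatcher idea, assuming two hypothetical modules, Dispatcher and SquareWorker (both invented for illustration). The GenServer only forwards the request together with from; a plain process computes the result and calls GenServer.reply/2 itself:

```elixir
defmodule Dispatcher do
  # Hypothetical example: this GenServer only routes calls, workers answer them.
  use GenServer

  def start_link(worker), do: GenServer.start_link(__MODULE__, worker)

  def square(server, n), do: GenServer.call(server, {:square, n})

  @impl true
  def init(worker), do: {:ok, worker}

  @impl true
  def handle_call({:square, n}, from, worker) do
    # Hand the request and the caller's `from` to another process; do not reply here.
    send(worker, {:square, n, from})
    {:noreply, worker}
  end
end

defmodule SquareWorker do
  # A plain process (not a GenServer) that replies straight to the waiting caller.
  def start_link, do: {:ok, spawn_link(&loop/0)}

  defp loop do
    receive do
      {:square, n, from} ->
        GenServer.reply(from, n * n)
        loop()
    end
  end
end
```

Because the dispatcher returns immediately, it can keep accepting new calls while workers are still busy with earlier ones.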

Sure. I simply never needed it. To me it was “either do some work in the call handler and reply ASAP or just use Oban”. :smiley:

This is a really cool pattern!

I’ve just written a quick demo in which:

  1. The server receives a call.
  2. It passes the input, together with the caller info (from), to a Task started with Task.start_link.
  3. It returns {:noreply, state} from handle_call, so the caller stays blocked.
  4. The task replies to the original caller when it’s done processing.
  5. Crucially, if the task crashes, it takes down the server (the two are linked), so the caller gets unblocked with an exit, even if it passed :infinity as the timeout.

defmodule DelayedServer do
  require Logger
  use GenServer

  def init(_init_arg) do
    {:ok, nil}
  end

  def handle_call({:get_data, city}, from, state) do
    Task.start_link(fn -> slow_call_then_reply(city, from) end)
    Logger.info("Asked about #{city}, async task started")
    {:noreply, state}
  end

  def slow_call_then_reply(city, caller) do
    Logger.info("Asked about #{city}, sleeping...")
    Process.sleep(1000)
    Logger.info("Asked about #{city}, done sleeping...")
    GenServer.reply(caller, {:reply, get_message(city)})
  end

  def get_message("London" = city), do: "#{city} is the capital of the United Kingdom"
  def get_message("Paris" = city), do: "#{city} is the capital of France"
  def get_message(city), do: raise("Unknown city #{city}")
end

# iex(1)> {:ok, pid} = GenServer.start(DelayedServer, nil)
# {:ok, #PID<0.147.0>}
# iex(2)> GenServer.call(pid, {:get_data, "London"})

# 18:06:06.191 [info] Asked about London, async task started

# 18:06:06.191 [info] Asked about London, sleeping...

# 18:06:07.204 [info] Asked about London, done sleeping...
# {:reply, "London is the capital of the United Kingdom"}
# iex(3)> GenServer.call(pid, {:get_data, "Paris"}, :infinity)

# 18:06:19.411 [info] Asked about Paris, async task started

# 18:06:19.411 [info] Asked about Paris, sleeping...

# 18:06:20.411 [info] Asked about Paris, done sleeping...
# {:reply, "Paris is the capital of France"}
# iex(4)> GenServer.call(pid, {:get_data, "Dublin"}, :infinity)

# 18:06:28.652 [info] Asked about Dublin, async task started

# 18:06:28.652 [info] Asked about Dublin, sleeping...

# 18:06:29.652 [info] Asked about Dublin, done sleeping...

# 18:06:29.654 [error] Task #PID<0.150.0> started from #PID<0.147.0> terminating
# ** (RuntimeError) Unknown city Dublin
#     (elixir_delayed_response 0.1.0) lib/delayed_server.ex:24: DelayedServer.get_message/1
#     (elixir_delayed_response 0.1.0) lib/delayed_server.ex:19: DelayedServer.slow_call_then_reply/2
#     (elixir 1.15.4) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
# Function: #Function<0.47578622/0 in DelayedServer.handle_call/3>
#     Args: []
# ** (exit) exited in: GenServer.call(#PID<0.147.0>, {:get_data, "Dublin"}, :infinity)
#     ** (EXIT) an exception was raised:
#         ** (RuntimeError) Unknown city Dublin
#             (elixir_delayed_response 0.1.0) lib/delayed_server.ex:24: DelayedServer.get_message/1
#             (elixir_delayed_response 0.1.0) lib/delayed_server.ex:19: DelayedServer.slow_call_then_reply/2
#             (elixir 1.15.4) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
#     (elixir 1.15.4) lib/gen_server.ex:1074: GenServer.call/3
#     iex:4: (file)
# iex(4)>

You can improve this by starting the Tasks under a DynamicSupervisor with restart: :temporary, and by monitoring the Task’s pid instead of linking to it. The monitor notifies you when the process finishes or crashes, along with the exit reason.
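One way this could look, as a sketch rather than a definitive implementation. MonitoredServer and its run/2 function are hypothetical names, and the design choice that the task sends its result back to the server (instead of replying to the caller itself) is an assumption made here so that a finished task and a crashed task are both handled in one place:

```elixir
defmodule MonitoredServer do
  # Hypothetical server: runs each call in a temporary Task under a
  # DynamicSupervisor and monitors the task instead of linking to it.
  use GenServer

  def start_link(sup), do: GenServer.start_link(__MODULE__, sup)
  def run(server, fun), do: GenServer.call(server, {:run, fun}, :infinity)

  @impl true
  def init(sup), do: {:ok, %{sup: sup, pending: %{}}}

  @impl true
  def handle_call({:run, fun}, from, state) do
    server = self()
    # Inside the task, self() is the task's own pid.
    task = fn -> send(server, {:done, self(), fun.()}) end
    spec = Supervisor.child_spec({Task, task}, restart: :temporary)
    {:ok, pid} = DynamicSupervisor.start_child(state.sup, spec)
    Process.monitor(pid)
    # Remember who is waiting on this task and return without replying.
    {:noreply, put_in(state.pending[pid], from)}
  end

  @impl true
  def handle_info({:done, pid, result}, state) do
    {:noreply, reply_and_forget(state, pid, {:ok, result})}
  end

  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    # Only still-pending callers get the error; a task that already
    # delivered its result was removed from `pending` above.
    {:noreply, reply_and_forget(state, pid, {:error, reason})}
  end

  defp reply_and_forget(state, pid, reply) do
    {from, pending} = Map.pop(state.pending, pid)
    if from, do: GenServer.reply(from, reply)
    %{state | pending: pending}
  end
end
```

With this shape a crashing task no longer takes the server down: the caller receives {:error, reason} and the server keeps serving other callers.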

Further, if you need retries, you can use a different restart strategy on the supervisor and introduce an additional process, whose job is to start and monitor the Tasks, and provide responses in case these finally crash after N restarts and the job can’t be completed.

Then, if you want to limit concurrency, you can set max_children on the DynamicSupervisor and queue up the calls that arrive while capacity is exceeded (the new process from the paragraph above can hold that queue). At that point you have built yourself an in-memory queue system with a pool of N workers, retries and error handling.
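The capacity check itself is small. A sketch, assuming a hypothetical BoundedRunner helper (the module and the :started / :at_capacity return values are invented for illustration); the actual queueing of rejected work is left out:

```elixir
defmodule BoundedRunner do
  # Hypothetical helper: tries to start a temporary Task under a
  # DynamicSupervisor that was started with the :max_children option.
  def start_or_queue(sup, fun) do
    spec = Supervisor.child_spec({Task, fun}, restart: :temporary)

    case DynamicSupervisor.start_child(sup, spec) do
      {:ok, pid} -> {:started, pid}
      # The supervisor refuses new children once the limit is reached;
      # this is the point where a caller would be put on a queue.
      {:error, :max_children} -> :at_capacity
    end
  end
end

# Usage: a supervisor that allows at most two concurrent tasks.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one, max_children: 2)
slow = fn -> Process.sleep(500) end
results = for _ <- 1..3, do: BoundedRunner.start_or_queue(sup, slow)
```

In the usage above, the third attempt comes back as :at_capacity while the first two tasks are still sleeping.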

Alternatively, there’s Task.Supervisor, which already implements most of that within the Task abstractions.
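For comparison, a sketch of the same idea on top of Task.Supervisor.async_nolink/2, which starts the task under the supervisor without linking it to the caller and monitors it for you. NolinkServer and run/2 are hypothetical names; the two handle_info shapes (the {ref, result} message and the :DOWN message) are what async_nolink delivers to the process that started the task:

```elixir
defmodule NolinkServer do
  # Hypothetical server built on Task.Supervisor.async_nolink/2.
  use GenServer

  def start_link(task_sup), do: GenServer.start_link(__MODULE__, task_sup)
  def run(server, fun), do: GenServer.call(server, {:run, fun}, :infinity)

  @impl true
  def init(task_sup), do: {:ok, %{task_sup: task_sup, pending: %{}}}

  @impl true
  def handle_call({:run, fun}, from, state) do
    # The returned %Task{} carries the monitor reference in task.ref.
    task = Task.Supervisor.async_nolink(state.task_sup, fun)
    {:noreply, put_in(state.pending[task.ref], from)}
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    # The task finished: drop the monitor and flush its :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, answer(state, ref, {:ok, result})}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # The task crashed before sending a result.
    {:noreply, answer(state, ref, {:error, reason})}
  end

  defp answer(state, ref, reply) do
    {from, pending} = Map.pop(state.pending, ref)
    if from, do: GenServer.reply(from, reply)
    %{state | pending: pending}
  end
end
```

The server still returns {:noreply, state} right away, and the caller hears back either with the result or with the crash reason.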

Thank you, now I have just a couple of questions :slight_smile:

So in my example, the call chain is Caller -> DelayedServer -> Task. The Task is linked to DelayedServer, so if it dies, the DelayedServer dies, and Caller is unblocked.

If we introduce the DynamicSupervisor, will the call chain look like this:

Caller -> DelayedServer -> new DynamicSupervisor -> Task

or like this:

Caller -> DelayedServer -> AsyncWrapper -> DynamicSupervisor -> Task?

(here AsyncWrapper monitors the DynamicSupervisor/Task and will send a :reply to the Caller if the Task crashes)

I can’t figure out how to achieve the following idiomatically:

  • track the Task status and let the Caller know if it crashed
  • and still exit DelayedServer early with {:noreply, state}