Request for code review - a GenServer in need of a blocking, retryable side-effect

Hi everyone. I would appreciate any feedback on my usage of a GenServer below - and non-usage of the Task and other modules.

Main question

If a GenServer has to wait for the result of a Storage.insert call (below), is there anything better one can do than just call this function?

defmodule Storage do
  def insert(msg, tries \\ 5)
  def insert(_msg, 0) do
    {:error, "DB insert failed"}
  end
  def insert(msg, tries) do
    case do_insert(msg) do
      {:ok, response} ->
        {:ok, response}
      {:error, _error} ->
        insert(msg, tries - 1)
    end
  end
  def do_insert(msg), do: #...
end
defmodule Client do
  use GenServer
  # (Other functions omitted.)
  def handle_call({:process_kafka_msg, msg}, _from, state) do
    # Either
    # A) insertion succeeds OR
    # B) we must not reply with `:ok`, so that the offset is not committed to Kafka
    Storage.insert(msg)

    # Implicitly commit a Kafka offset by returning `:ok`, 
    # which gets pattern-matched in the Kafka consumer module `MyKafkaConsumer` below.
    {:reply, :ok, state}
  end
end

Did I read Sasa Juric’s answer correctly, that this is the best one can do? Also, is there any place for Process.sleep/1 in this approach, despite the “Use this function with extreme care” warning in its doc?

… If yes [the phoenix controller needs to wait for the result of the job], then I’d just run the job in the same process.
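On the Process.sleep/1 question: one place it arguably fits is a pause between retries, since the calling process has to wait for the insert anyway. A minimal sketch, assuming a hypothetical RetrySketch module (not from my code above) and a zero-arity function standing in for do_insert/1; the doubling delay is only an illustrative choice:

```elixir
defmodule RetrySketch do
  # `fun` is a zero-arity function returning {:ok, term} or {:error, term}.
  # It stands in for a call such as `fn -> do_insert(msg) end`.
  def with_backoff(fun, tries \\ 5, delay_ms \\ 100)

  def with_backoff(_fun, 0, _delay_ms), do: {:error, :retries_exhausted}

  def with_backoff(fun, tries, delay_ms) do
    case fun.() do
      {:ok, response} ->
        {:ok, response}

      {:error, _reason} when tries > 1 ->
        # Block the calling process before the next attempt, then double the delay.
        Process.sleep(delay_ms)
        with_backoff(fun, tries - 1, delay_ms * 2)

      {:error, _reason} ->
        # That was the last attempt; give up without sleeping again.
        {:error, :retries_exhausted}
    end
  end
end
```

Sleeping inside handle_call keeps that GenServer from handling any other message for the duration, which is acceptable only if the caller really has to block until the insert settles.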

Overview

Clients’ messages are read from a Kafka broker using kafka_ex. Each client has its own GenServer process, keyed by the client_id in a message. A message has to be stored in a DB; otherwise the Kafka offset of this message should not be committed.

Sample

A minimized sample, with MyRegistry, ClientSupervisor and some error handling omitted:

defmodule MyKafkaConsumer do
  use KafkaEx.GenConsumer
  def handle_message_set(message_set, state) do
    message_set
    |> Task.async_stream(
      fn message ->
        message.value
        |> Jason.decode!()
        |> process_msg()
      end,
      timeout: 300_000,
      ordered: false
    )
    |> Stream.run()
    {:async_commit, state}
  end
  def process_msg(%{"client_id" => client_id} = msg) do
    if Registry.lookup(MyRegistry, client_id) == [] do
      ClientSupervisor.start_client(client_id)
    end
    :ok = Client.process_kafka_msg(msg)
  end
end
defmodule Client do
  use GenServer
  def start_link(client_id) do
    GenServer.start_link(__MODULE__, client_id, name: via_tuple(client_id))
  end
  def via_tuple(client_id) do
    {:via, Registry, {MyRegistry, client_id}}
  end
  def process_kafka_msg(%{"client_id" => client_id} = msg) do
    GenServer.call(via_tuple(client_id), {:process_kafka_msg, msg})
  end
  def handle_call({:process_kafka_msg, msg}, _from, state) do
    # Either
    # A) insertion succeeds OR
    # B) we must not reply with `:ok`, so that the offset is not committed to Kafka
    Storage.insert(msg)
    # Implicitly commit a Kafka offset by returning `:ok`
    {:reply, :ok, state}
  end
end
defmodule Storage do
  def insert(msg, tries \\ 5)
  def insert(_msg, 0) do
    {:error, "DB insert failed"}
  end
  def insert(msg, tries) do
    case do_insert(msg) do
      {:ok, response} ->
        {:ok, response}
      {:error, _error} ->
        insert(msg, tries - 1)
    end
  end
  def do_insert(msg), do: #...
end

Is it viable to start a GenServer for every client_id up front, or are there too many?

Hi Dimitar :wave: We expect around 50k client_ids which, afaiu, is still easily doable on the BEAM. The client_ids aren’t known upfront though.

Ah, so then Registry definitely makes sense. Just something I’ve noticed and figured I’d mention, because I am not a fan of code like this:

    if Registry.lookup(MyRegistry, client_id) == [] do
      ClientSupervisor.start_client(client_id)
    end

Also, should this check be done on every message? Shouldn’t it be done only once, preferably during initialization or just in the handle_message_set function?

Agreed, not a fan myself either. Any improvement suggestion appreciated. For one, the intent isn’t as clear as it could be - obviously, a function wrapper could clarify the intent a bit. The idiom is common enough that one just gets used to seeing the == [] comparison.

A client can be active for a day, but then become inactive for a week, say. So, currently we start its GenServer on demand when a msg arrives with its client_id. A client’s GenServer then terminates after a few hours of inactivity. Would love to consider possible improvements on this front as well though, so that the above Registry.lookup check wouldn’t be needed on every msg, as you pointed out.
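For context, one common way to get this kind of idle shutdown is the GenServer timeout value in callback return tuples. The following is a minimal sketch under assumptions, not our actual Client module: IdleClientSketch, the :ping call, and the idle_timeout_ms option are hypothetical names, and the three-hour default is only a placeholder for "a few hours".

```elixir
defmodule IdleClientSketch do
  use GenServer

  # Placeholder default; the real value is whatever "a few hours" means in practice.
  @default_idle_timeout_ms :timer.hours(3)

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    timeout = Keyword.get(opts, :idle_timeout_ms, @default_idle_timeout_ms)
    # The third element arms the idle timer.
    {:ok, %{idle_timeout_ms: timeout}, timeout}
  end

  @impl true
  def handle_call(:ping, _from, state) do
    # Returning the timeout again re-arms the timer after every message.
    {:reply, :pong, state, state.idle_timeout_ms}
  end

  @impl true
  def handle_info(:timeout, state) do
    # No message arrived within the timeout: stop normally.
    {:stop, :normal, state}
  end
end
```

Under a supervisor, the child would need a restart value of :temporary or :transient, otherwise the normal exit would just cause a restart.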

Looking at your code, it’s not at all clear why you need one GenServer per client_id. Seems like a parameter that can be passed to a more generic module. Apropos, why do you need a GenServer here in the first place? It doesn’t look like some kind of background worker; it seems like a candidate for a normal module with a function that does stuff that isn’t tightly bound to any of its parameters.

To also address your original question: you can add a parameter specifying whether to wait on Storage.insert or not via the standard Elixir practice of having a keyword list as a last parameter, e.g. Storage.insert(msg, sync: true).
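A rough sketch of what such an option could look like. StorageOptsSketch, the `{:ok, :scheduled}` return value, and the do_insert/1 stand-in are assumptions for illustration; Task.start/1 is used here only as the simplest fire-and-forget call:

```elixir
defmodule StorageOptsSketch do
  # `sync: true` (the default) blocks until the insert returns;
  # `sync: false` hands the insert to a separate process and returns at once.
  def insert(msg, opts \\ []) do
    if Keyword.get(opts, :sync, true) do
      do_insert(msg)
    else
      {:ok, _pid} = Task.start(fn -> do_insert(msg) end)
      {:ok, :scheduled}
    end
  end

  # Stand-in for the real DB call, which is elided in the thread.
  defp do_insert(msg), do: {:ok, msg}
end
```

With sync: false the caller cannot know whether the insert succeeded, so that mode would not fit the requirement of committing the Kafka offset only after a successful insert.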


Noted. I’ll reconsider whether a GenServer is actually necessary, given the current use case - as is, I suspect not. Unless business logic gets added that needs to compare the current msg with previous msg(s): e.g. whether msgs, coming from flaky GPS hardware, are being delayed/duplicated/etc.

Aha, a [sync: true] flag would def ease the potential switch, if async behaviour became desirable or sufficient - and then, according to this flag, use (or not) e.g. Task.Supervisor.async. In conclusion, I take it that there’s no better implementation of a blocking/sync call than the above (other than the likely overuse of a GenServer).

Thanks. That was very helpful.

PS. Would it hurt performance if one just called ClientSupervisor.start_client(client_id) immediately, without defensively checking if Registry.lookup(MyRegistry, client_id) == []? Two lines of code less to read, and just ignoring {:error, {:already_started, _}} results probably/hopefully isn’t problematic?

It likely will perform worse but I am not sure it will even register unless you have to ingest 10k+ messages per second.

As you alluded to earlier, it’s best if you have a function e.g. ensure_genserver_client_started(client_id). That will at least remove the inlined piece of code and make the intention of the encompassing function (process_msg) crystal clear.
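A minimal sketch of such a wrapper, combined with the "just start it and tolerate :already_started" idea from the PS. MyRegistry is the name from the thread; EnsureStartedSketch, SketchSupervisor, and the Agent child are stand-ins I made up for ClientSupervisor and Client so the example runs on its own:

```elixir
defmodule EnsureStartedSketch do
  # Starts the per-client process if needed and returns its pid either way.
  def ensure_client_started(client_id) do
    name = {:via, Registry, {MyRegistry, client_id}}

    # An Agent stands in for the real Client GenServer.
    spec = %{
      id: client_id,
      start: {Agent, :start_link, [fn -> client_id end, [name: name]]}
    }

    case DynamicSupervisor.start_child(SketchSupervisor, spec) do
      {:ok, pid} -> {:ok, pid}
      # The name is already registered: treat it as success.
      {:error, {:already_started, pid}} -> {:ok, pid}
      {:error, reason} -> {:error, reason}
    end
  end
end

# Usage, e.g. in a script: start the registry and the supervisor first.
{:ok, _} = Registry.start_link(keys: :unique, name: MyRegistry)
{:ok, _} = DynamicSupervisor.start_link(name: SketchSupervisor, strategy: :one_for_one)
{:ok, pid} = EnsureStartedSketch.ensure_client_started("client-1")
{:ok, ^pid} = EnsureStartedSketch.ensure_client_started("client-1")
```

Handling :already_started also covers the case where two tasks from Task.async_stream both see an empty lookup and both try to start the same client.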

Additionally, removing all the GenServer boilerplate and just converting your code to its bare minimum – a module with a few functions – will IMO help you crystallize the idea behind it so even if one day you have to add GenServer on top again, I’d preserve this separate module and make the GenServer a thin wrapper around it (living in a separate file).
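To make the split concrete, here is a hedged sketch rather than a prescription. ClientLogicSketch and ClientServerSketch are hypothetical names, and insert_fun is an assumption that stands in for Storage.insert/1 so the example has no external dependencies:

```elixir
defmodule ClientLogicSketch do
  # Plain functions, no process involved.
  def new(client_id), do: %{client_id: client_id, last_msg: nil}

  # Returns {reply, new_state}; the reply is :ok only when the insert succeeded.
  def handle_msg(state, msg, insert_fun) do
    case insert_fun.(msg) do
      {:ok, _response} -> {:ok, %{state | last_msg: msg}}
      {:error, reason} -> {{:error, reason}, state}
    end
  end
end

defmodule ClientServerSketch do
  # Thin wrapper: only process plumbing, all decisions live in ClientLogicSketch.
  use GenServer

  def start_link(client_id), do: GenServer.start_link(__MODULE__, client_id)

  def process_kafka_msg(pid, msg, insert_fun) do
    GenServer.call(pid, {:process_kafka_msg, msg, insert_fun})
  end

  @impl true
  def init(client_id), do: {:ok, ClientLogicSketch.new(client_id)}

  @impl true
  def handle_call({:process_kafka_msg, msg, insert_fun}, _from, state) do
    {reply, new_state} = ClientLogicSketch.handle_msg(state, msg, insert_fun)
    {:reply, reply, new_state}
  end
end
```

The logic module can be tested without starting any process, and the wrapper replies with `:ok` only on a successful insert, so a caller matching on `:ok` would not treat a failed insert as committed.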

That last part seems to be a huge point of contention in the Elixir world, mind you. I side with the camp that believes boilerplate and glue code have to exist separately from the code they’re gluing / providing an interface/API to.
