Hi everyone. I would appreciate any feedback on my usage of a GenServer
below - and non-usage of the Task
and other modules.
Main question
If a GenServer has to wait for the result of a Storage.insert
call (below), is there anything better one can do than just call this function?
defmodule Storage do
def insert(msg, tries \\ 5)
def insert(_msg, 0) do
{:error, "DB insert failed"}
end
def insert(msg, tries) do
case do_insert(msg) do
{:ok, response} ->
{:ok, response}
{:error, _error} ->
insert(msg, tries - 1)
end
end
def do_insert(msg), do: #...
end
defmodule Client do
use GenServer
# (Other functions omitted.)
def handle_call({:process_kafka_msg, msg}, _from, state) do
# Either
# A) insertion succeeds OR
# B) we shouldn't reply with `:ok` and thus commit the offset to Kafka
Storage.insert(msg)
# Implicitly commit a Kafka offset by returning `:ok`,
# which gets pattern-matched in the Kafka consumer module `MyKafkaConsumer` below.
{:reply, :ok, state}
end
end
Did I read Sasa Juric’s answer correctly, that this is best one can do? Also, is there any place for Process.sleep/1
in this approach, despite the “Use this function with extreme care” warning in its doc?
… If yes [the phoenix controller needs to wait for the result of the job], then I’d just run the job in the same process.
Overview
Clients’ messages are read from a Kafka broker, using kafka_ex. Each client has its GenServer process, per client_id
in a message. A message has to be stored in a DB, or the Kafka offset (of this message) should not be commited.
Sample
A minimized sample, with MyRegistry
, ClientSupervisor
and some error handling omitted:
defmodule MyKafkaConsumer do
use KafkaEx.GenConsumer
def handle_message_set(message_set, state) do
message_set
|> Task.async_stream(
&(&1.value
|> Jason.decode!()
|> process_msg()
), timeout: 300_000, ordered: false)
|> Stream.run()
{:async_commit, state}
end
def process_msg(%{"client_id" => client_id} = msg) do
if Registry.lookup(MyRegistry, client_id) == [] do
ClientSupervisor.start_client(client_id)
end
:ok = Client.process_kafka_msg(msg)
end
end
defmodule Client do
use GenServer
def start_link(client_id) do
GenServer.start_link(__MODULE__, client_id, name: via_tuple(client_id))
end
def via_tuple(client_id) do
{:via, Registry, {MyRegistry, client_id}}
end
def process_kafka_msg(%{"client_id" => client_id} = msg) do
GenServer.call(via_tuple(client_id), {:process_kafka_msg, msg})
end
def handle_call({:process_kafka_msg, msg}, _from, state) do
# Either
# A) insertion succeeds OR
# B) we shouldn't reply with `:ok` and thus commit the offset to Kafka
Storage.insert(msg)
# Implicitly commit a Kafka offset by returning `:ok`
{:reply, :ok, state}
end
end
defmodule Storage do
def insert(msg, tries \\ 5)
def insert(_msg, 0) do
{:error, "DB insert failed"}
end
def insert(msg, tries) do
case do_insert(msg) do
{:ok, response} ->
{:ok, response}
{:error, _error} ->
insert(msg, tries - 1)
end
end
def do_insert(msg), do: #...
end