Hi everyone!
Some time ago, I released the first version of Klife (Klife - A Kafka client with performance gains over 10x), a Kafka client written from scratch in Elixir. At that time, it only supported producing messages. Since then, I’ve been working on implementing consumer group support, and as it nears completion, I’d love to gather feedback from the community — especially around the design of the API and any Kafka-related pain points you’ve experienced, even if they’re not directly tied to Klife’s interface.
Why This Post?
This post is a starting point for collaboration — a space to discuss Kafka consumer pain points and how Klife might help. I’m sharing the proposed API and internals to collect early feedback, but also to invite anyone who’s wrestled with Kafka on the BEAM to weigh in — whether it’s design suggestions, performance headaches or any small detail that’s made Kafka harder than it should be. No detail is too small.
Basic Usage Example
Here’s a simple example of how a consumer group is defined in Klife:
defmodule MyConsumerGroup do
  use Klife.Consumer.ConsumerGroup,
    client: MyClient,
    group_name: "my_group_name",
    topics: [
      [name: "my_consumer_topic"],
      [name: "my_consumer_topic_2"]
    ]

  @impl true
  def handle_record_batch(topic, partition, records) do
    # Do some processing here!
  end
end
You define a module that implements the consumer group behaviour and pass configuration either via use (compile-time) or at start_link (runtime), depending on your needs. Then start it in your supervision tree.
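For illustration, here is a minimal sketch of what that might look like in an application supervisor. It assumes MyConsumerGroup exposes a standard child spec and that the Klife client is started in the same tree; the runtime-config tuple shown in the comment is a hypothetical form, not confirmed API:

# Sketch only: assumes MyConsumerGroup provides a standard child spec and
# that MyClient is started alongside it. The runtime-config tuple in the
# comment below is hypothetical.
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyClient,
      MyConsumerGroup
      # or, with runtime config instead of `use` options:
      # {MyConsumerGroup, group_name: "my_group_name"}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end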
Consumer Group Behaviour
@type action ::
        :commit | {:skip, String.t()} | {:move_to, String.t()} | :retry

@type callback_opts :: [
        {:handler_cooldown_ms, non_neg_integer()}
      ]

@callback handle_record_batch(topic :: String.t(), partition :: integer, list(Klife.Record.t())) ::
            action
            | {action, callback_opts}
            | list({action, Klife.Record.t()})
            | {list({action, Klife.Record.t()}), callback_opts}

@callback handle_consumer_start(topic :: String.t(), partition :: integer) :: :ok

@callback handle_consumer_stop(topic :: String.t(), partition :: integer, reason :: term) :: :ok

@optional_callbacks [handle_consumer_start: 2, handle_consumer_stop: 3]
The main callback is handle_record_batch/3, where the actual record processing happens. The records are guaranteed to be ordered, and the return value tells Klife what to do with each one:

- :commit - everything went fine, commit the record
- {:skip, reason} - commit the record but note the reason for skipping it in the commit metadata
- {:move_to, topic} - commit the record and send it to another topic (for retries or DLQs), also noting it in the commit metadata
- :retry - do NOT commit; requeue the record on the internal queue for another processing cycle with a bumped attempt count
Response example:
[
  {:commit, rec1},
  {{:skip, "validation failed"}, rec2},
  {{:move_to, "my_dlq_topic"}, rec3},
  {:commit, rec4},
  {:retry, rec5}
]
If you return a single action (:commit, {:skip, reason}, etc.), it will be applied to all records in the batch (i.e., it's shorthand for the full list).
There are some edge cases to handle, such as:
- What should happen if the user returns a list shorter than the number of records?
- What if the list contains an invalid order (e.g., retrying a record before committing one with a higher offset)?

The current plan is to raise in these cases to avoid inconsistent consumer group state.
You can also return callback_opts to control things like cooldown timing (more on that below).
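To make the contract concrete, here is a hedged sketch of a handler that maps each record to an action and throttles the next cycle via callback_opts. The process/1 helper and the cooldown value are illustrative assumptions, not part of Klife's API:

# Sketch only: process/1 is a hypothetical application function and the
# cooldown value is arbitrary. Only committing actions are mixed here, so
# the ordering constraint described above is respected.
@impl true
def handle_record_batch(_topic, _partition, records) do
  actions =
    Enum.map(records, fn rec ->
      case process(rec) do
        :ok -> {:commit, rec}
        {:error, :invalid} -> {{:skip, "validation failed"}, rec}
        {:error, :unrecoverable} -> {{:move_to, "my_dlq_topic"}, rec}
      end
    end)

  # Optionally throttle the next processing cycle via callback_opts
  {actions, [handler_cooldown_ms: 1_000]}
end

# A single action is shorthand for "apply to every record in the batch",
# e.g. returning just :retry would requeue the whole batch.
defp process(_rec), do: :ok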
Consumer Group Config
A consumer group is responsible for maintaining heartbeat, reacting to rebalances, and managing consumer lifecycles. Here’s a summary of its configuration:
[
  client: [
    type: :atom,
    required: true,
    doc: "The name of the klife client to be used by the consumer group"
  ],
  topics: [
    type: {:list, {:keyword_list, TopicConfig.get_opts()}},
    required: true,
    doc: "List of topic configurations that will be handled by the consumer group"
  ],
  group_name: [
    type: :string,
    required: true,
    doc: "Name of the consumer group"
  ],
  instance_id: [
    type: :string,
    doc: "Value to identify the consumer across restarts (static membership). See KIP-345"
  ],
  rebalance_timeout_ms: [
    type: :non_neg_integer,
    default: 30_000,
    doc:
      "The maximum time in milliseconds that the kafka broker coordinator will wait for the member to revoke its partitions"
  ],
  fetcher_name: [
    type: :atom,
    doc:
      "Fetcher name to be used by the consumers of the group. Defaults to the client's default fetcher"
  ],
  committers_count: [
    type: :pos_integer,
    default: 1,
    doc: "How many committer processes will be started for the consumer group"
  ],
  isolation_level: [
    type: {:in, [:read_committed, :read_uncommitted]},
    default: :read_committed,
    doc:
      "Defines whether the consumers of the consumer group will receive uncommitted transactional records"
  ]
]
A few Klife-specific highlights:
- :fetcher_name - lets you group fetches to brokers under a named fetcher, enabling custom batching behavior. Similar to the current :producer_name option on the producer feature.
- :committers_count - since commit requests cannot be grouped across consumer groups, each consumer group has its own committer process, which may become a bottleneck when consuming many partitions. With this option you can share the load among more committer processes.
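As a rough sketch of how these two options might be combined for a group consuming many partitions (the fetcher name :my_big_fetcher is hypothetical and assumed to be configured on the client):

# Sketch only: :my_big_fetcher is a hypothetical fetcher assumed to exist
# on MyClient; the other options follow the schema above.
use Klife.Consumer.ConsumerGroup,
  client: MyClient,
  group_name: "my_group_name",
  fetcher_name: :my_big_fetcher,
  committers_count: 4,
  topics: [
    [name: "my_consumer_topic"]
  ]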
Per-Topic Consumer Options
Each consumer group manages a set of consumers—one per assigned partition. Each consumer runs its own processing loop, backed by an internal queue, and performs asynchronous fetch and commit operations to maximize throughput.
While some configuration is inherited from the consumer group, most settings are defined in the TopicConfig, which is passed via the topics option of the consumer group. These options include:
[
  name: [
    type: :string,
    required: true,
    doc: "Name of the topic the consumer group will subscribe to"
  ],
  fetcher_name: [
    type: {:or, [:atom, :string]},
    doc:
      "Fetcher name to be used by the consumers of this topic. Overrides the one defined on the consumer group."
  ],
  isolation_level: [
    type: {:in, [:read_committed, :read_uncommitted]},
    doc: "May override the isolation level defined on the consumer group"
  ],
  offset_reset_policy: [
    type: {:in, [:latest, :earliest, :error]},
    default: :latest,
    doc:
      "Defines from which offset the consumer will start processing records when no previous committed offset is found."
  ],
  fetch_max_bytes: [
    type: :non_neg_integer,
    default: 50_000,
    doc:
      "The maximum number of bytes to fetch in a single request. Must be lower than fetcher config `max_bytes_per_request`"
  ],
  fetch_interval_ms: [
    type: :non_neg_integer,
    default: 5000,
    doc: """
    Time in milliseconds that the consumer will wait before trying to fetch new data from the broker after it runs out of records to process.
    The consumer always tries to optimize fetch request wait times by issuing requests before its internal queue is empty. Therefore
    this option is only used for the wait time after a fetch request returns empty.
    TODO: Add backoff description
    """
  ],
  handler_cooldown_ms: [
    type: :non_neg_integer,
    default: 0,
    doc: """
    Time in milliseconds that the consumer will wait before handling new records. Can be overridden for one cycle by the handler return value.
    """
  ],
  handler_max_commits_in_flight: [
    type: :non_neg_integer,
    default: 0,
    doc: """
    Controls how many commit messages can be waiting for confirmation before the consumer stops processing new records.
    When this limit is reached, processing pauses until confirmations are received. Set to 0 to process records one batch at a time - each batch must be fully confirmed before starting the next.
    """
  ],
  handler_max_batch_size: [
    type: :pos_integer,
    default: 10,
    doc:
      "The maximum number of records that will be delivered to the handler in each processing cycle."
  ]
]
Notable options:
- :fetch_max_bytes - Controls the size of each fetch request (not the full queue size). Actual memory use may exceed this due to async fetch prefetching.
- :fetch_interval_ms - This setting only applies when a fetch request returns no records. On busy topics, the consumer fetches on demand as soon as the internal queue drops below a threshold. But if a fetch returns empty, the consumer enters a progressive linear backoff, gradually increasing the wait time until it reaches :fetch_interval_ms, after which it waits that full interval between retries until new data becomes available.
- :handler_cooldown_ms - Adds a post-commit cooldown between batches, helping throttle consumption without blocking useful work. Aimed at addressing issues like the ones reported in the original post.
- :handler_max_commits_in_flight - Allows processing new batches while waiting for previous commits to complete. A performance vs. consistency tradeoff, useful when strict ordering isn't needed. I'm also planning to add an ETS-based temporary offset store (optionally replicated cluster-wide) to reduce the risk of duplicate processing on crashes.
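To tie the per-topic options together, here is a hedged sketch of a topics list mixing defaults and overrides; the topic names and values are illustrative only:

# Sketch only: topic names and option values are illustrative.
topics: [
  # High-volume topic: bigger batches, allow a few commits in flight.
  [
    name: "my_consumer_topic",
    handler_max_batch_size: 100,
    handler_max_commits_in_flight: 5
  ],
  # Backfill-style topic: start from the earliest offset, throttle handling.
  [
    name: "my_consumer_topic_2",
    offset_reset_policy: :earliest,
    handler_cooldown_ms: 500
  ]
]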
Caveats
- KIP-848 Only: The current implementation is built around the new rebalance protocol introduced in KIP-848, which became generally available in Kafka 4.0. This gives us access to modern consumer features and addresses one of the biggest pain points afaik (costly rebalances), but may limit compatibility with older clusters.
- Still a Work in Progress: The commit logic is still being finalized, but partition assignment, rebalancing, and record handling are already working well. You can check out the current implementation here: GitHub - oliveigah/klife: Kafka client for elixir
Wrap-up
Thank you for reading this far!
Again, if you’ve struggled with Kafka in Elixir before, I’d love to hear from you — whether it’s feedback on the proposed interface, missing features in existing clients, or design tradeoffs you’d like to see better addressed. Even non-technical frustrations are valuable at this stage!
Let’s use this thread as an open forum for discussing Elixir + Kafka. Your insights will directly help shape Klife’s future direction.
Thanks in advance!