Klife - Introducing Kafka Consumer Groups, API preview & design discussion (RFC)

Hi everyone!

Some time ago, I released the first version of Klife (Klife - A Kafka client with performance gains over 10x), a Kafka client written from scratch in Elixir. At that time, it only supported producing messages. Since then, I’ve been working on implementing consumer group support, and as it nears completion, I’d love to gather feedback from the community — especially around the design of the API and any Kafka-related pain points you’ve experienced, even if they’re not directly tied to Klife’s interface.

Why This Post?

This post is a starting point for collaboration — a space to discuss Kafka consumer pain points and how Klife might help. I’m sharing the proposed API and internals to collect early feedback, but also to invite anyone who’s wrestled with Kafka on the BEAM to weigh in — whether it’s design suggestions, performance headaches or any small detail that’s made Kafka harder than it should be. No detail is too small.

Basic Usage Example

Here’s a simple example of how a consumer group is defined in Klife:

defmodule MyConsumerGroup do
  use Klife.Consumer.ConsumerGroup,
    client: MyClient,
    group_name: "my_group_name",
    topics: [
      [name: "my_consumer_topic"],
      [name: "my_consumer_topic_2"]
    ]

  @impl true
  def handle_record_batch(topic, partition, record_lists) do
    # Do some processing here!
  end
end

You define a module that implements the consumer group behaviour and pass configuration either via use (compile-time) or at start_link (runtime) depending on your needs. Then start it on your supervision tree.

Consumer Group Behaviour

@type action ::
        :commit | {:skip, String.t()} | {:move_to, String.t()} | :retry

@type callback_opts :: [
        {:handler_cooldown_ms, non_neg_integer()}
      ]

@callback handle_record_batch(topic :: String.t(), partition :: integer, list(Klife.Record.t())) ::
            action
            | {action, callback_opts}
            | list({action, Klife.Record.t()})
            | {list({action, Klife.Record.t()}), callback_opts}

@callback handle_consumer_start(topic :: String.t(), partition :: integer) :: :ok
@callback handle_consumer_stop(topic :: String.t(), partition :: integer, reason :: term) :: :ok

@optional_callbacks [handle_consumer_start: 2, handle_consumer_stop: 3]

The main callback is handle_record_batch/3, where the actual record processing happens. The records are guaranteed to be ordered, and the return value tells Klife what to do with each one:

  • :commit - everything went fine, commit the record

  • {:skip, reason} - commit the record but note the reason for skipping it on commit metadata

  • {:move_to, topic} - commit the record and send it to another topic (for retries or DLQs), also note it on commit metadata

  • :retry - do NOT commit; requeue the record on the internal queue for another processing cycle with bumped attempt count

Response example:

[
  {:commit, rec1},
  {{:skip, "validation failed"}, rec2},
  {{:move_to, "my_dlq_topic"}, rec3},
  {:commit, rec4},
  {:retry, rec5}
]

If you return a single action (:commit, {:skip, reason}, etc), it will be applied to all records in the batch (i.e., it’s shorthand for the full list).

There are some edge cases to handle, such as:

  • What should happen if the user returns a list shorter than the number of records?

  • What if the list contains an invalid order (e.g., retrying a record before committing one with a higher offset)?

The current plan is to raise on these cases to avoid inconsistent consumer group state.

You can also return callback_opts to control things like cooldown timing (more on that below).

Consumer Group Config

A consumer group is responsible for maintaining heartbeat, reacting to rebalances, and managing consumer lifecycles. Here’s a summary of its configuration:

[
  client: [
    type: :atom,
    required: true,
    doc: "The name of the klife client to be used by the consumer group"
  ],
  topics: [
    type: {:list, {:keyword_list, TopicConfig.get_opts()}},
    required: true,
    doc: "List of topic configurations that will be handled by the consumer group"
  ],
  group_name: [
    type: :string,
    required: true,
    doc: "Name of the consumer group"
  ],
  instance_id: [
    type: :string,
    doc: "Value to identify the consumer across restarts (static membership). See KIP-345"
  ],
  rebalance_timeout_ms: [
    type: :non_neg_integer,
    default: 30_000,
    doc:
      "The maximum time in milliseconds that the kafka broker coordinator will wait on the member to revoke it's partitions"
  ],
  fetcher_name: [
    type: :atom,
    doc:
      "Fetcher name to be used by the consumers of the group. Defaults to client's default fetcher"
  ],
  committers_count: [
    type: :pos_integer,
    default: 1,
    doc: "How many committer processes will be started for the consumer group"
  ],
  isolation_level: [
    type: {:in, [:read_committed, :read_uncommitted]},
    default: :read_committed,
    doc:
      "Define if the consumers of the consumer group will receive uncommitted transactional records"
  ]
]

A few Klife-specific highlights:

  • :fetcher_name - lets you group fetches to brokers under a named fetcher, enabling custom batching behavior. Similar to the current :producer_name options on the producer feature.

  • :committers_count - since commit requests can not be grouped across groups, each consumer group has it’s specific commiter process which may be a bottleneck when consuming many partitions, with this option you can share the load among more committer processes

Per-Topic Consumer Options

Each consumer group manages a set of consumers—one per assigned partition. Each consumer runs its own processing loop, backed by an internal queue, and performs asynchronous fetch and commit operations to maximize throughput.

While some configuration is inherited from the consumer group, most settings are defined in the TopicConfig, which is passed via the topics option in the consumer group. These options include:

[
  name: [
    type: :string,
    required: true,
    doc: "Name of the topic the consumer group will subscribe to"
  ],
  fetcher_name: [
    type: {:or, [:atom, :string]},
    doc:
      "Fetcher name to be used by the consumers of this topic. Overrides the one defined on the consumer group."
  ],
  isolation_level: [
    type: {:in, [:read_committed, :read_uncommitted]},
    doc: "May override the isolation level defined on the consumer group"
  ],
  offset_reset_policy: [
    type: {:in, [:latest, :earliest, :error]},
    default: :latest,
    doc:
      "Define from which offset the consumer will start processing records when no previous committed offset is found."
  ],
  fetch_max_bytes: [
    type: :non_neg_integer,
    default: 50_000,
    doc:
      "The maximum amount of bytes to fetch in a single request. Must be lower than fetcher config `max_bytes_per_request`"
  ],
  fetch_interval_ms: [
    type: :non_neg_integer,
    default: 5000,
    doc: """
    Time in milliseconds that the consumer will wait before trying to fetch new data from the broker after it runs out of records to process.

    The consumer always tries to optimize fetch requests wait times by issuing requests before it's internal queue is empty. Therefore
    this option is only used for the wait time after a fetch request returns empty.

    TODO: Add backoff description
    """
  ],
  handler_cooldown_ms: [
    type: :non_neg_integer,
    default: 0,
    doc: """
    Time in milliseconds that the consumer will wait before handling new records. Can be overrided for one cycle by the handler return value.
    """
  ],
  handler_max_commits_in_flight: [
    type: :non_neg_integer,
    default: 0,
    doc: """
    Controls how many commit messages can be waiting for confirmation before the consumer stops processing new records.

    When this limit is reached, processing pauses until confirmations are received. Set to 0 to process records one batch at a time - each batch must be fully confirmed before starting the next.
    """
  ],
  handler_max_batch_size: [
    type: :pos_integer,
    default: 10,
    doc:
      "The maximum amount of records that will be delivered to the handler in each processing cycle."
  ]
]

Notable options:

  • :fetch_max_bytes - Controls the size of each fetch request (not the full queue size). Actual memory use may exceed this due to async fetch prefetching.

  • :fetch_interval_ms - This setting only applies when a fetch request returns no records. On busy topics, the consumer fetches on demand as soon as the internal queue drops below a threshold. But if a fetch returns empty, the consumer enters a progressive linear backoff, gradually increasing the wait time until it reaches :fetch_interval_ms, after which it waits that full interval between retries until new data becomes available.

  • :handler_cooldown_ms - Adds a post-commit cooldown between batches, helping throttle consumption without blocking useful work. Aimed to address issues like the ones reported on the original post when .

  • :handler_max_commits_in_flight - Allows processing new batches while waiting for previous commits to complete. A performance vs consistency tradeoff — useful when strict ordering isn’t needed. I’m also planning to add an ETS-based temporary offset store (optionally replicated cluster-wide) to reduce the risk of duplicate processing on crashes.

Caveats

  • KIP-848 Only: The current implementation is built around the new rebalance protocol introduced in KIP-848, which became general available on Kafka 4.0. This gives us access to modern consumer features and address ont of the biggest pain points afaik (costly rebalances), but may limit compatibility with older clusters.

  • Still a Work in Progress: The commit logic is still being finalized, but partition assignment, rebalancing, and record handling are already working well. You can check out the current implementation here: GitHub - oliveigah/klife: Kafka client for elixir

Wrap-up

Thank you for reading this far!

Again, if you’ve struggled with Kafka in Elixir before, I’d love to hear from you — whether it’s feedback on the proposed interface, missing features in existing clients, or design tradeoffs you’d like to see better addressed. Even non-technical frustrations are valuable at this stage!

Let’s use this thread as an open forum for discussing Elixir + Kafka. Your insights will directly help shape Klife’s future direction.

Thanks in advance! :purple_heart:

8 Likes

cc: @jakemorrison @mindreader @mekusigjinn

You mentioned some Kafka pain points in the original announcement post, would love to hear your thoughts here as well. Thanks!

It looks like you’ve done a lot of good work. I’ve been mostly trying to keep kafka out of my companies because it tends to be the wiring between services and can actually cause the company to stop using elixir in favor of mainstream languages.

That said it looks like you’ve put a lot of thought into it, and if I can no longer avoid it I will definitely take it for a spin. I know that writing fully feature kafka client is a rough job (due to the way kafka works).

One thing that helps is adding telemetry. Seeing information about telemetry in for example README — gnat v1.10.0 give me some assurance that the author is thinking about production use.

I like the async produce function. Hopefully you can send a pid along so that upon receipt, it can send a message back to the original process.

I worry that using the phash2 function for partitioning is a problem. Kafka has a default partition algorithm and if your algorithm by default doesn’t match how the official bindings hash, it will select a different place to send messages to than the official bindings would, which believe it or not, could cause issues. Not something you would notice immediately, but I’ve written applications where it would matter (production from different languages consistently into partitions).

I dig the testing stuff. That was always lacking when I used to use it. Running an instance of kafka for testing or dev work is absolutely the worst.

Actually one thing I ended up having to do, is in order to simulate kafka without actually running kafka in dev was to basically just write a genserver that would run on all nodes and distribute messages the way they would be distributed in actual kafka. If that came in your package it would be a life saver, and its pretty easy to do in elixir.

Good luck.

3 Likes

Thank you so much for your feedback! (:

Yes! I will add some telemetry before the first oficial 1.0 for sure! I’m not sure exactly about which ones yet, but I will work on it!

This is straightforward to support in produce_async/2, but not in produce_batch_async/2. By design, each record in a batch results in a separate message sent to the target PID. Ideally, we would deliver the entire batch as a single message, but that would require an intermediary process to collect and forward the full list.

To keep the interfaces consistent for now, this behavior is not currently supported. But I would like to support if I can find a good way to do it.

Great catch! I had not considered how using a different default partitioning strategy could cause issues in multi-clients environments. I will update the default in version 1.0, thanks for pointing it out!

That said, it’s still easy to support custom behavior by plugging in a custom partitioner.

Yes! Right now, tests require a local Kafka instance and support assertions on produced records. Some folks have mentioned they are not too comfortable using it in development (personally, I do not think it is that bad, though I might just be too deep into it haha). That said, I definitely plan to add support for testing without a local Kafka. Just need to think through the best way to implement it.