[Question] KafkaEx - manually committing offsets

Hello,

I’m trying to manually commit offsets with KafkaEx. The following is present:

  • A topic: a-topic-which-has-a-lot-of-messages
  • A producer which produces messages (Protobuf encoded messages)
  • Following sample code:
Application.ex
# application.ex - ignoring boiler plate code
      consumer_group_opts = [fetch_options: [max_bytes: 2_000_000]]
      supervisor(
        KafkaEx.ConsumerGroup,
        [MyOwnFancyConsumer, "worker_consumer_group", ["a-topic-which-has-a-lot-of-messages"], consumer_group_opts]
      ),

Fun fact: in KafkaEx the default request max_bytes setting is 1_000_000, if I’m not mistaken. According to the docs, the default batch size on the Kafka side is bigger than that, so the fetch failed with a rather cryptic error. Hence I’m setting the client max_bytes a lot higher. It would be nice if the KafkaEx default matched the Kafka default, but I’m not complaining about small stuff here :wink:

Next up the MyOwnFancyConsumer module:
defmodule MyOwnFancyConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch
  alias KafkaEx.Protocol.Produce

  def handle_message_set(msg_set, state) do
    msg_set
    |> Enum.map(fn %Fetch.Message{value: message} = original_message ->
      {MyCompiledProtobufModuleMessage.decode!(message), original_message}
    end)
    |> Enum.each(fn {%MyCompiledProtobufModuleMessage{} = todo_task_message, original_message} ->
      # do some work

      my_offset_commit_request = %KafkaEx.Protocol.OffsetCommit.Request{
        offset: original_message.offset,
        partition: original_message.partition,
        topic: "a-topic-which-has-a-lot-of-messages"
      }

      # It crashes with the next line. Error log incoming after this code block
      tryresult = KafkaEx.offset_commit(self(), my_offset_commit_request)
    end)

    # I guess I don't need to do this... I was wondering: could I just return {:noreply, state}, as with a GenServer?
    {:sync_commit, state}
  end
end
And finally the error (simply put: the GenServer tries to call itself):
10:52:08.542 [error] GenServer #PID<0.261.0> terminating
** (stop) exited in: GenServer.call(#PID<0.261.0>, {:offset_commit, %KafkaEx.Protocol.OffsetCommit.Request{api_version: 0, consumer_group: nil, generation_id: -1, member_id: "kafkaex", metadata: "", offset: 0, partition: 0, timestamp: 0, topic: "a-topic-which-has-a-lot-of-messages"}}, 7000)
    ** (EXIT) process attempted to call itself
    ...
Last message: :timeout
State: %KafkaEx.GenConsumer.State{acked_offset: 0, api_versions: %{fetch: 0, offset_commit: 0, offset_fetch: 0}, auto_offset_reset: :none, commit_interval: 5000, commit_threshold: 100, committed_offset: 0, consumer_module: MyOwnFancyConsumer, consumer_state: nil, current_offset: 0, fetch_options: [auto_commit: false, worker_name: #PID<0.262.0>], generation_id: 3, group: "worker_consumer_group", last_commit: -576460722500, member_id: "kafka_ex-79e419a3-9991-4dc0-8102-6c0115f281e6", partition: 0, topic: "a-topic-which-has-a-lot-of-messages", worker_name: #PID<0.262.0>}
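
The `process attempted to call itself` exit is not specific to KafkaEx. The log shows the `GenServer.call` being made by the same PID that is terminating, and its state is a `KafkaEx.GenConsumer.State`, so `self()` inside `handle_message_set/2` is the GenConsumer process rather than the worker (`worker_name: #PID<0.262.0>` in the state). `GenServer.call/3` exits right away when the target is the caller, since a process waiting on its own reply could never answer. A minimal sketch that reproduces the same exit with a plain GenServer (the `SelfCaller` module is hypothetical and exists only for illustration):

```elixir
defmodule SelfCaller do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}

  # Calling ourselves from inside a callback: GenServer.call/3 exits
  # immediately instead of deadlocking. We catch the exit to inspect it.
  def handle_call(:call_self, _from, state) do
    result =
      try do
        GenServer.call(self(), :ping)
      catch
        :exit, reason -> {:exit, reason}
      end

    {:reply, result, state}
  end
end

{:ok, pid} = GenServer.start_link(SelfCaller, nil)

# The caught reason is tagged :calling_self, which is what gets logged
# as "process attempted to call itself".
{:exit, {:calling_self, _call}} = GenServer.call(pid, :call_self)
```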

What works:

  • Processing all the messages in a batch and sync committing. So much for stream processing though… I don’t like this solution.
  • Not committing manually, and sync committing / async committing after each batch.
  • Adjusting the Kafka settings to make max_bytes / message quite low, but I don’t like this either.

What doesn’t work (and why I’m writing this post) is manually committing offsets. I couldn’t immediately find sample code or detailed documentation on this, and the function isn’t documented in the docs either. Is my way of thinking wrong to let the consumer manually commit its offset?

The docker compose file I'm using:
version: "3.3"

services:
  zookeeper_1:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper_1
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

  kafka_1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka_1
    depends_on:
      - zookeeper_1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper_1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

Elixir and erlang version:

Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

IEx 1.10.4 (compiled with Erlang/OTP 23)

kafka_ex version: 0.11

Thank you in advance.

We use brod for our kafka consumers at work, which allows you to manually commit offsets like this. Unfortunately, I don’t know much about kafka_ex as we stopped using it a while ago.

I do want to challenge this approach a bit though. Committing offsets like this will tank your throughput. You’re much better off marking the offsets as completed and having a process sync those with kafka on an interval (this may be how kafka_ex works under the hood fwiw). This also implies that all of your messages need to be idempotent, but, if you’re doing kafka stuff that should be a given. If your messages aren’t idempotent, you’re fighting a battle you’ve already lost.
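
A minimal sketch of the "mark as completed, sync on an interval" idea, assuming nothing about KafkaEx internals. The `OffsetTracker` module and the `commit_fun` callback are hypothetical stand-ins; the default interval and threshold simply mirror the `commit_interval: 5000` and `commit_threshold: 100` fields visible in the state dump in the question, which suggest the GenConsumer does something along these lines.

```elixir
defmodule OffsetTracker do
  @moduledoc "Tracks the highest processed offset and commits it in batches."

  defstruct acked: -1,
            committed: -1,
            last_commit_ms: 0,
            interval_ms: 5_000,
            threshold: 100

  # Mark an offset as processed; nothing is sent to Kafka here.
  def ack(%__MODULE__{} = t, offset) when is_integer(offset) do
    %{t | acked: max(t.acked, offset)}
  end

  # Commit only when enough offsets have piled up or enough time has passed.
  # `commit_fun` is a hypothetical callback standing in for the real commit call.
  def maybe_commit(%__MODULE__{} = t, now_ms, commit_fun) do
    pending = t.acked - t.committed
    due? = pending >= t.threshold or now_ms - t.last_commit_ms >= t.interval_ms

    if pending > 0 and due? do
      commit_fun.(t.acked)
      %{t | committed: t.acked, last_commit_ms: now_ms}
    else
      t
    end
  end
end

commit = fn offset -> IO.puts("committing offset #{offset}") end

tracker =
  %OffsetTracker{}
  |> OffsetTracker.ack(0)
  |> OffsetTracker.ack(1)
  # Only 1s has passed and 2 offsets are pending: no commit yet.
  |> OffsetTracker.maybe_commit(1_000, commit)
  |> OffsetTracker.ack(2)
  # The 5s interval has elapsed: one commit covers offsets 0..2.
  |> OffsetTracker.maybe_commit(6_000, commit)
```

Three messages end up costing a single commit call. The price is that a crash between commits replays the uncommitted messages, which is why the processing has to be idempotent.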

Hopefully that’s helpful.

In regards to this statement:

This also implies that all of your messages need to be idempotent

I am assuming you mean that the process of consuming a message is idempotent. If that is the case, are there any general tips you have, or common pitfalls you have encountered, when it comes to idempotent consumption of messages? I realize this is largely context/application specific, but thought I would ask anyways.

Right, your message processing should all be idempotent.

The way we achieve this is by tagging each message with a globally unique id. We use https://github.com/keathley/hlcid for this but you could also use a flake id, uuidv4, etc. We assign these ids as early as we can. Each consumer can then use those ids to enforce its delivery semantics: at most once or at least once. For most operations we choose to use at least once semantics + idempotence to achieve exactly once delivery semantics. For scenarios where you can’t guarantee idempotence (most commonly sending an RPC to a service you don’t control and that can’t support at least once delivery) we have to use At Most Once delivery.

In order to achieve either of these schemes, you’ll need a way to store which ids you’ve already processed. If you need at most once processing, you can check to see if the id has already been stored and if it hasn’t you immediately store it, then proceed to do whatever action you need to take. If you end up replaying that message, either due to a crash, error, restart, something upstream re-published the message or some other transient failure, you’ll end up skipping the operation, even if the operation had previously failed.
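
The at most once scheme above can be sketched as follows. This is only an illustration: the `AtMostOnce` module is hypothetical, and the in-memory ETS table stands in for whatever durable store you would actually use. `:ets.insert_new/2` does the "check and store" step in one go, returning `false` if the id was already there.

```elixir
defmodule AtMostOnce do
  # Claims the id first, then runs the work. If `work` crashes, the id stays
  # claimed, so a replay of the same message is skipped.
  def handle(table, id, work) when is_function(work, 0) do
    if :ets.insert_new(table, {id}) do
      {:ok, work.()}
    else
      :skipped
    end
  end
end

table = :ets.new(:processed_ids, [:set, :public])

# The first delivery fails midway through the work.
try do
  AtMostOnce.handle(table, "msg-1", fn -> raise "RPC failed" end)
rescue
  RuntimeError -> :crashed
end

# The replay is skipped even though the first attempt never succeeded.
:skipped = AtMostOnce.handle(table, "msg-1", fn -> :sent end)

# A message with a new id is processed normally.
{:ok, :sent} = AtMostOnce.handle(table, "msg-2", fn -> :sent end)
```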

If you want At Least Once processing, you check to see if the id is stored, do your work, and then store the id. This assumes that the operations that you’re conducting are also idempotent. For instance, incrementing a counter isn’t an idempotent operation. So, instead you’ll need to use something like a Set and add your id to the set. Then when you want the count you can take the cardinality of the Set.
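
A rough sketch of the at least once ordering and of the Set-based counter, again with hypothetical names (`AtLeastOnce`, `views`) and an in-memory `MapSet` standing in for a real store. The id is recorded only after the work returns, so a crash during the work leaves the id unrecorded and the replay runs the work again.

```elixir
defmodule AtLeastOnce do
  # `seen` is a MapSet of processed ids. Returns {outcome, updated_seen}.
  # Order matters: check, do the work, and only then record the id.
  def handle(seen, id, work) when is_function(work, 0) do
    if MapSet.member?(seen, id) do
      {:skipped, seen}
    else
      result = work.()
      {{:ok, result}, MapSet.put(seen, id)}
    end
  end
end

seen = MapSet.new()
{{:ok, :done}, seen} = AtLeastOnce.handle(seen, "msg-1", fn -> :done end)

# A duplicate delivery of an already recorded id is skipped.
{:skipped, ^seen} = AtLeastOnce.handle(seen, "msg-1", fn -> :done end)

# Idempotent "counter": add ids to a set and take its cardinality.
# Replaying "msg-1" does not change the count.
views = Enum.reduce(["msg-1", "msg-2", "msg-1"], MapSet.new(), &MapSet.put(&2, &1))
2 = MapSet.size(views)
```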

I gave a talk on this a while ago: https://keathley.io/talks/kafka_the_hard_parts.html which might be useful. After working on these types of systems over the past few years, my opinion is that stream processing systems are incredibly fragile. We’ve spent a lot of time building internal libraries that support this stuff but unfortunately I haven’t been able to get them open sourced yet. IMO the ecosystem is really lacking a solid answer around kafka specifically. I think Elixir and Erlang are really well suited for data pipeline problems, but the tooling around kafka and other ingestion isn’t really there yet. If you need to do this for production you should be ready to support a lot of your own tooling. Otherwise you should probably just use kstreams, flink, wallaroo, storm, etc.
