Hello,
I’m trying to manually commit offsets with KafkaEx. This is my setup:
- A topic: a-topic-which-has-a-lot-of-messages
- A producer which produces messages (Protobuf-encoded messages)
- The following sample code:
Application.ex
# application.ex - ignoring boilerplate code
consumer_group_opts = [fetch_options: [max_bytes: 2_000_000]]
supervisor(
  KafkaEx.ConsumerGroup,
  [MyOwnFancyConsumer, "worker_consumer_group", ["a-topic-which-has-a-lot-of-messages"], consumer_group_opts]
),
Fun fact: in KafkaEx the default request max_bytes setting is 1_000_000, if I’m not mistaken. According to the docs, the default Kafka topic message batch size is bigger than the KafkaEx default, so fetching failed with a rather cryptic error. Hence I’m setting the client max_bytes a lot higher. It would be nice if this matched the Kafka default settings, but I’m not complaining about small stuff here.
Next up, the MyOwnFancyConsumer module:
defmodule MyOwnFancyConsumer do
  use KafkaEx.GenConsumer
  alias KafkaEx.Protocol.Fetch
  alias KafkaEx.Protocol.Produce

  def handle_message_set(msg_set, state) do
    msg_set
    |> Enum.map(fn %Fetch.Message{value: message} = original_message ->
      {MyCompiledProtobufModuleMessage.decode!(message), original_message}
    end)
    |> Enum.each(fn {%MyCompiledProtobufModuleMessage{} = todo_task_message, original_message} ->
      # do some work
      my_offset_commit_request = %KafkaEx.Protocol.OffsetCommit.Request{
        offset: original_message.offset,
        partition: original_message.partition,
        topic: "a-topic-which-has-a-lot-of-messages"
      }

      # It crashes with the next line. Error log incoming after this code block
      tryresult = KafkaEx.offset_commit(self(), my_offset_commit_request)
    end)

    # I don't need to do this I guess... was wondering, could I just return a {:noreply, state} as with a GenServer?
    {:sync_commit, state}
  end
end
And finally the error (put simply: the GenServer tries to call itself):
10:52:08.542 [error] GenServer #PID<0.261.0> terminating
** (stop) exited in: GenServer.call(#PID<0.261.0>, {:offset_commit, %KafkaEx.Protocol.OffsetCommit.Request{api_version: 0, consumer_group: nil, generation_id: -1, member_id: "kafkaex", metadata: "", offset: 0, partition: 0, timestamp: 0, topic: "a-topic-which-has-a-lot-of-messages"}}, 7000)
** (EXIT) process attempted to call itself
...
Last message: :timeout
State: %KafkaEx.GenConsumer.State{acked_offset: 0, api_versions: %{fetch: 0, offset_commit: 0, offset_fetch: 0}, auto_offset_reset: :none, commit_interval: 5000, commit_threshold: 100, committed_offset: 0, consumer_module: MyOwnFancyConsumer, consumer_state: nil, current_offset: 0, fetch_options: [auto_commit: false, worker_name: #PID<0.262.0>], generation_id: 3, group: "worker_consumer_group", last_commit: -576460722500, member_id: "kafka_ex-79e419a3-9991-4dc0-8102-6c0115f281e6", partition: 0, topic: "a-topic-which-has-a-lot-of-messages", worker_name: #PID<0.262.0>}
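As far as I can tell, the exit itself is not Kafka-specific. Here is a minimal sketch that reproduces the same kind of exit with a plain GenServer and no KafkaEx at all. The SelfCaller module and the :outer / :inner messages are made up for illustration; the only assumption is that GenServer.call refuses to call the calling process, which is what the log above suggests.

```elixir
defmodule SelfCaller do
  use GenServer

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:outer, _from, state) do
    # We are running inside the server process here, so self() is the
    # server itself. Calling it synchronously exits instead of waiting.
    result =
      try do
        GenServer.call(self(), :inner)
      catch
        :exit, reason -> {:caught_exit, reason}
      end

    {:reply, result, state}
  end

  def handle_call(:inner, _from, state), do: {:reply, :ok, state}
end

{:ok, pid} = GenServer.start_link(SelfCaller, :ok)

# Prints the caught exit reason rather than :ok, because :inner is never handled.
IO.inspect(GenServer.call(pid, :outer))
```

In my consumer, self() inside handle_message_set appears to be the GenConsumer process (#PID<0.261.0> in the log), while the State dump lists a different pid as worker_name (#PID<0.262.0>), so the situation looks the same as in this sketch.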
What works:
- Processing all the messages in a batch and then sync committing. So much for stream processing though… I don’t like this solution.
- Not committing manually, and sync committing / async committing after each batch.
- Adjusting the Kafka settings to make max_bytes per message quite low, but I don’t like this either.
What doesn’t work (and why I’m writing this post) is manually committing offsets. I couldn’t immediately find sample code or detailed documentation for this, and the function isn’t documented in the docs either. Is my way of thinking wrong in wanting the consumer to manually commit its offset?
The docker compose file I'm using:
version: "3.3"
services:
  zookeeper_1:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper_1
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
  kafka_1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka_1
    depends_on:
      - zookeeper_1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper_1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
Elixir and Erlang versions:
Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]
IEx 1.10.4 (compiled with Erlang/OTP 23)
kafka_ex version: 0.11
Thank you in advance.