When I produce messages and fetch them with
KafkaEx.fetch(topic, 0, offset: 0)
I can see the messages produced.
However, it looks like my consumer is never called:
defmodule Kafka.Consumer do
@moduledoc false
use KafkaEx.GenConsumer
alias KafkaEx.Protocol.Fetch.Message
require Logger
def handle_message_set(message_set, state) do
IO.inspect("Why do I never get here")
for %Message{value: message} <- message_set do
Logger.debug(fn -> "message: " <> inspect(message) end)
end
{:async_commit, state}
end
end
it’s get started like so:
def start_child() do
consumer_group_opts = [
# setting for the ConsumerGroup
heartbeat_interval: 1_000,
# this setting will be forwarded to the GenConsumer
commit_interval: 1_000
]
consumer_group_name = Application.get_env(:kafka_ex, :consumer_group)
child_spec = %{
id: KafkaEx.ConsumerGroup,
start: {
KafkaEx.ConsumerGroup,
:start_link,
[Consumer, consumer_group_name, [@topic_consumer], consumer_group_opts]
}
}
DynamicSupervisor.start_child(__MODULE__, child_spec)
end
What could be the reason it never gets into handle_message_set?
Thanks!