KafkaEx: handle_message_set is not called

When I produce messages and fetch them with

KafkaEx.fetch(topic, 0, offset: 0)

I can see the messages produced.

However, it looks like my consumer is never called:

defmodule Kafka.Consumer do
  @moduledoc false

  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  require Logger

  def handle_message_set(message_set, state) do
    IO.inspect("Why do I never get here")
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end

    {:async_commit, state}
  end
end

It gets started like so:

def start_child() do
  consumer_group_opts = [
    # setting for the ConsumerGroup
    heartbeat_interval: 1_000,
    # this setting will be forwarded to the GenConsumer
    commit_interval: 1_000
  ]

  consumer_group_name = Application.get_env(:kafka_ex, :consumer_group)

  child_spec = %{
    id: KafkaEx.ConsumerGroup,
    start: {
      KafkaEx.ConsumerGroup,
      :start_link,
      [Consumer, consumer_group_name, [@topic_consumer], consumer_group_opts]
    }
  }

  DynamicSupervisor.start_child(__MODULE__, child_spec)
end

What could be the reason it never gets into handle_message_set?

Thanks! :smiley:

iex(3)> Kafka.ConsumerSupervisor |> DynamicSupervisor.which_children
[{:undefined, #PID<0.449.0>, :worker, [KafkaEx.ConsumerGroup]}]

It turned out to be a timing issue.
If I run the test with :timer.sleep(10_000) added, I do see proof that it reaches handle_message_set.
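
For anyone hitting the same thing: instead of a fixed 10-second sleep, a test can poll for the expected condition and continue as soon as it holds. Below is a minimal sketch of that idea. `WaitHelper` and `wait_until` are hypothetical names for illustration and are not part of KafkaEx; the default timeout and polling interval are arbitrary assumptions.

```elixir
defmodule WaitHelper do
  # Hypothetical test helper (not part of KafkaEx).
  # Calls `fun` repeatedly until it returns a truthy value, or gives up
  # once `timeout_ms` has elapsed.
  def wait_until(fun, timeout_ms \\ 10_000, interval_ms \\ 100) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    do_wait(fun, deadline, interval_ms)
  end

  defp do_wait(fun, deadline, interval_ms) do
    cond do
      # Condition met: stop waiting immediately.
      fun.() ->
        :ok

      # Deadline passed without the condition becoming true.
      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      # Not yet: sleep briefly, then check again.
      true ->
        Process.sleep(interval_ms)
        do_wait(fun, deadline, interval_ms)
    end
  end
end
```

In a test, the function passed in would check whatever side effect handle_message_set produces (for example, a value the consumer wrote somewhere the test can read), so the test finishes as soon as the consumer has actually processed a message.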