Using brod for Kafka message consumption: it works in iex, but not when I run the application...

I’m trying to configure brod to consume messages from a Kafka cluster. When I create the client via iex, I can create a consumer for the topic and no error message is shown. However, when I run the app and try to use brod’s group subscriber v2, I get a "Group authorization failed" error. Here’s my code:

# Brod client definition (`:bbd`) used by the consumer-group subscriber.
#
# NOTE(review): the credentials below are read when this config file is
# evaluated. If this block lives in config/config.exs that happens at compile
# time, so the values seen in `iex` and in the built application can differ —
# confirm it is placed in config/runtime.exs.
config :brod,
  # Remote Kafka brokers (can list multiple for redundancy)
  clients: [
    bbd: [
      endpoints: [{~c"kafka.cluster.host", 9092}],
      reconnect_cool_down_seconds: 10,
      ssl: [
        verify: :verify_peer,
        cacertfile: "/etc/ssl/certs/ca-certificates.crt",
        depth: 3,
        customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
      ],
      # fetch_env!/1 raises when a variable is unset instead of silently
      # passing `nil` credentials on to the SASL handshake.
      sasl:
        {:plain, System.fetch_env!("KAFKA_CLUSTER_KEY"),
         System.fetch_env!("KAFKA_CLUSTER_SECRET")}
    ]
  ]

Here’s the brod group subscriber:

defmodule Bbd.Reader do
  @moduledoc """
  Callback module for `:brod_group_subscriber_v2`.

  `child_spec/1` builds the child specification that starts the subscriber on
  the `:bbd` client; `init/2` and `handle_message/2` are the behaviour
  callbacks.
  """

  @behaviour :brod_group_subscriber_v2

  require Logger

  @doc """
  Returns the child specification that starts the group subscriber.

  The argument is ignored. A debug line is logged on every call.
  """
  def child_spec(_arg) do
    Logger.debug("CHILD_SPEC CALLED")

    %{
      id: __MODULE__,
      start: {:brod_group_subscriber_v2, :start_link, [subscriber_config()]},
      type: :worker,
      # :temporary — the supervisor does not restart this child after it exits.
      restart: :temporary,
      shutdown: 5000
    }
  end

  @doc """
  Logs a debug line and starts with an empty list as the callback state.

  Both arguments are ignored.
  """
  @impl :brod_group_subscriber_v2
  def init(_init_info, _init_data) do
    Logger.debug("INIT CALLED")
    {:ok, []}
  end

  @doc """
  Prints the received message and asks for its offset to be committed.

  The incoming state is ignored; the returned state is always `[]`.
  """
  @impl :brod_group_subscriber_v2
  def handle_message(message, _state) do
    IO.inspect(message, label: "message")

    next_state = []
    {:ok, :commit, next_state}
  end

  # Subscriber options handed to :brod_group_subscriber_v2.start_link/1.
  defp subscriber_config do
    %{
      client: :bbd,
      # NOTE(review): presumably the SASL principal needs broker-side
      # permission on this consumer group — confirm the cluster ACLs.
      group_id: "bbd.int-test17",
      topics: ["topic-name"],
      cb_module: __MODULE__,
      consumer_config: [begin_offset: :earliest],
      init_data: [],
      message_type: :message_set,
      group_config: group_config()
    }
  end

  # Options placed under :group_config in the subscriber config.
  # NOTE(review): reconnect_cool_down_seconds is also set on the client config;
  # confirm it is actually read from the group config.
  defp group_config do
    [
      offset_commit_policy: :commit_to_kafka_v2,
      offset_commit_interval_seconds: 5,
      rejoin_delay_seconds: 60,
      reconnect_cool_down_seconds: 60
    ]
  end
end

here’s my application.ex:

I’m trying to configure brod to consume messages from a Kafka cluster. When I create the client via iex, I can create a consumer for the topic and no error message is shown. However, when I run the app and try to use brod’s group subscriber v2, I get a "Group authorization failed" error. Here’s my code:

# Brod client definition (`:bbd`) used by the consumer-group subscriber.
#
# NOTE(review): the credentials below are read when this config file is
# evaluated. If this block lives in config/config.exs that happens at compile
# time, so the values seen in `iex` and in the built application can differ —
# confirm it is placed in config/runtime.exs.
config :brod,
  # Remote Kafka brokers (can list multiple for redundancy)
  clients: [
    bbd: [
      endpoints: [{~c"kafka.cluster.host", 9092}],
      reconnect_cool_down_seconds: 10,
      ssl: [
        verify: :verify_peer,
        cacertfile: "/etc/ssl/certs/ca-certificates.crt",
        depth: 3,
        customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
      ],
      # fetch_env!/1 raises when a variable is unset instead of silently
      # passing `nil` credentials on to the SASL handshake.
      sasl:
        {:plain, System.fetch_env!("KAFKA_CLUSTER_KEY"),
         System.fetch_env!("KAFKA_CLUSTER_SECRET")}
    ]
  ]

Here’s the brod group subscriber:

defmodule Bbd.Reader do
  @moduledoc """
  Callback module for `:brod_group_subscriber_v2`.

  `child_spec/1` builds the child specification that starts the subscriber on
  the `:bbd` client; `init/2` and `handle_message/2` are the behaviour
  callbacks.
  """

  @behaviour :brod_group_subscriber_v2

  require Logger

  @doc """
  Returns the child specification that starts the group subscriber.

  The argument is ignored. A debug line is logged on every call.
  """
  def child_spec(_arg) do
    Logger.debug("CHILD_SPEC CALLED")

    %{
      id: __MODULE__,
      start: {:brod_group_subscriber_v2, :start_link, [subscriber_config()]},
      type: :worker,
      # :temporary — the supervisor does not restart this child after it exits.
      restart: :temporary,
      shutdown: 5000
    }
  end

  @doc """
  Logs a debug line and starts with an empty list as the callback state.

  Both arguments are ignored.
  """
  @impl :brod_group_subscriber_v2
  def init(_init_info, _init_data) do
    Logger.debug("INIT CALLED")
    {:ok, []}
  end

  @doc """
  Prints the received message and asks for its offset to be committed.

  The incoming state is ignored; the returned state is always `[]`.
  """
  @impl :brod_group_subscriber_v2
  def handle_message(message, _state) do
    IO.inspect(message, label: "message")

    next_state = []
    {:ok, :commit, next_state}
  end

  # Subscriber options handed to :brod_group_subscriber_v2.start_link/1.
  defp subscriber_config do
    %{
      client: :bbd,
      # NOTE(review): presumably the SASL principal needs broker-side
      # permission on this consumer group — confirm the cluster ACLs.
      group_id: "bbd.int-test17",
      topics: ["topic-name"],
      cb_module: __MODULE__,
      consumer_config: [begin_offset: :earliest],
      init_data: [],
      message_type: :message_set,
      group_config: group_config()
    }
  end

  # Options placed under :group_config in the subscriber config.
  # NOTE(review): reconnect_cool_down_seconds is also set on the client config;
  # confirm it is actually read from the group config.
  defp group_config do
    [
      offset_commit_policy: :commit_to_kafka_v2,
      offset_commit_interval_seconds: 5,
      rejoin_delay_seconds: 60,
      reconnect_cool_down_seconds: 60
    ]
  end
end

Thanks in advance 🙂