I’m trying to configure brod to consume messages from a kafka cluster. When I create the client via iex I can create a consumer for the topic - no error message is show. However, when I run the app, trying to use brod’s group subscriber v2, I get Group authorization failed. Here’s my code:
config :brod,
# Remote Kafka brokers (can list multiple for redundancy)
clients: [
bbd: [
endpoints: [{~c"kafka.cluster.host", 9092}],
reconnect_cool_down_seconds: 10,
ssl: [
verify: :verify_peer,
cacertfile: "/etc/ssl/certs/ca-certificates.crt",
depth: 3,
customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
],
sasl: {:plain, System.get_env("KAFKA_CLUSTER_KEY"), System.get_env("KAFKA_CLUSTER_SECRET")}
]
]
here’s the brod group subsciber:
defmodule Bbd.Reader do
require Logger
@behaviour :brod_group_subscriber_v2
def child_spec(_arg) do
Logger.debug("CHILD_SPEC CALLED")
config = %{
client: :bbd,
group_id: "bbd.int-test17",
topics: ["topic-name"],
cb_module: __MODULE__,
consumer_config: [{:begin_offset, :earliest}],
init_data: [],
message_type: :message_set,
group_config: [
offset_commit_policy: :commit_to_kafka_v2,
offset_commit_interval_seconds: 5,
rejoin_delay_seconds: 60,
reconnect_cool_down_seconds: 60
]
}
%{
id: __MODULE__,
start: {:brod_group_subscriber_v2, :start_link, [config]},
type: :worker,
restart: :temporary,
shutdown: 5000
}
end
@impl :brod_group_subscriber_v2
def init(_group_id, _init_data) do
Logger.debug("INIT CALLED")
{:ok, []}
end
@impl :brod_group_subscriber_v2
def handle_message(message, _state) do
IO.inspect(message, label: "message")
{:ok, :commit, []}
end
end
here’s my application.ex
:
I’m trying to configure brod to consume messages from a kafka cluster. When I create the client via iex I can create a consumer for the topic - no error message is show. However, when I run the app, trying to use brod’s group subscriber v2, I get Group authorization failed. Here’s my code:
config :brod,
# Remote Kafka brokers (can list multiple for redundancy)
clients: [
bbd: [
endpoints: [{~c"kafka.cluster.host", 9092}],
reconnect_cool_down_seconds: 10,
ssl: [
verify: :verify_peer,
cacertfile: "/etc/ssl/certs/ca-certificates.crt",
depth: 3,
customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
],
sasl: {:plain, System.get_env("KAFKA_CLUSTER_KEY"), System.get_env("KAFKA_CLUSTER_SECRET")}
]
]
here’s the brod group subsciber:
defmodule Bbd.Reader do
require Logger
@behaviour :brod_group_subscriber_v2
def child_spec(_arg) do
Logger.debug("CHILD_SPEC CALLED")
config = %{
client: :bbd,
group_id: "bbd.int-test17",
topics: ["topic-name"],
cb_module: __MODULE__,
consumer_config: [{:begin_offset, :earliest}],
init_data: [],
message_type: :message_set,
group_config: [
offset_commit_policy: :commit_to_kafka_v2,
offset_commit_interval_seconds: 5,
rejoin_delay_seconds: 60,
reconnect_cool_down_seconds: 60
]
}
%{
id: __MODULE__,
start: {:brod_group_subscriber_v2, :start_link, [config]},
type: :worker,
restart: :temporary,
shutdown: 5000
}
end
@impl :brod_group_subscriber_v2
def init(_group_id, _init_data) do
Logger.debug("INIT CALLED")
{:ok, []}
end
@impl :brod_group_subscriber_v2
def handle_message(message, _state) do
IO.inspect(message, label: "message")
{:ok, :commit, []}
end
end
Thanks in advance