Kaffe to Confluent Kafka using a CA certificate - unable to establish a connection

Hi Everyone,

I am trying to connect Kaffe to Confluent Kafka using a CA certificate but I have been unable to establish a connection from my elixir app to Confluent Kafka.

Here is my Kaffe configuration at the moment.

producer: [
        endpoints: [{kafka_host, 29092}],
        topics: ["producer_topics"],
        partition_strategy: :md5,
        #ssl: true,
        sasl: %{
          mechanism: :plain,
          login: env("KAFFE_PRODUCER_USER"),
          password: env("KAFFE_PRODUCER_PASSWORD")
        }
      ]

Regards

Did you manage to connect with an external tool f.ex. kaf?

Through my other app, a dotnet core I could connect and consume and produce messages to the confluent kafka cluster

And here is the error when I attempt to run the app.

** (Mix) Could not start application consumer: Consumer.Application.start(:normal, ) returned an error: shutdown: failed to start child: Kaffe.GroupMemberSupervisor
** (EXIT) an exception was raised:
** (MatchError) no match of right hand side value: [“omitted clusted URL”]
(kaffe 1.25.0) lib/kaffe/config.ex:30: Kaffe.Config.url_endpoint_to_tuple/1
(elixir 1.13.4) lib/enum.ex:1593: Enum.“-map/2-lists^map/1-0-”/2
(kaffe 1.25.0) lib/kaffe/config/consumer.ex:6: Kaffe.Config.Consumer.configuration/0
(kaffe 1.25.0) lib/kaffe/consumer_group/group_member_supervisor.ex:84: Kaffe.GroupMemberSupervisor.subscriber_name/0
(kaffe 1.25.0) lib/kaffe/consumer_group/group_member_supervisor.ex:80: Kaffe.GroupMemberSupervisor.name/0
(kaffe 1.25.0) lib/kaffe/consumer_group/group_member_supervisor.ex:31: Kaffe.GroupMemberSupervisor.start_link/0
(stdlib 4.3.1) supervisor.erl:414: :supervisor.do_start_child_i/3
(stdlib 4.3.1) supervisor.erl:400: :supervisor.do_start_child/2

Here’s a snippet from my (working) config that I use in prod to connect Kaffe to Confluent Kafka

config :kaffe,
  consumer: [
    endpoints: [{"my kafka cluster url", 9092}],
    topics: [ "my topics" ],
    consumer_group: "consumer_group_name",
    message_handler: MyApp.KafkaConsumer,
    worker_allocation_strategy: :worker_per_topic_partition,
    rebalance_delay_ms: 30_000,
    max_wait_time: 15_000,
    offset_reset_policy: :reset_to_earliest,
    ssl: true,
    sasl: %{ mechanism: :plain, login: "my user", password: "some pw" }
  ]

I think you might just need to enable ssl? Check the port as well

1 Like