Broadway Kafka: failed to join group reason: {:client_down

I am trying to connect to upstash kafka using broadway and getting

[info] Group member (localhost-00,coor=#PID<0.531.0>,cb=#PID<0.528.0>,generation=0):
failed to join group
reason: {:client_down,

My broadway code is as below.

defmodule Maverick.Broadway.Upstash do
  @moduledoc """
  The Upstash Kafka context.
  """
  use Broadway

  @upstash_kafka_port Application.compile_env!(:broadway, :port)
  @upstash_kafka_endpoint Application.compile_env!(:broadway, :endpoint)
  @upstash_kafka_hosts [{@upstash_kafka_endpoint, @upstash_kafka_port}]
  @upstash_kafka_authentication {Application.compile_env!(:broadway, :sasl_mechanism),
                                 Application.compile_env!(:broadway, :username),
                                 Application.compile_env!(:broadway, :password)}
  @upstash_kafka_group_id "localhost-00"
  @upstash_kafka_topic "localhost"
  @producer_concurrency 1
  @processors_concurrency 10

  def start_link(_opts) do
    IO.inspect(@upstash_kafka_authentication)
    IO.inspect(@upstash_kafka_hosts)
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: @upstash_kafka_hosts,
             group_id: @upstash_kafka_group_id,
             topics: [@upstash_kafka_topic],
             client_id_prefix: "localhost-maverick",
             sasl: @upstash_kafka_authentication,
             ssl: true
           ]},
        concurrency: @producer_concurrency
      ],
      processors: [
        default: [
          concurrency: @processors_concurrency
        ]
      ]
    )
  end

  def handle_message(_, message, _) do
    message |> IO.inspect()
    message
  end
end

Any help would be appreciated

There is this warning and error before it crashes

[warning] :brod_client [#PID<0.517.0>] Maverick.Broadway.Upstash.Broadway.Producer_0.Client is terminating
reason: [
  {{"evolving-kafka.upstash.io", 9092},
   {{{:kpro_req, #Reference<0.1688490899.2583691269.209077>, :api_versions, 0,
      false, []}, :closed},
    [
      {:kpro_lib, :send_and_recv_raw, 4,
       [
         file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_lib.erl",
         line: 70
       ]},
      {:kpro_lib, :send_and_recv, 5,
       [
         file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_lib.erl",
         line: 81
       ]},
      {:kpro_connection, :query_api_versions, 4,
       [
         file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_connection.erl",
         line: 271
       ]},
      {:kpro_connection, :init_connection, 3,
       [
         file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_connection.erl",
         line: 258
       ]},
      {:kpro_connection, :init, 4,
       [
         file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_connection.erl",
         line: 195
       ]},
      {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 241]}
    ]}}
]

[error] GenServer Maverick.Broadway.Upstash.Broadway.Producer_0.Client terminating
** (stop) [{{"evolving-kafka.upstash.io", 9092}, {{{:kpro_req, #Reference<0.1688490899.2583691269.209077>, :api_versions, 0, false, []}, :closed}, [{:kpro_lib, :send_and_recv_raw, 4, [file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_lib.erl", line: 70]}, {:kpro_lib, :send_and_recv, 5, [file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_lib.erl", line: 81]}, {:kpro_connection, :query_api_versions, 4, [file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_connection.erl", line: 271]}, {:kpro_connection, :init_connection, 3, [file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_connection.erl", line: 258]}, {:kpro_connection, :init, 4, [file: ~c"/home/dar/code/gitlab/maverick/deps/kafka_protocol/src/kpro_connection.erl", line: 195]}, {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 241]}]}}]
    (brod 3.17.0) /home/dar/code/gitlab/maverick/deps/brod/src/brod_client.erl:622: :brod_client.ensure_metadata_connection/1
    (brod 3.17.0) /home/dar/code/gitlab/maverick/deps/brod/src/brod_client.erl:343: :brod_client.handle_info/2
    (stdlib 5.2) gen_server.erl:1095: :gen_server.try_handle_info/3
    (stdlib 5.2) gen_server.erl:1183: :gen_server.handle_msg/6
    (stdlib 5.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
Last message: :init
State: {:state, Maverick.Broadway.Upstash.Broadway.Producer_0.Client, [{"evolving-kafka.upstash.io", 9092}], :undefined, [], :undefined, :undefined, [], Maverick.Broadway.Upstash.Broadway.Producer_0.Client}

Fixed it by updating the config as below, got some lead from How do I connect to Upstash Kafka with Broadway (ssl issues)

defmodule Maverick.Broadway.Upstash do
  @moduledoc """
  The Upstash Kafka context.
  """
  use Broadway

  @upstash_kafka_port Application.compile_env!(:broadway, :port)
  @upstash_kafka_endpoint Application.compile_env!(:broadway, :endpoint)
  @upstash_kafka_hosts [{@upstash_kafka_endpoint, @upstash_kafka_port}]
  @upstash_kafka_authentication {Application.compile_env!(:broadway, :sasl_mechanism),
                                 Application.compile_env!(:broadway, :username),
                                 Application.compile_env!(:broadway, :password)}
  @upstash_kafka_group_id "localhost-00"
  @upstash_kafka_topic "localhost"
  @producer_concurrency 1
  @processors_concurrency 10

  def start_link(_opts) do
    IO.inspect(@upstash_kafka_authentication)
    IO.inspect(@upstash_kafka_hosts)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: @upstash_kafka_hosts,
             group_id: @upstash_kafka_group_id,
             topics: [@upstash_kafka_topic],
             client_id_prefix: "localhost-maverick",
             client_config: [
               sasl: @upstash_kafka_authentication,
               ssl: [
                 # from CAStore package
                 cacertfile: CAStore.file_path(),
                 verify_type: :verify_peer,
                 customize_hostname_check: [
                   match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
                 ]
               ]
             ]
           ]},
        concurrency: @producer_concurrency
      ],
      processors: [
        default: [
          concurrency: @processors_concurrency
        ]
      ]
    )
  end

  def handle_message(_, message, _) do
    message |> IO.inspect()
    message
  end
end

1 Like