Broadway Kafka: failed to join group reason: {:client_down

I fixed it by updating the config as shown below. I got a lead from the thread "How do I connect to Upstash Kafka with Broadway (ssl issues)".

defmodule Maverick.Broadway.Upstash do
  @moduledoc """
  The Upstash Kafka context.

  Defines a Broadway pipeline that consumes the `"localhost"` topic through
  `BroadwayKafka.Producer`, connecting over TLS with SASL authentication.

  The endpoint, port and SASL settings are read from the `:broadway`
  application environment with `Application.compile_env!/2`, so they are
  fixed when this module is compiled.
  """
  use Broadway

  @upstash_kafka_port Application.compile_env!(:broadway, :port)
  @upstash_kafka_endpoint Application.compile_env!(:broadway, :endpoint)
  @upstash_kafka_hosts [{@upstash_kafka_endpoint, @upstash_kafka_port}]
  @upstash_kafka_authentication {Application.compile_env!(:broadway, :sasl_mechanism),
                                 Application.compile_env!(:broadway, :username),
                                 Application.compile_env!(:broadway, :password)}
  @upstash_kafka_group_id "localhost-00"
  @upstash_kafka_topic "localhost"
  @producer_concurrency 1
  @processors_concurrency 10

  @doc """
  Starts the Broadway pipeline registered under this module's name.

  The argument is ignored; all settings come from the module attributes
  above. Returns whatever `Broadway.start_link/2` returns.
  """
  def start_link(_opts) do
    # The SASL tuple contains the password, so it is deliberately not
    # printed or logged here.
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: @upstash_kafka_hosts,
             group_id: @upstash_kafka_group_id,
             topics: [@upstash_kafka_topic],
             client_id_prefix: "localhost-maverick",
             client_config: [
               sasl: @upstash_kafka_authentication,
               ssl: [
                 # from CAStore package
                 cacertfile: CAStore.file_path(),
                 # `:verify` is the option key understood by Erlang's :ssl;
                 # `verify_type` is only the name of its value type in the
                 # ssl documentation and does not request peer verification.
                 verify: :verify_peer,
                 # Use HTTPS-style hostname matching when checking the
                 # server certificate.
                 customize_hostname_check: [
                   match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
                 ]
               ]
             ]
           ]},
        concurrency: @producer_concurrency
      ],
      processors: [
        default: [
          concurrency: @processors_concurrency
        ]
      ]
    )
  end

  @doc """
  Prints each incoming message with `IO.inspect/1` and returns it unchanged.
  """
  @impl true
  def handle_message(_processor, message, _context) do
    # IO.inspect/1 returns its argument, so the message is passed through as-is.
    IO.inspect(message)
  end
end

1 Like