How do I connect to Upstash Kafka with Broadway (SSL issues)?

I am trying to connect to my Upstash Kafka cluster using BroadwayKafka. When connecting I get the following error:

[warning] :brod_client [#PID<0.623.0>] MyProject.Data.Pipeline.Broadway.Producer_0.Client is terminating
reason: [
  {{:"project-name.upstash.io", 9092},
   {{:failed_to_upgrade_to_ssl,
     {:tls_alert,
      {:handshake_failure,
       ~c"TLS client: In state wait_cert_cr at ssl_handshake.erl:2140 generated CLIENT ALERT: Fatal - Handshake Failure\n {bad_cert,hostname_check_failed}"}}},
    [
      {:kpro_connection, :maybe_upgrade_to_ssl, 5,
...snip

My connection parameters currently look like this:

{BroadwayKafka.Producer,
  [
   hosts: ["project-name.upstash.io": 9092],
   group_id: "group_1",
   topics: ["events"],
   client_config: [
     sasl:
       {:scram_sha_256,
        "username",
        "password"},
     ssl: [
       cacertfile: CAStore.file_path(), # from CAStore package
       verify_type: :verify_peer
     ]
   ]
  ]}

I tried setting ssl: true instead of the manual config, but it fails with an error indicating that CA certs are missing. I also tried setting verify_type: :verify_none, but the error is the same. Finally, I tried setting server_name_indication: "project-name.upstash.io" because the docs said it was important to set it. I’m not sure that is the correct value, and it gives me a different error:

[warning] :brod_client [#PID<0.456.0>] MyProject.Data.Pipeline.Broadway.Producer_0.Client is terminating
reason: [
  {{:"smooth-grouper-5277-us1-kafka.upstash.io", 9092},
   {{:failed_to_upgrade_to_ssl,
     {:options,
      {:server_name_indication, "project-name.upstash.io"}}},
    [
      {:kpro_connection, :maybe_upgrade_to_ssl, 5,

I suspect this is a really simple issue for anyone familiar with the Erlang ssl options. Any help would be appreciated!

:wave: @bluephosphor

You might need to provide the :customize_hostname_check option to make :verify_peer work with subdomains / wildcard certs.

iex> Mix.install [:castore]
iex> :ssl.start()
:ok

iex> :ssl.connect(~c"gist.github.com", 443, verify: :verify_peer, cacertfile: CAStore.file_path())
{:error,
 {:tls_alert,
  {:handshake_failure,
   'TLS client: In state wait_cert_cr at ssl_handshake.erl:2113 generated CLIENT ALERT: Fatal - Handshake Failure\n {bad_cert,hostname_check_failed}'}}}

iex> :ssl.connect(~c"gist.github.com", 443, verify: :verify_peer, cacertfile: CAStore.file_path(), customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)])
{:ok,
 {:sslsocket, {:gen_tcp, #Port<0.9>, :tls_connection, :undefined},
  [#PID<0.205.0>, #PID<0.204.0>]}}
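
Applied to the BroadwayKafka config from the question, the same options might look like the sketch below. It is untested against Upstash, and the hostname, credentials, group, and topic are just the placeholders from the question. Two assumptions to note: it uses verify: (the key from the iex session above) rather than verify_type:, and it passes server_name_indication as a charlist, since Erlang's ssl expects a charlist hostname there and the {:options, {:server_name_indication, "..."}} error in the question looks like a binary being rejected. Setting SNI explicitly may not be needed at all.

```elixir
# Sketch only: placeholders taken from the question, not verified against Upstash.
host = "project-name.upstash.io"

ssl_opts = [
  cacertfile: CAStore.file_path(),
  # Erlang's ssl option key is :verify
  verify: :verify_peer,
  # Must be a charlist (or :disable), not an Elixir binary string
  server_name_indication: String.to_charlist(host),
  # Use HTTPS-style hostname matching so wildcard certificates are accepted
  customize_hostname_check: [
    match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
  ]
]

{BroadwayKafka.Producer,
 [
   hosts: [{host, 9092}],
   group_id: "group_1",
   topics: ["events"],
   client_config: [
     sasl: {:scram_sha_256, "username", "password"},
     ssl: ssl_opts
   ]
 ]}
```

If the handshake still fails, dropping server_name_indication first and then customize_hostname_check should show which of the two options actually matters for this cluster.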

More info: Erlang standard library: ssl | EEF Security WG

Thank you so much for the detailed answer, and for taking the time to test it yourself.