I fixed it by updating the config as shown below. I got a lead from the question "How do I connect to Upstash Kafka with Broadway (ssl issues)".
defmodule Maverick.Broadway.Upstash do
  @moduledoc """
  Broadway pipeline that consumes the `"localhost"` topic of an Upstash Kafka
  cluster through `BroadwayKafka.Producer`.

  Connection settings are read from the `:broadway` application environment
  when the pipeline starts:

    * `:endpoint` and `:port` - address of the Kafka broker
    * `:sasl_mechanism`, `:username` and `:password` - SASL credentials

  The connection is made over TLS, using the CA bundle shipped with the
  `CAStore` package to verify the broker certificate.
  """

  use Broadway

  @upstash_kafka_group_id "localhost-00"
  @upstash_kafka_topic "localhost"
  @client_id_prefix "localhost-maverick"
  @producer_concurrency 1
  @processors_concurrency 10

  @doc """
  Starts the pipeline, registered under this module's name.

  The argument is ignored. Raises if any of the required keys is missing from
  the `:broadway` application environment.
  """
  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayKafka.Producer, producer_options()},
        concurrency: @producer_concurrency
      ],
      processors: [
        default: [
          concurrency: @processors_concurrency
        ]
      ]
    )
  end

  @doc """
  Prints the incoming message to standard output and returns it unchanged.
  """
  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message)
    message
  end

  # Options handed to `BroadwayKafka.Producer`.
  defp producer_options do
    [
      hosts: [{fetch_config!(:endpoint), fetch_config!(:port)}],
      group_id: @upstash_kafka_group_id,
      topics: [@upstash_kafka_topic],
      client_id_prefix: @client_id_prefix,
      client_config: [
        sasl: sasl_credentials(),
        ssl: ssl_options()
      ]
    ]
  end

  # SASL tuple in the `{mechanism, username, password}` form.
  # Deliberately never printed or logged: it contains the password.
  defp sasl_credentials do
    {
      fetch_config!(:sasl_mechanism),
      fetch_config!(:username),
      fetch_config!(:password)
    }
  end

  # TLS options for the broker connection.
  defp ssl_options do
    [
      # from CAStore package
      cacertfile: CAStore.file_path(),
      # `:verify` is the option key documented by Erlang's `:ssl`;
      # `verify_type` is only the name of the type of its value.
      verify: :verify_peer,
      customize_hostname_check: [
        match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
      ]
    ]
  end

  # Read at runtime so credentials are not baked into the compiled module and
  # can also be supplied from `config/runtime.exs`.
  defp fetch_config!(key), do: Application.fetch_env!(:broadway, key)
end