I am trying to connect to Upstash Kafka using Broadway, and I get the following error:
[info] Group member (localhost-00,coor=#PID<0.531.0>,cb=#PID<0.528.0>,generation=0):
failed to join group
reason: {:client_down,
My Broadway code is below:
defmodule Maverick.Broadway.Upstash do
  @moduledoc """
  Broadway pipeline that consumes messages from an Upstash Kafka topic.

  Connection settings (`:endpoint`, `:port`, `:sasl_mechanism`, `:username`
  and `:password`) are read from the `:broadway` application environment with
  `Application.compile_env!/2`, so they are fixed when this module is compiled.
  """

  use Broadway

  @upstash_kafka_port Application.compile_env!(:broadway, :port)
  @upstash_kafka_endpoint Application.compile_env!(:broadway, :endpoint)
  @upstash_kafka_hosts [{@upstash_kafka_endpoint, @upstash_kafka_port}]

  # `{mechanism, username, password}` tuple handed to the Kafka client.
  @upstash_kafka_authentication {Application.compile_env!(:broadway, :sasl_mechanism),
                                 Application.compile_env!(:broadway, :username),
                                 Application.compile_env!(:broadway, :password)}

  @upstash_kafka_group_id "localhost-00"
  @upstash_kafka_topic "localhost"
  @producer_concurrency 1
  @processors_concurrency 10

  @doc """
  Starts the Broadway pipeline under the name `#{inspect(__MODULE__)}`.

  The argument is ignored; it exists so the module can be listed as a child
  in a supervision tree. Returns whatever `Broadway.start_link/2` returns.
  """
  def start_link(_opts) do
    # Only the broker address is printed. The SASL tuple contains the
    # password and must not be written to stdout or logs.
    IO.inspect(@upstash_kafka_hosts)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: @upstash_kafka_hosts,
             group_id: @upstash_kafka_group_id,
             topics: [@upstash_kafka_topic],
             # Client-level settings must be nested under `:client_config`.
             # Passed as top-level producer options they do not reach the
             # Kafka client, so the connection is attempted without TLS/SASL
             # and the group member fails to join with `:client_down`.
             client_config: [
               client_id_prefix: "localhost-maverick",
               sasl: @upstash_kafka_authentication,
               # NOTE(review): on newer OTP releases `ssl: true` may need to be
               # replaced by an explicit TLS option list (e.g. CA certificates)
               # — confirm against the brod/OTP documentation.
               ssl: true
             ]
           ]},
        concurrency: @producer_concurrency
      ],
      processors: [
        default: [
          concurrency: @processors_concurrency
        ]
      ]
    )
  end

  @doc """
  Prints each incoming message and returns it unchanged.
  """
  @impl true
  def handle_message(_processor, message, _context) do
    # `IO.inspect/1` returns its argument, so the message is passed through as is.
    IO.inspect(message)
  end
end
Any help would be appreciated.