My understanding is that this is an upstash issue. I have a topic with 3 partitions but I only get assignment for 2 partitions. Below is my broadway code. Is there anything in the config that could be causing this missing partition assignment ?
defmodule Maverick.Broadway.Upstash do
@moduledoc """
The Upstash Kafka context.
"""
use Broadway
alias Maverick.Whatsapp.Message, as: WhatsappMessage
def upstash_kafka_port(), do: Application.fetch_env!(:maverick, :upstash_kafka_port)
def upstash_kafka_endpoint(), do: Application.fetch_env!(:maverick, :upstash_kafka_endpoint)
def upstash_kafka_hosts(), do: [{upstash_kafka_endpoint(), upstash_kafka_port()}]
def upstash_kafka_authentication(),
do:
{Application.fetch_env!(
:maverick,
:upstash_kafka_sasl_mechanism
), Application.fetch_env!(:maverick, :upstash_kafka_username),
Application.fetch_env!(:maverick, :upstash_kafka_password)}
def upstash_kafka_group_id(), do: "localhost-00"
def upstash_kafka_topic(), do: "localhost"
def producer_concurrency(), do: 2
def processors_concurrency(), do: 10
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayKafka.Producer,
[
hosts: upstash_kafka_hosts(),
group_id: upstash_kafka_group_id(),
topics: [upstash_kafka_topic()],
offset_reset_policy: :earliest,
client_id_prefix: "localhost-maverick",
client_config: [
sasl: upstash_kafka_authentication(),
ssl: [
# from CAStore package
cacertfile: CAStore.file_path(),
verify_type: :verify_peer,
customize_hostname_check: [
match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
]
]
]
]},
concurrency: producer_concurrency()
],
processors: [
default: [
concurrency: processors_concurrency()
]
]
)
end