Can Brod get/set topic retention?

So far I hadn’t been able to figure out how to get or set topic retention values for kafka topics via brod, using versions up to 3.16.3. I can set it when creating a topic by passing name-value pairs when creating topic.

        config = [
          %{name: "retention.ms", value: "-1"},
          %{name: "retention.bytes", value: "-1"}
        ]

Is this even possible via Brod, or one must install Java and kafka client libraries and invoke kafka-configs.sh or kafka_topics.sh to do it? It seems like a huge limitation. For example, I can’t drop and re-create a topic with same retention because I don’t have all of the metadata.

However, I’m looking at https://hexdocs.pm/brod/3.16.3/brod_client.html#get_metadata/2 and it should return

struct()

struct() = #{field_name() => field_value()} | [{field_name(), field_value()}]

but I’m getting a different struct…

I must be missing something.

What are you receiving as metadata then? Is the field just missing or are you not receiving {ok, kpro:struct()} | {error, any()}?

I’m getting

%{
	brokers: [
		%{host: "localhost", node_id: 0, port: 9092, rack: ""}
	],
	cluster_id: "blah", 
	controller_id: 0,
	topics: [
		%{
			error_code: :no_error, 
			is_internal: false, 
			name: "test_topic_creation", 
			partitions: [
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 0, replica_nodes: [0]},
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 5, replica_nodes: [0]},
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 4, replica_nodes: [0]},
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 1, replica_nodes: [0]},
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 6, replica_nodes: [0]},
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 2, replica_nodes: [0]},
				%{error_code: :no_error, isr_nodes: [0], leader_id: 0, partition_index: 3, replica_nodes: [0]}
			]
		}
	]
}

So it does indeed return {ok, #{field_name() => field_value()}} but yeah, sorry, I don’t know another way to get that info. You may look into the kafka_protocol application (kpro modules and friends) but you may have to encode and decode the request yourself.

1 Like

This looks promising, but I need help translating erlang to Elixir.

API spec: kafka_protocol/kpro_req_lib.erl at 7147d6e16652b9b1c39758250bdf93ee934ec706 · kafka4beam/kafka_protocol · GitHub

struct():

code:

@version 0
@resource_type_topic 2 # I could not find the values in neither code nor documentation (Just INT8 in https://kafka.apache.org/protocol.html#The_Messages_DescribeConfigs), so I got into online Java IDE and did System.out.println(org.apache.kafka.common.resource.ResourceType.TOPIC.ordinal()); "2" came out.

IO.inspect(:kpro_req_lib.describe_configs(@version, [%{resource_type: @resource_type_topic}, %{resource_name: @test_topic}], %{validate_only: true}))

output:

** (ErlangError) Erlang error: {:field_missing, [stack: [{:describe_configs, 0}, :resources, :resource_name], input: %{resource_type: 2}]}
   stacktrace:
    (kafka_protocol 4.0.3) /Users/maximsenin/workspace/cogynt-common/kafka/deps/kafka_protocol/src/kpro_req_lib.erl:459: :kpro_req_lib.enc_struct/3

as the next step, I want to try to use kafka_protocol/kpro_req_lib.erl at 7147d6e16652b9b1c39758250bdf93ee934ec706 · kafka4beam/kafka_protocol · GitHub

Also, I don’t understand how to pass in the broker or zk information here - how does it get connected? Can’t find examples anywhere…

I solved it using kafka_protocol library, although it was quite a brain tease to figure out how to call Erlang from Elixir, by wrapping :kpro_req_lib.describe_configs, :kpro_brokers.with_connection, :brod_utils.request_sync and :kpro_req_lib.alter_configs APIs. It was very helpful to go through the source code for the unit tests included with kafka_protocol library.

1 Like