I noticed that the disconnect happens 10 minutes after the connection is established. This matches connections.max.idle.ms (600000 ms, i.e. 10 minutes, by default on the broker),
so it looks like Kafka is closing idle connections. I know that some Kafka clients support reconnecting after an idle
disconnect. Does brod/broadway_kafka support it?
UPDATE:
The brod_client process is alive, BroadwayKafka.BrodClient.connected?/1
says that it is connected, and :brod.fetch/4
also returns messages. But the client state still contains a dead_since entry:
iex(admin@)32> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
#PID<0.31736.27>,
[
{:conn, {"kafka-0", 9092},
{:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
Admin.Kafka.Consumer.Broadway.Producer_0.Client}
iex(admin@7)33> BroadwayKafka.BrodClient.connected?(Admin.Kafka.Consumer.Broadway.Producer_0.Client)
true
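A quick way to pull the dead connections out of that state without reading the raw tuple is sketched below. It assumes brod_client's internal state layout matches the dump above (the payload connection list at tuple index 4, live connections holding a pid and dead ones holding {:dead_since, timestamp, reason}); this record is internal to brod, not a public API, and may differ between versions. BrodDebug is a hypothetical module name.

```elixir
defmodule BrodDebug do
  # Assumption: brod_client's internal state is a tuple tagged :state whose
  # element 4 is the list of payload connections, as in the
  # :sys.get_state/1 dump above. Not a public API; may change in brod.
  @payload_conns_index 4

  # Returns {endpoint, dead_since, reason} for every payload connection that
  # brod has marked as {:dead_since, timestamp, reason}.
  def dead_connections(client_state)
      when is_tuple(client_state) and elem(client_state, 0) == :state do
    client_state
    |> elem(@payload_conns_index)
    |> Enum.flat_map(fn
      {:conn, endpoint, {:dead_since, ts, reason}} ->
        [{endpoint, to_datetime(ts), reason}]

      # A live connection holds a pid instead of a :dead_since tuple.
      _live ->
        []
    end)
  end

  # Erlang timestamps are {MegaSecs, Secs, MicroSecs}.
  defp to_datetime({mega, secs, micro}) do
    DateTime.from_unix!((mega * 1_000_000 + secs) * 1_000_000 + micro, :microsecond)
  end
end
```

Usage in iex: BrodDebug.dead_connections(:sys.get_state(Admin.Kafka.Consumer.Broadway.Producer_0.Client)). For the state above it should report {"kafka-0", 9092} as dead since 2022-07-19 18:59:07.463027 UTC with reason {:shutdown, :tcp_closed}, even though connected?/1 returns true.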
It looks like the brod client reconnects, but the consumer doesn't rejoin.
I checked the Producer state and got a timeout on :sys.get_state,
because the producer is stuck in its handle_info callback, in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:
iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
current_function: {:brod_group_coordinator, :stop, 1},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 4,
links: [#PID<0.3880.0>],
dictionary: [
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{:"$initial_call", {GenStage, :init, 1}},
{48, []},
{:"$ancestors",
[Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
Admin.Supervisor, #PID<0.3569.0>]},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{39, []},
{38, []},
{37, []},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.3568.0>,
total_heap_size: 20338,
heap_size: 2586,
stack_size: 29,
reductions: 20656517,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 65535,
minor_gcs: 2858
],
suspending: []
]
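To check whether other producers are stuck in the same place, registered processes can be filtered by their current_function. This is a sketch that uses only the standard Process.registered/0, Process.whereis/1 and Process.info/2; StuckProcesses is a hypothetical helper, not part of brod or broadway_kafka.

```elixir
defmodule StuckProcesses do
  # Hypothetical helper: lists {registered_name, pid} for every registered
  # process whose current function is the given {module, function, arity}.
  # The default targets processes blocked in :brod_group_coordinator.stop/1.
  def find(mfa \\ {:brod_group_coordinator, :stop, 1}) do
    for name <- Process.registered(),
        # The process may exit between Process.registered/0 and whereis/1.
        pid when is_pid(pid) <- [Process.whereis(name)],
        # Process.info/2 returns nil if the process has already exited.
        {:current_function, ^mfa} <- [Process.info(pid, :current_function)],
        do: {name, pid}
  end
end
```

Running StuckProcesses.find() on the affected node should list Admin.Kafka.Consumer.Broadway.Producer_1 (and any other producer blocked the same way) while it is still waiting in :brod_group_coordinator.stop/1.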