Broadway Cloud PubSub Producer

Hi,

I am using the Broadway Cloud PubSub producer and ran into a situation where it looked like the producer stopped polling GCP for new messages.

The Broadway process was still alive and there were no errors involving HTTP requests or subscriptions, but the messages continued to pile up for days in my pubsub queue.

I tried looking into the code for the producer and was wondering if this could be an issue. It looks like the Producer stops polling if the demand = the number of messages received from PubSub and the consumer never sends demand again…but I’m still trying to wrap my head around how/if this could happen. Does anyone have any knowledge of this? Here is the code I’m referring to:

defp handle_receive_messages(%{receive_timer: nil, demand: demand} = state) when demand > 0 do
    messages = receive_messages_from_pubsub(state, demand)
    new_demand = demand - length(messages)

    receive_timer =
      case {messages, new_demand} do
        {[], _} -> schedule_receive_messages(state.receive_interval)
        {_, 0} -> nil
        _ -> schedule_receive_messages(0)
      end

    {:noreply, messages, %{state | demand: new_demand, receive_timer: receive_timer}}
  end
1 Like