Best way to send a message to a custom Broadway producer?

I have a custom Broadway producer that fetches from a queue. As new messages/jobs are put in the queue, I’d like to notify the producer so it can fetch enough jobs to satisfy its latent demand and deliver them to the processors.

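defmodule MyCustomProducer do
  ...
  # Sent by JobQueue.enqueue/1 whenever a new job is available.
  @impl true
  def handle_cast(:job_enqueued, state) do
    # Must return a GenStage reply such as {:noreply, messages, state}.
    handle_receive_messages(state)
  end
end

defmodule JobQueue do
  ...
  def enqueue(job) do
    # put in queue
    # get pid of producer?
    GenStage.cast(some_pid, :job_enqueued)
  end
end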

I figured the most straightforward way would be to cast to the producer pid, notifying it to fetch messages, but I’m not sure how to find that pid given that the producer is managed/wrapped by Broadway. Should I be using something like Broadway.producer_names/1 or Broadway.push_messages/2 in this use case?

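This ended up working. Broadway.producer_names/1 returns the registered names of the pipeline's producer stages, so I pick one at random and cast to it: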

producer_name = Broadway.producer_names(MyBroadwayPipeline) |> Enum.random()
GenStage.cast(producer_name, :job_enqueued)
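For anyone wondering what "latent demand" means on the producer side, here is a rough sketch of the bookkeeping, written as plain functions so it runs without Broadway or GenStage. DemandSketch, new/0 and handle_job_enqueued/2 are made-up names for illustration, and keeping the jobs inside the producer state is an assumption made only to keep the example self-contained (in my setup the jobs live in a separate queue). The return values mirror the {:noreply, events, state} shape that GenStage callbacks use.

```elixir
defmodule DemandSketch do
  # Hypothetical producer state: queued jobs plus the demand that
  # consumers have requested but that has not been served yet.
  def new, do: %{jobs: :queue.new(), pending_demand: 0}

  # Analogous to GenStage's handle_demand/2: remember the new demand,
  # then serve as much of it as the queue allows.
  def handle_demand(incoming, state) when incoming > 0 do
    dispatch(%{state | pending_demand: state.pending_demand + incoming})
  end

  # Analogous to handle_cast(:job_enqueued, state): a job arrived, so
  # try to serve any demand that is still outstanding.
  def handle_job_enqueued(job, state) do
    dispatch(%{state | jobs: :queue.in(job, state.jobs)})
  end

  # Emit at most pending_demand jobs and keep the remainder of both.
  defp dispatch(%{jobs: jobs, pending_demand: demand} = state) do
    {events, rest} = take(jobs, demand, [])
    new_state = %{state | jobs: rest, pending_demand: demand - length(events)}
    {:noreply, events, new_state}
  end

  # Pop up to n jobs from the queue, preserving FIFO order.
  defp take(jobs, 0, acc), do: {Enum.reverse(acc), jobs}

  defp take(jobs, n, acc) do
    case :queue.out(jobs) do
      {{:value, job}, rest} -> take(rest, n - 1, [job | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end
```

With this shape, demand that arrives while the queue is empty is not lost: it stays in pending_demand and is served as soon as a :job_enqueued notification comes in, which is why the cast above is enough to wake the producer up.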