partition_by in Elixir Broadway

I'm trying to use the partition_by option in Broadway so that messages of the same partition go to the same processor. I have 20 processors, and the message data is an integer from 0 to 9.

    ....
          processors: [
            default: [
              max_demand: 50,
              concurrency: 20
            ]
          ],
          partition_by: &partition/1
        )
      end

      defp partition(msg) do
        msg.data
      end

      def handle_message(_processor, msg, _ctx) do
        Logger.info("pid #{inspect(self())}")
        ...
        msg
      end

What is weird is that when I added a log inside the handle_message callback to watch the processor PID, I always got the same PID for all incoming messages. When I remove the partition_by line, I see different processor PIDs. Any ideas why partitioning doesn't work?
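
To make the expectation concrete, here is a minimal sketch of the routing I would expect. It assumes the non-negative integer returned by partition_by is mapped onto a processor by taking the remainder against the processor concurrency. That mapping and the `PartitionSketch.expected_index/2` helper are assumptions for illustration only, not Broadway's API.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper, not part of Broadway. It assumes a partition key
  # is mapped to a processor index with rem/2 over the processor count.
  def expected_index(key, concurrency)
      when is_integer(key) and key >= 0 and concurrency > 0 do
    rem(key, concurrency)
  end
end

# Keys 0..9 spread over 20 processors, as in the configuration above.
indexes = Enum.map(0..9, &PartitionSketch.expected_index(&1, 20))

# Number of distinct processor indexes the ten keys map to.
IO.inspect(length(Enum.uniq(indexes)))
# prints 10
```

Under that assumption, ten distinct keys should reach ten distinct processors, so the log should show up to ten different PIDs rather than a single one.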
