PartitionDispatcher Consumers not receiving events?

I'm playing around with getting a basic GenStage pipeline working with PartitionDispatcher.

Unfortunately my Consumers are never receiving any events.

GitHub repo with my code: https://github.com/somlor/gen_stage_partition_dispatcher

And here is a trimmed view of the relevant code:

  # PRODUCER

  def init(initial_state) do
    dispatcher_options = [
      partitions: partitions_list,
      hash: &partition_by_first_letter/1
    ]
    {:producer, initial_state, dispatcher: {PartitionDispatcher, dispatcher_options}}
  end

  def handle_demand(demand, state) when demand > 0 do
    words = Enum.map(1..demand, fn(_) -> Faker.Lorem.word end)
    {:noreply, words, state}
  end

  def partitions_list do
    ~w(one two three four)
  end

  def partition_by_first_letter(event) do
    partition = case first_char_downcase(event) do
      c when c in ~w(a b c d e f g h i) -> "one"
      c when c in ~w(j k l m n o p q r) -> "two"
      c when c in ~w(s t u v w x y z)   -> "three"
      _                                 -> "four"
    end
    {event, partition}
  end

  # CONSUMER

  def init(initial_state) do
    {:consumer, initial_state}
  end

  def handle_events(words, _from_producer, state) do
    IO.puts("=== received words ===")
    IO.inspect(words)

    {:noreply, [], state}
  end

  # START

  {:ok, producer} = Producer.start_link
  Producer.partitions_list
  |> Enum.each(fn(partition) ->
    {:ok, consumer} = Consumer.start_link
    GenStage.sync_subscribe(consumer,
      to: producer,
      partition: partition,
      min_demand: 5,
      max_demand: 10
    )
  end)

Any tips appreciated. :slight_smile:

Thanks,
Sean

Update: I’ve got it working now by using integers as the partition names instead of strings.

I think this line in the GenStage.PartitionDispatcher docs on current master threw me off (emphasis mine):

The partition dispatcher accepts the following options on initialization:

  • :partitions - an enumerable that sets the names of the partitions we will dispatch to.

Right. GenStage v0.5.0 expects partitions to be integers. We have generalized them to be any term on master.

Actually, ignore me. It seems you are using master. So it was supposed to work with strings too. I will investigate.

Done, this bug has been fixed in master. You can use strings as your partitions now, although I would recommend using atoms, as they are faster to compare.
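For anyone landing here later, this is a rough sketch of what the atom-based version of the hash function from the question could look like. The module name PartitionHash and the function names are made up for illustration, and the letter ranges are simply copied from the original post. It only assumes what the question already shows: that the :hash function returns an {event, partition} tuple. It has no GenStage dependency, so it can be tried in iex on its own.

```elixir
defmodule PartitionHash do
  # Hypothetical helper module, not part of GenStage.

  # Atoms instead of strings as partition names.
  @partitions [:one, :two, :three, :four]

  def partitions, do: @partitions

  # Returns {event, partition}, the same shape as the hash function in the question.
  def by_first_letter(event) when is_binary(event) do
    # String.first/1 returns nil for "", so to_string/1 turns that into "".
    first = event |> String.first() |> to_string() |> String.downcase()

    partition =
      case first do
        <<c>> when c in ?a..?i -> :one
        <<c>> when c in ?j..?r -> :two
        <<c>> when c in ?s..?z -> :three
        # Digits, punctuation, non-ASCII letters, and the empty string land here.
        _ -> :four
      end

    {event, partition}
  end
end

# Options in the same shape as the producer's init/1 in the question.
dispatcher_options = [
  partitions: PartitionHash.partitions(),
  hash: &PartitionHash.by_first_letter/1
]

IO.inspect(dispatcher_options[:hash].("Apple"))
```

Each consumer would then subscribe with partition: :one, partition: :two, and so on, instead of the string names.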

Fantastic! Thanks @josevalim. :sunglasses: