I'm experimenting with a basic GenStage pipeline that uses PartitionDispatcher.
Unfortunately, my consumers never receive any events.
GitHub repo with my code: https://github.com/somlor/gen_stage_partition_dispatcher
And here is a trimmed view of the relevant code:
# PRODUCER
# GenStage producer callback.
#
# Declares this stage a `:producer` and routes its events through
# `PartitionDispatcher`: the partition names come from `partitions_list/0`
# and each event is assigned to a partition by `partition_by_first_letter/1`.
def init(initial_state) do
  dispatcher_options = [
    # Explicit parentheses: a bare `partitions_list` is ambiguous with a
    # variable, and newer Elixir compilers warn about or reject it.
    partitions: partitions_list(),
    hash: &partition_by_first_letter/1
  ]

  {:producer, initial_state, dispatcher: {PartitionDispatcher, dispatcher_options}}
end
# Answers demand by emitting `demand` words, one `Faker.Lorem.word/0` call
# per requested event. The state is passed through untouched.
def handle_demand(demand, state) when demand > 0 do
  words = for _ <- 1..demand, do: Faker.Lorem.word()

  {:noreply, words, state}
end
# Names of the partitions handed to the dispatcher.
def partitions_list, do: ["one", "two", "three", "four"]
# Hash function for the dispatcher: returns `{event, partition}`.
#
# The partition is chosen from the value of `first_char_downcase(event)`:
# a-i -> "one", j-r -> "two", s-z -> "three", anything else -> "four".
def partition_by_first_letter(event) do
  letter = first_char_downcase(event)

  partition =
    cond do
      letter in ~w(a b c d e f g h i) -> "one"
      letter in ~w(j k l m n o p q r) -> "two"
      letter in ~w(s t u v w x y z) -> "three"
      true -> "four"
    end

  {event, partition}
end
# CONSUMER
# GenStage consumer callback: registers this stage as a `:consumer`
# and keeps the given state as-is.
def init(initial_state), do: {:consumer, initial_state}
# Prints a banner followed by the received batch of words, emits no events
# of its own and returns the state unchanged.
def handle_events(words, _from_producer, state) do
  banner = "=== received words ==="
  IO.puts(banner)
  IO.inspect(words)

  {:noreply, [], state}
end
# START
# Start one producer, then one consumer per partition name, subscribing
# each consumer to its own partition of the producer.
{:ok, producer} = Producer.start_link()

Enum.each(Producer.partitions_list(), fn partition ->
  {:ok, consumer} = Consumer.start_link()

  # Match on the result so a rejected subscription (e.g. bad options) fails
  # loudly here instead of silently leaving a consumer without events.
  {:ok, _subscription} =
    GenStage.sync_subscribe(consumer,
      to: producer,
      partition: partition,
      min_demand: 5,
      max_demand: 10
    )
end)
Any tips appreciated.
Thanks,
Sean