Hello jkrmrto, thanks for answering so fast!
I`ve dived into some GenStage tutorials and ended up with something close for what I need, but I still have some issues.
This is my current code
# Producer
defmodule GenstageExample.Producer do
use GenStage
def start_link(initial) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(list), do: {:producer, list}
def handle_demand(_, state) do
{:noreply, state, state}
end
end
# Consumer
defmodule GenstageExample.Consumer do
use GenStage
# Starts consumer with his code
def start_link(inital_state) do
GenStage.start_link(__MODULE__, inital_state)
end
def init(state) do
# Next line should filter messages sent from producer
{:consumer, state,
subscribe_to: [
{GenstageExample.Producer, selector: fn %{key: key} -> state[:code] == key end}
]}
end
def handle_events(events, _from, state) do
# Prints every message received
for event <- events do
IO.inspect({self(), "Received: " <> event, "Expected: " <> state[:code]})
end
# As a consumer we never emit events
{:noreply, [], state}
end
end
And this is my application.exs
defmodule GenstageExample.Application do
use Application
@impl true
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
{GenstageExample.Producer, ["A", "B", "C"]},
# {GenstageExample.ProducerConsumer, []},
%{
id: 1,
start: {GenstageExample.Consumer, :start_link, [%{code: "A"}]}
},
%{
id: 2,
start: {GenstageExample.Consumer, :start_link, [%{code: "B"}]}
},
%{
id: 3,
start: {GenstageExample.Consumer, :start_link, [%{code: "C"}]}
}
]
opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
Supervisor.start_link(children, opts)
end
end
The problem is that the selector is not working, the consumer A stills receives messages B and C, and the same thing happens in B and C Consumers.
I know I can place an if-statement in handle_events function of Consumer, but I think it should be better using your suggestion(filtering with selector)
Could you please analyze my code and help me see where is the mistake?
This is my console output after running “mix run --no-halt”
{#PID<0.198.0>, "Received: A", "Expected: C"}
{#PID<0.197.0>, "Received: A", "Expected: B"}
{#PID<0.196.0>, "Received: A", "Expected: A"}
{#PID<0.198.0>, "Received: B", "Expected: C"}
{#PID<0.197.0>, "Received: B", "Expected: B"}
{#PID<0.196.0>, "Received: B", "Expected: A"}
{#PID<0.198.0>, "Received: C", "Expected: C"}
{#PID<0.197.0>, "Received: C", "Expected: B"}
{#PID<0.196.0>, "Received: C", "Expected: A"}