I’m trying out GenStage and having a problem: it appears that my :producer is acting according to the BroadcastDispatcher model.
I set up a simple :producer that sends lines of a file to the :consumers to be “processed” (which just sleeps a random number of milliseconds and prints the event). There are three :consumers running.
What I expected to happen was the the events are evenly processed among the three :consumers. What I’m actually seeing is all three of my :consumers are receiving all the events that my :producer is emitting.
My understanding is that the :producers default to DemandDispatcher, so my events should only be handled by one of my three :consumers.
Anyone have any experience with GenStage enough to help me out? Thanks!
I run the following code with mix run -e Runner.main --no-halt
alias Experimental.GenStage
defmodule Runner do
def main do
{ :ok, file } = File.open "/tmp/things.txt", [:write]
IO.binwrite file, ( Enum.to_list( 1..100 ) |> Enum.join( "\n" ) )
File.close file
{ :ok, producer } = GenStage.start_link( Runner.Source, "/tmp/things.txt" )
{ :ok, c1 } = GenStage.start_link( Runner.Task, :ok )
{ :ok, c2 } = GenStage.start_link( Runner.Task, :ok )
{ :ok, c3 } = GenStage.start_link( Runner.Task, :ok )
GenStage.sync_subscribe( c1, to: producer )
GenStage.sync_subscribe( c2, to: producer )
GenStage.sync_subscribe( c3, to: producer )
end
end
defmodule Runner.Task do
use GenStage
def init( _ ) do
IO.puts "Worker #{inspect self} initialized"
{ :consumer, :na }
end
def handle_events( events, _from, state ) do
delay = random( 500 ) + 500
:timer.sleep( delay )
IO.puts( "Worker #{inspect self} processing #{inspect events} took #{delay}ms" )
{ :noreply, [], state }
end
def my_function( n ) do
{ :ok, "{results for #{n}}" }
end
def random( n ) do
:random.seed( :erlang.now )
:random.uniform( n )
end
end
defmodule Runner.Source do
use GenStage
def init( filename ) do
stream = File.stream!( filename )
{ :producer, stream, dispatcher: GenStage.DemandDispatcher }
end
def handle_demand( demand, stream ) when demand > 0 do
IO.puts "Demand is #{demand}"
events = stream
|> Stream.take( demand )
|> Enum.to_list
{ :noreply, events, stream }
end
end
mix.exs file is
defmodule Runner.Mixfile do
use Mix.Project
def project do
[app: :runner,
version: "0.1.0",
elixir: "~> 1.3",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps()]
end
# Configuration for the OTP application
#
# Type "mix help compile.app" for more information
def application do
[applications: [:logger, :gen_stage]]
end
# Dependencies can be Hex packages:
#
# {:mydep, "~> 0.3.0"}
#
# Or git/path repositories:
#
# {:mydep, git: "https://github.com/elixir-lang/mydep.git", tag: "0.1.0"}
#
# Type "mix help deps" for more examples and options
defp deps do
[
{:gen_stage, "~> 0.4.0"}
]
end
end