GenStage issue: all consumers are getting events, despite use of DemandDispatcher

I’m trying out GenStage and having a problem: it appears that my :producer is acting according to the BroadcastDispatcher model.

I set up a simple :producer that sends lines of a file to the :consumers to be “processed” (which just sleeps a random number of milliseconds and prints the event). There are three :consumers running.

What I expected was that the events would be distributed evenly among the three :consumers. What I’m actually seeing is that all three of my :consumers receive every event my :producer emits.

My understanding is that :producers default to DemandDispatcher, so each event should be handled by only one of my three :consumers.

Does anyone have enough experience with GenStage to help me out? Thanks!

I run the following code with `mix run -e Runner.main --no-halt`.

alias Experimental.GenStage

defmodule Runner do
    # Writes the numbers 1..100 (one per line) to /tmp/things.txt, starts a
    # Runner.Source producer over that file, then starts three Runner.Task
    # consumers and subscribes each one to the producer.
    #
    # File.write!/2 raises if the file cannot be written, and every
    # subscription is matched against {:ok, _}. A setup failure therefore
    # crashes here instead of being silently ignored. On success the return
    # value is the last sync_subscribe/2 result, as before.
    def main do
        File.write!( "/tmp/things.txt", Enum.join( 1..100, "\n" ) )

        { :ok, producer } = GenStage.start_link( Runner.Source, "/tmp/things.txt" )

        { :ok, c1 } = GenStage.start_link( Runner.Task, :ok )
        { :ok, c2 } = GenStage.start_link( Runner.Task, :ok )
        { :ok, c3 } = GenStage.start_link( Runner.Task, :ok )

        { :ok, _ } = GenStage.sync_subscribe( c1, to: producer )
        { :ok, _ } = GenStage.sync_subscribe( c2, to: producer )
        { :ok, _ } = GenStage.sync_subscribe( c3, to: producer )
    end
end

defmodule Runner.Task do
    # GenStage consumer that "processes" each batch of events by sleeping a
    # random 501..1000 ms and then printing the batch along with its own pid.
    use GenStage

    # Announces the worker and starts it as a :consumer. The state (:na) is
    # never read.
    def init( _ ) do
        IO.puts "Worker #{inspect self()} initialized"
        { :consumer, :na }
    end

    # Receives a batch of events from the producer, simulates work with a
    # random sleep, and logs the batch. Consumers emit nothing downstream, so
    # the reply's event list is [].
    def handle_events( events, _from, state ) do
        delay = random( 500 ) + 500
        :timer.sleep( delay )
        IO.puts( "Worker #{inspect self()} processing #{inspect events} took #{delay}ms" )
        { :noreply, [], state }
    end

    # Not called within this module; returns a placeholder result tuple for n.
    def my_function( n ) do
        { :ok, "{results for #{n}}" }
    end

    # Returns a uniformly distributed integer in 1..n.
    #
    # :rand seeds itself once per process. That avoids reseeding on every
    # call and avoids the deprecated :random module and :erlang.now/0.
    def random( n ) do
        :rand.uniform( n )
    end
end

defmodule Runner.Source do
    # Producer that emits lines of a file as events.
    #
    # NOTE(review): this is the version that exhibits the problem described
    # in the question. File.stream!/1 returns a lazy stream, and
    # handle_demand/2 hands that same stream back as the new state without
    # advancing it. Each demand therefore re-enumerates the file from its
    # first line.
    use GenStage

    # The producer's state is the lazy stream over `filename`.
    # DemandDispatcher is passed explicitly (it is also GenStage's default).
    def init( filename ) do
        stream = File.stream!( filename )
        { :producer, stream, dispatcher: GenStage.DemandDispatcher }
    end

    # Answers `demand` by taking up to `demand` lines from the stream.
    # Because `stream` is returned unchanged, the next call starts again at
    # the top of the file. Every consumer's demand is thus served with the
    # same leading lines.
    def handle_demand( demand, stream ) when demand > 0 do
        IO.puts "Demand is #{demand}"

        events = stream
                 |> Stream.take( demand )
                 |> Enum.to_list

        { :noreply, events, stream }
    end
end

My mix.exs file is:

defmodule Runner.Mixfile do
  use Mix.Project

  # Project metadata for the :runner application. The embedded-build and
  # permanent-start flags are switched on only when Mix.env is :prod.
  def project do
    in_prod = Mix.env() == :prod

    [
      app: :runner,
      version: "0.1.0",
      elixir: "~> 1.3",
      build_embedded: in_prod,
      start_permanent: in_prod,
      deps: deps()
    ]
  end

  # OTP application configuration: :logger and :gen_stage are listed as
  # applications to start before :runner.
  # Run "mix help compile.app" for the available options.
  def application do
    [applications: [:logger, :gen_stage]]
  end

  # Dependencies, given as {name, requirement} tuples for Hex packages or
  # with git:/path: options for other sources.
  # Run "mix help deps" for more examples.
  defp deps do
    [{:gen_stage, "~> 0.4.0"}]
  end
end
1 Like

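The issue is in the stream. A File.stream! is stateless: every time you enumerate it, it opens the file and streams its data from the beginning for as long as you keep consuming. Because handle_demand/2 returns the same, unconsumed stream as its new state, every demand reopens the file, reads the first `demand` lines, and returns them. As a result, every consumer receives the same lines from the top of the file.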

To fix it, you need to suspend the stream so it isn’t restarted over and over again. The good news is that GenStage.from_enumerable/2 does exactly that: it starts a producer stage on top of an enumerable or a stream.

You can check the source for more information on how it works: https://github.com/elixir-lang/gen_stage/blob/master/lib/gen_stage/streamer.ex. It is mostly the stream-suspension mechanism plus some code for tracking consumers (so the producer terminates once there are no more consumers).

6 Likes

Heh, figured that out about the Streams last night with the help of the Slack channel. But I had no idea about GenStage.from_enumerable/2. That’s sweet! Thanks much!

(Also, @josevalim, great work on Elixir et al. I’m loving it!)

3 Likes

For anyone who is curious about the outcome, here is code (using GenStage.from_enumerable/2) that works as intended (which is to say, a very simple worker pool of processes):

alias Experimental.GenStage

defmodule Runner do
    # Writes the numbers 1..100 (one per line) to /tmp/things.txt and wraps
    # the file stream in a producer via GenStage.from_enumerable/1. It then
    # starts ten Runner.Task consumers and subscribes each with
    # max_demand: 1, so each consumer has at most one event in flight.
    #
    # File.write!/2 raises if the file cannot be written, instead of the
    # error being silently ignored.
    # Returns the list of sync_subscribe/2 results, one per consumer.
    def main do
        File.write!( "/tmp/things.txt", Enum.join( 1..100, "\n" ) )

        { :ok, producer } = GenStage.from_enumerable( File.stream!( "/tmp/things.txt" ) )

        1..10
        |> Enum.map( fn _n -> GenStage.start_link( Runner.Task, :ok ) end )
        |> Enum.map( fn { :ok, c } -> GenStage.sync_subscribe( c, to: producer, max_demand: 1 ) end )
    end
end

defmodule Runner.Task do
    # GenStage consumer that "processes" each batch of events by sleeping a
    # random 2001..2500 ms and then printing the batch along with its own pid.
    use GenStage

    # Announces the worker and starts it as a :consumer. The state (:na) is
    # never read.
    def init( _ ) do
        IO.puts "Worker #{inspect self()} initialized"
        { :consumer, :na }
    end

    # Receives a batch of events from the producer, simulates work with a
    # random sleep, and logs the batch. Consumers emit nothing downstream, so
    # the reply's event list is [].
    def handle_events( events, _from, state ) do
        delay = random( 500 ) + 2_000
        :timer.sleep( delay )
        IO.puts( "Worker #{inspect self()} processing #{inspect events} took #{delay}ms" )
        { :noreply, [], state }
    end

    # Not called within this module; returns a placeholder result tuple for n.
    def my_function( n ) do
        { :ok, "{results for #{n}}" }
    end

    # Returns a uniformly distributed integer in 1..n.
    #
    # :rand seeds itself once per process. That avoids reseeding on every
    # call and avoids the deprecated :random module and :erlang.now/0.
    def random( n ) do
        :rand.uniform( n )
    end
end
1 Like
# Quoted form under discussion: each Enum.map step is wrapped in parentheses
# and its fn argument is passed without call parentheses.
1..10
|> ( Enum.map fn( _n ) -> GenStage.start_link( Runner.Task, :ok ) end )
|> ( Enum.map fn( { :ok, c } ) -> GenStage.sync_subscribe( c, to: producer, max_demand: 1 ) end )

would be more naturally written as

# Start ten Runner.Task consumers, then subscribe each one to `producer`
# with max_demand: 1.
1..10
|> Enum.map(fn _ -> GenStage.start_link(Runner.Task, :ok) end)
|> Enum.map(fn {:ok, consumer} -> GenStage.sync_subscribe(consumer, to: producer, max_demand: 1) end)

or

# On each of ten iterations, start a Runner.Task consumer and immediately
# subscribe it to `producer` with max_demand: 1.
for _i <- 1..10 do
  {:ok, consumer} = GenStage.start_link(Runner.Task, :ok)
  GenStage.sync_subscribe(consumer, to: producer, max_demand: 1)
end
4 Likes