GenStage: 4 consumers, but only one seems to be doing work

I am testing out GenStage and messing around with this data.

Basically, the code goes through the file and takes the first byte of each line (just as a test). I have four consumers, and each consumer has its own log file. The problem is that only one log file gets written to after the first batch of demand.

Each consumer starts up and asks for 500 lines, which works fine, so all 4 log files are at 500 bytes. However, a new log file then appears out of nowhere, and it is the only one that seems to receive the rest of the events. That results in this:

-rw-r--r--  1 Harry  staff   1.8M  9 Dec 08:50 results-.log
-rw-r--r--  1 Harry  staff   500B  9 Dec 08:49 results-1.log
-rw-r--r--  1 Harry  staff   500B  9 Dec 08:49 results-2.log
-rw-r--r--  1 Harry  staff   500B  9 Dec 08:49 results-3.log
-rw-r--r--  1 Harry  staff   500B  9 Dec 08:49 results-4.log

The code is below:

alias Experimental.GenStage

defmodule Go do
  def go do
    {:ok, producer} = GenStage.from_enumerable(File.stream!("nasa.log"), demand: :forward)
    {:ok, consumer1} = GsConsumer.start_link("1")
    {:ok, consumer2} = GsConsumer.start_link("2")
    {:ok, consumer3} = GsConsumer.start_link("3")
    {:ok, consumer4} = GsConsumer.start_link("4")

    GenStage.sync_subscribe(consumer1, to: producer)
    GenStage.sync_subscribe(consumer2, to: producer)
    GenStage.sync_subscribe(consumer3, to: producer)
    GenStage.sync_subscribe(consumer4, to: producer)
  end

end

defmodule GsConsumer do
  use GenStage

  def start_link(num) do
    GenStage.start_link(__MODULE__, [num])
  end

  def init([num]) do
    {:consumer, num}
  end

  def handle_events(lines, _from, num) do
    File.open("results-#{num}.log", [:append], fn(file) ->
      lines
      |> Enum.map(fn(line) ->
        << head::binary-size(1), rest::binary>> = line
        head
      end)
      |> Enum.map(fn(x) ->
        IO.binwrite(file, x)
      end)
    end)
    {:noreply, [], nil}
  end
end

Figured this one out actually.

Turns out I was wrong: all 4 consumers were working. The problem was that I forgot to put the process num, which is used for the log file name, back into the state.
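That would also account for the stray results-.log file: once the state is nil, the interpolation in the file name produces an empty string. A quick check in plain Elixir, where name_for is just a hypothetical helper for illustration and not part of the consumer:

```elixir
# Hypothetical helper that mirrors the file name built in handle_events/3.
name_for = fn state -> "results-#{state}.log" end

IO.puts(name_for.("1")) # prints results-1.log
IO.puts(name_for.(nil)) # prints results-.log, since nil interpolates as ""
```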

  def handle_events(lines, _from, num) do
    File.open("results-#{num}.log", [:append], fn(file) ->
      lines
      |> Enum.map(fn(line) ->
        << head::binary-size(1), rest::binary>> = line
        head
      end)
      |> Enum.map(fn(x) ->
        IO.binwrite(file, x)
      end)
    end)
    {:noreply, [], nil}
  end

As you can see at the end there, I am setting the state to nil when I should be setting it to num.
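For reference, here is a minimal sketch of the callback with the state passed back. To keep it runnable in plain iex without the GenStage dependency, it lives in a hypothetical module called GsConsumerSketch and leaves out `use GenStage`; in the real consumer only the last line of handle_events needs to change. I also swapped Enum.map for Enum.each and renamed the unused rest to _rest, which is a style choice and not part of the fix.

```elixir
# Hypothetical stand-alone module: same callback shape as the consumer above,
# but without `use GenStage`, so it can be pasted into iex as-is.
defmodule GsConsumerSketch do
  def handle_events(lines, _from, num) do
    File.open!("results-#{num}.log", [:append], fn file ->
      # Write the first byte of every line to this consumer's log file.
      Enum.each(lines, fn <<head::binary-size(1), _rest::binary>> ->
        IO.binwrite(file, head)
      end)
    end)

    # Return num as the new state so the next batch uses the same file name.
    {:noreply, [], num}
  end
end

# Simulate two successive batches, threading the returned state by hand.
{:noreply, [], state} = GsConsumerSketch.handle_events(["abc\n", "def\n"], nil, "demo")
{:noreply, [], _state} = GsConsumerSketch.handle_events(["ghi\n"], nil, state)
# Both batches are appended to results-demo.log in the current directory.
```

Because the third element of the return tuple becomes the state for the next call, returning num keeps each consumer writing to its own file.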
