I am testing out GenStage and messing around with this data
Basically, the file is streamed through, and one byte is taken from each line (just as a test). I have four consumers, and each consumer has its own log file. The problem is that only one log file gets written to after the first round of demand.
So basically each consumer starts up and asks for 500 lines, which works fine: all 4 log files end up at 500 bytes. However, then a new log file appears out of nowhere, and that is the only one that seems to receive the rest of the demand. That results in this:
-rw-r--r-- 1 Harry staff 1.8M 9 Dec 08:50 results-.log
-rw-r--r-- 1 Harry staff 500B 9 Dec 08:49 results-1.log
-rw-r--r-- 1 Harry staff 500B 9 Dec 08:49 results-2.log
-rw-r--r-- 1 Harry staff 500B 9 Dec 08:49 results-3.log
-rw-r--r-- 1 Harry staff 500B 9 Dec 08:49 results-4.log
The code is below:
alias Experimental.GenStage
defmodule Go do
  def go do
    {:ok, producer} = GenStage.from_enumerable(File.stream!("nasa.log"), demand: :forward)

    {:ok, consumer1} = GsConsumer.start_link("1")
    {:ok, consumer2} = GsConsumer.start_link("2")
    {:ok, consumer3} = GsConsumer.start_link("3")
    {:ok, consumer4} = GsConsumer.start_link("4")

    GenStage.sync_subscribe(consumer1, to: producer)
    GenStage.sync_subscribe(consumer2, to: producer)
    GenStage.sync_subscribe(consumer3, to: producer)
    GenStage.sync_subscribe(consumer4, to: producer)
  end
end
defmodule GsConsumer do
  use GenStage

  def start_link(num) do
    GenStage.start_link(__MODULE__, [num])
  end

  def init([num]) do
    # The consumer's number is kept as the stage's state
    {:consumer, num}
  end

  def handle_events(lines, _from, num) do
    File.open("results-#{num}.log", [:append], fn file ->
      lines
      |> Enum.map(fn line ->
        # Take the first byte of each line
        <<head::binary-size(1), _rest::binary>> = line
        head
      end)
      |> Enum.each(fn x -> IO.binwrite(file, x) end)
    end)

    # No events to emit; the third element becomes the new state
    {:noreply, [], nil}
  end
end
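For what it's worth, the one-byte extraction itself behaves as expected when I try it on its own in iex:

```elixir
# Binary pattern match: grab the first byte of a line, ignore the rest
<<head::binary-size(1), _rest::binary>> = "hello\n"
head
# => "h"
```

So the pattern match doesn't seem to be the problem; it looks like something to do with how the consumers handle demand after the first batch.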