Hi all!
Related to this article , what I want to do is to process a text file using Elixir Streams. Now, most of the time is useful to get a stream of lines rather than chunks of text. So I’ve tried different solutions to do what actually IO.stream(iodevice, :line)
File I’ve used in the example
To do the experiments below I’ve used this file: (125mb) https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers.txt which is a list of integers, one per line. (30M lines)
Opening the file without the :line option, the enum we get is a stream of chunks, something like
and I want to transform it to a list of lines using Elixir Streams.
0. File.stream
with :line option: performance I want to reach
streams the line for me
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
|> Enum.sum()
On my computer this takes 16 seconds with almost no memory impact. This is the time I want to reach.
1. Using Stream.transform and regex (slowest and strange memory spikes)
File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",fn chunk, acc ->
[last_line | lines] =
Regex.split(~r/(?<=\n)/, acc <> chunk)
|> Enum.reverse()
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
|> Enum.sum()
The reason why I split and reverse extracting the last_element from the list, is because the last element could be part of the next chunk. Using the example at the beginning
The last line of the first chunk is part of the first line of the second chunk,
So what I do is to split the first chunk obtaining
and accumulating "111"
which is then concatenated to the next chunk
"111" <> "23\n1000"
obtaining ["11123\n",1000"]
This unfortunately is quite slow and what it makes it slow is the regular expression: 70 seconds.
The advantage is that \n
is preserved like the behaviour of File.stream!
2. Going through the whole binary recursively
I just go through the whole binary creating a list of lines.
def next_line(chunk,current_line\\""), do: next_line(chunk,current_line,[])
def next_line(<<"\n"::utf8, rest::binary>>,current_line,lines) do
next_line(rest,"",[current_line | lines])
def next_line(<<c::utf8, rest::binary>>,current_line,lines) do
next_line(rest,<<current_line::binary, c::utf8>>,lines)
def next_line(<<>>,current_line,lines), do: {Enum.reverse(lines), current_line}
And using this function to emit the lines chunk by chunk
File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",&next_line/2)
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
|> Enum.sum()
This is much faster, but still not as fast as using File.stream!
with the :line
option: 21 seconds
3. Interesting case: String.split
Now an interesting case. If I don’t care about that the \n
is preserved, I can use String.split(chunk,"\n")
So, we just need to change the regex from the first to String.split
File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",fn chunk, acc ->
[last_line | lines] =
acc <> chunk
|> String.split("\n")
|> Enum.reverse()
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
|> Enum.sum()
This runs at just 8 seconds.
Why the 3 is so much faster? And Why the recursive one (number 2) can’t reach run at the same time of the target example (number 0) ?
Is there a better way to do this?