Hi all!
Related to this article, what I want to do is process a text file using Elixir Streams. Most of the time it's useful to get a stream of lines rather than chunks of text, so I've tried different solutions to reproduce what IO.stream(iodevice, :line) actually does:
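For reference, here's a minimal sketch of the built-in behaviour I want to reproduce (assuming the file is opened read-only; the output comment is illustrative):

File.open!("numbers.txt", [:read], fn device ->
  device
  |> IO.stream(:line)   # streams one line at a time, trailing \n included
  |> Enum.take(3)
end)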
File I’ve used in the example
To run the experiments below I've used this file (125MB): https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers.txt, which is a list of integers, one per line (30M lines).
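In case the link goes away, a quick sketch to regenerate a similar file locally (the 1..10_000 range is my assumption; any integers work):

File.open!("numbers.txt", [:write], fn file ->
  # 30M random integers, one per line
  Enum.each(1..30_000_000, fn _ ->
    IO.write(file, [Integer.to_string(:rand.uniform(10_000)), "\n"])
  end)
end)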
Goal
Opening the file without the :line option, the enum we get is a stream of chunks, something like
["1231\n212\n1000\n","100\n212\n111","23\n1000","2\n102\n"]
and I want to transform it into a stream of lines using Elixir Streams:
["1231\n","212\n","1000\n","100\n","212\n","11123\n","10002\n",102\n"]
0. File.stream! with the :line option: the performance I want to reach
File.stream! streams the lines for me:
File.stream!("numbers.txt")
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
num
end)
|> Enum.sum()
On my computer this takes 16 seconds with almost no memory impact. This is the time I want to reach.
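All the timings in this post are rough numbers, measured along these lines (a sketch using :timer.tc, which returns microseconds):

{time_us, sum} =
  :timer.tc(fn ->
    File.stream!("numbers.txt")
    |> Stream.map(fn line ->
      {num, _} = Integer.parse(line)
      num
    end)
    |> Enum.sum()
  end)

IO.puts("sum: #{sum}, time: #{div(time_us, 1_000_000)}s")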
1. Using Stream.transform and a regex (slowest, with strange memory spikes)
File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",fn chunk, acc ->
[last_line | lines] =
Regex.split(~r/(?<=\n)/, acc <> chunk)
|> Enum.reverse()
{Enum.reverse(lines),last_line}
end)
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
num
end)
|> Enum.sum()
The reason why I split, reverse, and extract the last_line from the list is that the last element could be the beginning of a line that continues in the next chunk. Using the example at the beginning,
"100\n212\n111", "23\n1000"
the last line of the first chunk is part of the first line of the second chunk. So what I do is split the first chunk, obtaining
["100\n", "212\n"]
and accumulate "111", which is then concatenated to the next chunk,
"111" <> "23\n1000"
obtaining ["11123\n", "1000"].
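To make the splitting behaviour concrete, this is what the lookbehind regex does on the chunks above (note the trailing empty string when a chunk ends with a newline, which then becomes an empty accumulator):

iex> Regex.split(~r/(?<=\n)/, "100\n212\n111")
["100\n", "212\n", "111"]
iex> Regex.split(~r/(?<=\n)/, "2\n102\n")
["2\n", "102\n", ""]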
Unfortunately this is quite slow, and what makes it slow is the regular expression: 70 seconds.
The advantage is that the \n is preserved, just like the behaviour of File.stream!.
2. Going through the whole binary recursively
I just go through the whole binary creating a list of lines.
def next_line(chunk, current_line \\ ""), do: next_line(chunk, current_line, [])

# a newline ends the current line: store it and start a new one
def next_line(<<"\n"::utf8, rest::binary>>, current_line, lines) do
  next_line(rest, "", [current_line | lines])
end

# any other character is appended to the current line
def next_line(<<c::utf8, rest::binary>>, current_line, lines) do
  next_line(rest, <<current_line::binary, c::utf8>>, lines)
end

# end of chunk: return the complete lines and the partial one as accumulator
def next_line(<<>>, current_line, lines), do: {Enum.reverse(lines), current_line}
And using this function to emit the lines chunk by chunk
File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",&next_line/2)
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
num
end)
|> Enum.sum()
This is much faster, but still not as fast as File.stream! with the :line option: 21 seconds.
3. Interesting case: String.split
Now an interesting case. If I don't care about the \n being preserved, I can use String.split(chunk, "\n"). So we just need to replace the regex from the first solution with String.split:
File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",fn chunk, acc ->
[last_line | lines] =
acc <> chunk
|> String.split("\n")
|> Enum.reverse()
{Enum.reverse(lines),last_line}
end)
# below is the same is every case
|> Stream.map(fn line->
{num,_} = Integer.parse(line)
num
end)
|> Enum.sum()
This runs in just 8 seconds.
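For comparison with the regex version: String.split drops the separator, and still yields a trailing empty string when a chunk ends with a newline:

iex> String.split("100\n212\n111", "\n")
["100", "212", "111"]
iex> String.split("2\n102\n", "\n")
["2", "102", ""]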
Questions
Why is number 3 so much faster? And why can't the recursive one (number 2) run in the same time as the target example (number 0)?
Is there a better way to do this?