Streaming lines from an enum of chunks

Hi all!

Related to this article , what I want to do is to process a text file using Elixir Streams. Now, most of the time is useful to get a stream of lines rather than chunks of text. So I’ve tried different solutions to do what actually IO.stream(iodevice, :line) does:

File I’ve used in the example
To do the experiments below I’ve used this file: (125mb) https://poeticoding-data.sfo2.digitaloceanspaces.com/httpstream/numbers.txt which is a list of integers, one per line. (30M lines)

Goal
Opening the file without the :line option, the enum we get is a stream of chunks, something like

["1231\n212\n1000\n","100\n212\n111","23\n1000","2\n102\n"]
and I want to transform it to a list of lines using Elixir Streams.

["1231\n","212\n","1000\n","100\n","212\n","11123\n","10002\n",102\n"]

0. File.stream with :line option: performance I want to reach

File.stream! streams the line for me

File.stream!("numbers.txt")
# below is the same is every case
|> Stream.map(fn line-> 
	{num,_} = Integer.parse(line)
	num
end)
|> Enum.sum()

On my computer this takes 16 seconds with almost no memory impact. This is the time I want to reach.

1. Using Stream.transform and regex (slowest and strange memory spikes)

File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",fn chunk, acc ->
	[last_line | lines] = 
		Regex.split(~r/(?<=\n)/, acc <> chunk)
		|> Enum.reverse()
	{Enum.reverse(lines),last_line}
end)
# below is the same is every case
|> Stream.map(fn line-> 
	{num,_} = Integer.parse(line)
	num
end)
|> Enum.sum()

The reason why I split and reverse extracting the last_element from the list, is because the last element could be part of the next chunk. Using the example at the beginning

"100\n212\n111","23\n1000"
The last line of the first chunk is part of the first line of the second chunk,
So what I do is to split the first chunk obtaining
["100\n","212\n"] and accumulating "111" which is then concatenated to the next chunk
"111" <> "23\n1000" obtaining ["11123\n",1000"].

This unfortunately is quite slow and what it makes it slow is the regular expression: 70 seconds.
The advantage is that \n is preserved like the behaviour of File.stream!

2. Going through the whole binary recursively

I just go through the whole binary creating a list of lines.

def next_line(chunk,current_line\\""), do: next_line(chunk,current_line,[])

def next_line(<<"\n"::utf8, rest::binary>>,current_line,lines) do
	next_line(rest,"",[current_line | lines])
end

def next_line(<<c::utf8, rest::binary>>,current_line,lines) do
	next_line(rest,<<current_line::binary, c::utf8>>,lines)
end

def next_line(<<>>,current_line,lines), do: {Enum.reverse(lines), current_line}

And using this function to emit the lines chunk by chunk

File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",&next_line/2)
# below is the same is every case
|> Stream.map(fn line-> 
	{num,_} = Integer.parse(line)
	num
end)
|> Enum.sum()

This is much faster, but still not as fast as using File.stream! with the :line option: 21 seconds

3. Interesting case: String.split

Now an interesting case. If I don’t care about that the \n is preserved, I can use String.split(chunk,"\n")
So, we just need to change the regex from the first to String.split

File.stream!("numbers.txt",[],2048) #chunks instead of lines
|> Stream.transform("",fn chunk, acc ->
	[last_line | lines] = 
      acc <> chunk
      |> String.split("\n")
      |> Enum.reverse()
	{Enum.reverse(lines),last_line}
end)
# below is the same is every case
|> Stream.map(fn line-> 
	{num,_} = Integer.parse(line)
	num
end)
|> Enum.sum()

This runs at just 8 seconds.

Questions

Why the 3 is so much faster? And Why the recursive one (number 2) can’t reach run at the same time of the target example (number 0) ?

Is there a better way to do this?

2 Likes

I’m not sure about the answer to your question but I would think that with some tweaking approach #2 would be the fastest. Have you checked what chunk size IO.stream uses internally with the :line option (assuming it is using chunking internally)?

Also do you have a sample test harness that could be used to experiment with these approaches?

I’ve created a gist with the code to easily replicate the benchmark: https://gist.github.com/alvises/c9fe2cfcac9f1528c5ba3dc3c8c27db9
(minimal mix.exs file, lines_bench.exs and readme with the url where to get the sample file)

I’m using a chunk size of 2048, which seems to be the default one. Increasing it makes the process slower and with higher memory allocation and decreasing it to a half doesn’t change the performance.

Using the small file (4.4Mb) with benchee I get

Name                     ips        average  deviation         median         99th %
3_string_split          3.84      260.21 ms     ±1.60%      258.60 ms      272.98 ms
0_lines                 2.04      489.40 ms     ±4.33%      488.38 ms      553.58 ms
2_recursive             1.46      685.88 ms     ±3.07%      689.55 ms      724.09 ms
1_regex_split           0.60     1673.97 ms     ±1.85%     1681.53 ms     1712.15 ms

Comparison:
3_string_split          3.84
0_lines                 2.04 - 1.88x slower +229.18 ms
2_recursive             1.46 - 2.64x slower +425.67 ms
1_regex_split           0.60 - 6.43x slower +1413.75 ms

So String.split is far away the fastest, which seems odd… what do you think?

you probably can save time in the double reverse time by:

{last_line, lines} =
      acc <> chunk
      |> String.split("\n")
      |> List.pop_at(-1)
1 Like

ah nice! Thanks @rhruiz! Is it faster than double reverse?

It will only traverse the list once

Looking at the code it seems that pop does something similar to a double reverse:
List.pop_at
List.do_pop_at

It checks the length at the beginning (L710), which is a slow operation with long lists, then recursively goes back accumulating a reversed list which is reversed at the end, which makes sense since lists are linked lists and we can’t just traverse the list removing the last item.

To just extract the last item it seems that the “double reverse” is much faster:

list = Enum.to_list 1..10_000

Benchee.run(%{
	"double reverse" => fn -> 
		[_popped_element| rev_list] = Enum.reverse(list)
		_poped_list = Enum.reverse(rev_list)
	end,

	"List.pop_at(-1)" => fn ->
		{_popped_element, _popped_list} = List.pop_at(list, -1)
	end
},
time: 10, 
memory_time: 2
)
Name                      ips        average  deviation         median         99th %
double reverse        19.00 K       52.63 μs    ±50.42%          61 μs         139 μs
List.pop_at(-1)        7.39 K      135.35 μs    ±26.58%         128 μs         273 μs

Comparison:
double reverse        19.00 K
List.pop_at(-1)        7.39 K - 2.57x slower +82.72 μs
4 Likes

I believe #3 is faster because under the hood String.split(binary, "\n") uses :binary.split/3 and per the docs:

Although the majority of functions could be provided using bit-syntax, the functions in this library are highly optimized and are expected to either execute faster or consume less memory, or both, than a counterpart written in pure Erlang.

1 Like