Flow processing long lines file

Hello,

I’m playing with Flow, using a straight copy and paste of the example code from the project’s GitHub README. Everything works well except when I process a 1.2 GB text file. The file doesn’t have too many lines, but each line is longer than 15 KB.

The execution takes forever. Observer shows millions of reductions and just a few messages in the queues of the GenStage processes, so I guess the problem has to do with the long lines in the file.

This is not a problem for me, actually it’s just a test, but I’d like to know what is happening.

Thank you very much.

Juan Miguel

Maybe my question is too vague, so I will try to add some more information.

This is the code from the GitHub project’s README that I’m using.

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

And this is some Go code for comparison, which completes the same task quite fast.

https://play.golang.org/p/jxep8DfnN1

As mentioned in the previous post, the only problem I can see is related to the long string management in the Erlang VM, but I’d like to know what you think about it.

Please, let me know if I can add any further information.

Thanks.

Juan Miguel

You have the option to stream files by a set number of bytes instead of per line. Other than that, I’m not sure what you can do to improve this.
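As a rough illustration of the byte-based option, here is a minimal sketch. The temp file name and the 64 KiB chunk size are made up for the example, not taken from the original post. Note that a fixed-size chunk can end in the middle of a word, so this alone is not enough for word counting.

```elixir
# Hypothetical sample file: a single 100_000-byte line with no newline.
path = Path.join(System.tmp_dir!(), "long_line_sample.txt")
File.write!(path, String.duplicate("word ", 20_000))

# The third argument is a chunk size in bytes instead of the default :line.
# (Since Elixir 1.16 the preferred argument order is File.stream!(path, 65_536);
# the form below also works on older versions.)
chunk_sizes =
  path
  |> File.stream!([], 65_536)
  |> Enum.map(&byte_size/1)

# Each element is the size of one chunk handed downstream.
IO.inspect(chunk_sizes, label: "chunk sizes")
```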

You probably want to alter the default max_demand setting. By default it’s going to process lines in chunks of 2000 lines at a time. For very large lines this may not be what you want.
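A minimal sketch of lowering the demand, assuming Flow is pulled in with Mix.install. The module name WordCount and the value 10 are illustrative only, not a tuned recommendation. It also uses String.split/1, which splits on any whitespace, instead of the README's split on a single space.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule WordCount do
  # max_demand caps how many events (here: lines) a stage asks for at once.
  # The default of 10 below is an illustrative guess for very long lines.
  def run(path, max_demand \\ 10) do
    path
    |> File.stream!()
    |> Flow.from_enumerable(max_demand: max_demand)
    |> Flow.flat_map(&String.split/1)
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    # Each word lands in exactly one partition, so merging into a map is safe.
    |> Map.new()
  end
end

# Hypothetical sample input.
sample = Path.join(System.tmp_dir!(), "flow_max_demand_sample.txt")
File.write!(sample, "a b a\nb a\n")
IO.inspect(WordCount.run(sample))
```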

Can we confirm that this is a limit of Flow’s possibilities?

The limit here has very little to do with Flow, and more to do with File.stream!. File.stream! will read either by bytes or by line, and a line may be arbitrarily large and not actually the unit you want to deal with.

Secondly, the comparison with the Go code is not like for like. As I already said, the Elixir example is reading in and processing 2000 lines at a time. It’s also gonna have a default concurrency of System.schedulers_online(), which is a concurrency of 8, not Go’s 32.
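To make the concurrency explicit rather than relying on the default, the :stages option can be passed to both the mapping and the partitioning step. This is only a sketch: the in-memory input is made up, and 32 simply mirrors the figure quoted for the Go program, with no claim that it is the right value for any given machine.

```elixir
Mix.install([{:flow, "~> 1.2"}])

# What the default stage count would be on this machine.
IO.inspect(System.schedulers_online(), label: "schedulers online")

# Illustrative value only, mirroring the Go figure from the thread.
stages = 32

counts =
  ["a b a", "b a"]
  |> Flow.from_enumerable(stages: stages)
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition(stages: stages)
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Map.new()

IO.inspect(counts)
```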

I don’t think it is a limit, since Flow is configurable in that regard.

The problem of parsing an input stream is however a hard one, and what settings work best is very dependent on your actual input.

For many applications, it is very sensible to split up information based on newlines. But when you have very long lines, you might rather want to split based on another character (e.g. the , for CSV-like data, or when counting words), or based upon some fixed amount of bytes.
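One way to combine the two ideas (fixed-size reads, but splitting on a character other than the newline) is to re-chunk the byte stream so every chunk ends on whitespace. The sketch below is an assumption-laden illustration: the Rechunk module, the 64 KiB chunk size and max_demand: 4 are all hypothetical, and scanning each chunk with :binary.matches/2 favours clarity over speed.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule Rechunk do
  @whitespace [" ", "\n"]

  # Turns arbitrary binary chunks into chunks that always end at a space or
  # newline; a trailing partial word is carried over into the next chunk.
  def on_whitespace(chunks) do
    chunks
    |> Stream.concat([:eof])
    |> Stream.transform("", fn
      :eof, "" -> {[], ""}
      # Flush whatever is left once the input is exhausted.
      :eof, carry -> {[carry], ""}
      chunk, carry -> split_at_last_whitespace(carry <> chunk)
    end)
  end

  defp split_at_last_whitespace(data) do
    case :binary.matches(data, @whitespace) do
      [] ->
        # No boundary seen yet: emit nothing and keep accumulating.
        {[], data}

      matches ->
        {pos, 1} = List.last(matches)
        head = binary_part(data, 0, pos)
        tail = binary_part(data, pos + 1, byte_size(data) - pos - 1)
        {[head], tail}
    end
  end
end

# Hypothetical sample file: one very long line of 20_000 words.
path = Path.join(System.tmp_dir!(), "rechunk_sample.txt")
File.write!(path, String.duplicate("word ", 20_000))

counts =
  path
  |> File.stream!([], 65_536)
  |> Rechunk.on_whitespace()
  |> Flow.from_enumerable(max_demand: 4)
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Map.new()

IO.inspect(counts)
```

The re-chunking runs in the single process that feeds the flow, so it only looks for a boundary; the actual word splitting still happens in parallel inside Flow.flat_map.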

What works and what does not depends on your input. This is not a limitation of Flow; it is a decision you will need to make in any language and environment.

Thanks guys! Good to know.