Iterating over a file stream

Say I have a very big text file having around 1,000,000 lines. I want to split this file into 5 equal parts and store it as par1, part2 etc. How can I do that in Elixir?

What have you tried so far? Is it exactly 1 million lines or approximately so?

1 Like

No the number of lines can be so high, the file I was trying to split have 13M lines. Basically I was trying to split the big file into small parts and perform an aggregate operation on each of small files and combine them.

I used the stream module to read the input file as follows

File.stream!(“inputfile.txt”)

But I am not sure how to iterate over this big file and take n parts and write it to another input file.

1 Like

Gotcha. This seems like a bit of an X Y problem. What’s the aggregate operation? If we know that we can help suggest optimal ways to approach computing it on a very large file. Splitting the file into 5 different files may not actually be the fastest way to do this. I’d consider checking out https://hexdocs.pm/flow/Flow.html

5 Likes

There are aggregation operation such as finding sum, average. But in addition to that there are auto correction, sanitisation of few data points. Finally I need the full file as an output with the modified lines.

My thought was, If I am able to split the file into chunks based on lines, say 200K lines in a file, I can spin up 4 or 5 process and do the aggregation, cleanup etc on each chunk and combine them.

1 Like

But then you have additional overhead for getting those chunks in the correct order again.

As you need the lines in the correct order after the operation, a single file |> Stream.map(your_fun) |> Stream.map(fun line -> IO.write(dest_file, line) end) |> Stream.run should be enough and do the job.

3 Likes

But if we have to approach the problem of splitting this big file, what will be the way forward. Here the problem is I am not able to get lines as follows

stream = File.stream!("inputfile") |> save_part
first_200k_lines = stream.take(200000) |> save_part
second_200k_lines = stream.take(200000) |> save_part
1 Like

How do you want to split?

Do you want to have n files, where each file has line k*n+j? Do you want to split into files which contain the next consecutive chunk of lines, such that file i has lines i*n to i*n+lines_per_file?

Again, if you need the lines in original order, the easiest and most performant way is to process them in order. Please step away from writing them into split up temporary files. This will at least double diskusage, and therefore probably more than double effective run time.

stream does not have a field take, and if you do stream |> Stream.take(200_000) you’ll end with a stream containing the first 200_000 lines, but nothing has happened yet. And as I’m not sure what save_part/1 does, I can only guess that it consumes the stream. Also I have no clue what it returns, but I doubt you can use it as a stream later on.

1 Like

So the idea is to split a file with N number of lines to x number of parts each having N/x lines. So in case of a 1M line file the idea is to split into 5 parts each having 200K lines. The final file may have less than the N/x lines depending on the number of lines.

stream.take(200000) this was a mistake from my side I meant

first_200k_lines = stream.take(200000) |> save_part

save_part is a function that saves those lines in to a file.

1 Like

You can’t save parts of the stream, you’ll still need to process them “at once”.

You need Stream.chunk_every/2. Roughly like this:

input_stream
|> Stream.chunk_every(200_000)
|> Stream.with_index()
|> Stream.each(fn {lines, idx} ->
  file = open_file_for_chunk(idx)
  Enum.each(lines, &IO.write(file, &1))
end)
|> Stream.run

But be aware though, that this will create a list of lines with 200_000 entries, which has to be GC’d at once, and therefore might result in a larger memory footprint as the direct version, which again will stress your disk less.

2 Likes

Right. There’s this a priori assumption that this is faster but I can’t understand why. If you want to split a file of N lines into K groups you still need to traverse the whole N. Instead of spending all that time and IO just to split it, do the computation. You can even do it in parallel! If you chunk the initial file in memory you can do the computationally expensive work on parallel chunks. Task.async_stream or Flow will be useful here. Then you can recombine everything into a single file.

4 Likes

I would love to but I want to have a lesser memory footprint, I can don’t have to load all the chunks at a time. Can’t we read a part of file and read the next part later ?

1 Like

Does this mean you have to read and keep the whole file in memory?

1 Like

The initial approach I have shown you in Iterating over a file stream, should not read the full file into memory.

Well, as with every GC’d language, you can’t controll how much of the file is actually read before GC, but this techique makes it relatively easy for the GC to flush older lines away.

2 Likes

This is the script I finally wrote to split the file. I would like to optimize it better on the split method.

defmodule FileSplitter do

  def usage do
    IO.puts "Usage: elixir #{Path.basename(__ENV__.file)} <file_to_split> <number_of_parts>"
  end

  def number_of_lines(filename) do
    filename
      |> File.stream!
      |> Enum.count
  end

  def save_part(lines, chunk_id) do
    case File.open("part_#{chunk_id}", [:write]) do
      {:ok, file} ->
        Enum.each(lines, &IO.puts(file, &1))
        File.close(file)
      _ ->
        IO.puts "Unable to open the fille for writing."
    end
  end

  def split(filename, parts) do
    lines_per_file = filename
      |> number_of_lines
      |> div(parts)

    if lines_per_file < 1 do
      IO.puts "Cannot split this file."
      System.halt()
    end

    filename
      |> File.stream!
      |> Stream.chunk_every(lines_per_file)
      |> Stream.with_index
      |> Stream.each(fn({lines, chunk_id}) -> save_part(lines, chunk_id) end)
      |> Stream.run
  end
end

case System.argv() do
  [filename, parts] -> IO.puts "#{filename} #{parts}"
    FileSplitter.split(filename, String.to_integer(parts))
  _ ->
    FileSplitter.usage
end
1 Like

Again, this requires you to have the full chunk in memory.

I, and I think mony others, won’t help you anymore with the chunking approach. It has a lot of downsides.

Have you actually tried the aforementioned approach of simply doing a single stream that modifies and stores as suggested many times throughout this thread?

2 Likes

I’m not suggesting that you load all the chunks into memory. For example:

File.stream!("big_file.txt")
|> Stream.chunk_every(2000, 2000, [])
|> Task.async_stream(fn chunk ->
  # compute some work on the chunk
end)
|> Stream.into("destination.txt")

This

  1. avoids pulling everything into memory
  2. still has parallel work
  3. does only one file read and one file write.

If you need complex aggregation behavior you’ll probably want Flow, which has features for grouping rows and reducing on them.

6 Likes

Before You take the chunk path I would suggest You to look at this video. It’s all about processing big data with Stream and aggregating across distributed processes.

Chunking will not help You with aggregate function.

PS: Why do You need an Elixir tool? Bash already has a split command. (I don’t know about windows shell)

$ man split
SPLIT(1)                  BSD General Commands Manual                 SPLIT(1)

NAME
     split -- split a file into pieces

SYNOPSIS
     split [-a suffix_length] [-b byte_count[k|m]] [-l line_count] [-p pattern] [file [name]]
5 Likes

I agree with others that splitting the file seems like a needless complication. The sketch proposed by @benwilson512 in this answer is the one I’d suggest trying. It’s simple, doesn’t require flow, and should work in most situations.

If you find that this simple chunking is not working well, flow could help. But I suggest you first try it without flow.

If neither of these approaches is not working for your problem, I suggest you share the simplest possible task definition where these approaches won’t work, but splitting the file would help.

1 Like

Yes that was the first thing I did. It might be my crazy though to split the file and do it. But when I had that thought I couldn’t think of the right way to split the file without loading the full file.