I want to take the first 250000 lines of a text file and write them in another text file

Hi all,

I am new here and happy to be part of this community. I need help with a task and would really appreciate it if someone could take a look.
I have a text file containing raw data, approximately 480K lines, and what I am trying to do is take the first 250K rows and write them to a second file. I have tried several approaches without success. This is the code I am currently using:

defmodule SplitFile do
  def printline do
    File.stream!("NeedsToSplit.txt", read_ahead: 460000)
    |> Stream.chunk_every(250000)
    |> Stream.take(250000)
    |> Task.async_stream(fn chunk -> Enum.each(chunk, &File.write("destination1.txt", &1)) end)
  end
end

This seems to process the data but does not write it in the destination file and I don’t understand why.

I also tried using &IO.write(filename) instead of File.write but I get this error below:

  ** (CaseClauseError) no case clause matching: {"destination1.txt"}
    (elixir 1.11.2) lib/io.ex:681: IO.write/2
    (elixir 1.11.2) lib/enum.ex:786: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir 1.11.2) lib/enum.ex:786: Enum.each/2
    (elixir 1.11.2) lib/enum.ex:792: anonymous fn/3 in Enum.each/2
    (elixir 1.11.2) lib/enum.ex:3461: anonymous fn/3 in Enum.each/2
    (elixir 1.11.2) lib/stream.ex:1597: anonymous fn/3 in Enumerable.Stream.reduce/3
    (elixir 1.11.2) lib/stream.ex:723: anonymous fn/3 in Stream.take_after_guards/2
    (elixir 1.11.2) lib/stream.ex:1597: anonymous fn/3 in Enumerable.Stream.reduce/3

Any help is highly appreciated!
Thanks in advance

Here’s what I came up with. You are using Streams (as you should), but streams are lazy by default, so the pipeline does nothing until it is evaluated. Enum.to_list is one way to force evaluation.
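To illustrate the laziness point, here is a minimal sketch (the list and the printed message are made up for illustration, not taken from the original code):

```elixir
# Building a stream only describes the work; nothing runs yet.
stream =
  Stream.map([1, 2, 3], fn n ->
    IO.puts("processing #{n}")
    n * 2
  end)

# No "processing ..." lines have been printed at this point.

# Enum.to_list/1 forces the stream and collects the results.
doubled = Enum.to_list(stream)

# Stream.run/1 also forces the stream but discards the results,
# which suits pipelines that exist only for their side effects.
:ok = Stream.run(stream)
```

Either call is enough to make the pipeline actually do its work; which one fits depends on whether the results are needed afterwards.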

You were chunking your text file in groups of 250,000 so you only need to take 1 to retrieve the first 250,000 lines of the file.

I opened a file handle with File.open!/2 and used IO.write/2 for better performance, as noted in the docs for File.write/3 (File, Elixir v1.12.2).

I upped the task timeout as well (the default 5s was too short; 30s is probably overkill).

defmodule SplitFile do
  def printline do
    destination = File.open!("destination1.txt", [:write])

    File.stream!("NeedsToSplit.txt", read_ahead: 460_000)
    |> Stream.chunk_every(250_000)
    |> Stream.take(1)
    |> Task.async_stream(fn chunk -> Enum.each(chunk, &IO.write(destination, &1)) end, timeout: 30_000)
    |> Enum.to_list()

    # Close the handle once all lines have been written.
    File.close(destination)
  end
end

Here’s wc run against my input and output files:

$ wc NeedsToSplit.txt
1000001 1000001 5000005 NeedsToSplit.txt
$ wc destination1.txt
250000  250000 1250000 destination1.txt

This works:

File.stream!("a.txt", [], :line)
|> Stream.take(250_000)
|> Stream.into(File.stream!("b.txt"))
|> Stream.run()

Couple thoughts:

  • Instead of building chunks of 250000 elements and then taking one, just take 250000.
  • If the stream has one element, Task.async_stream isn’t doing anything helpful.
  • If it has more than one, Task.async_stream + IO.write is a data corruption hazard: async_stream returns the results in the original order, but the tasks themselves may run, and therefore write, in a different order.
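Putting those points together, a rough sketch could look like the following. The module and function names are hypothetical, and IO.binwrite/2 is my own choice here (it writes the bytes as-is on a handle opened without an encoding), not something taken from the posts above:

```elixir
defmodule SplitFileSketch do
  # Hypothetical module and function names, for illustration only.
  def copy_first_lines(source, destination, count) do
    # File.open!/3 with a function closes the handle when the function returns.
    File.open!(destination, [:write], fn device ->
      source
      |> File.stream!()
      |> Stream.take(count)
      # A single process writes the lines in order, so writes cannot interleave.
      |> Enum.each(&IO.binwrite(device, &1))
    end)
  end
end
```

Something like SplitFileSketch.copy_first_lines("NeedsToSplit.txt", "destination1.txt", 250_000) would then copy the first 250,000 lines without any chunking or tasks.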

Thanks for the suggestion. This does the task; however, I now end up with about double the rows I had in my initial file. Does this make sense? Do I perhaps need to do some trimming?

Hmmm, this should not be the case. You should have 250_000 rows.

Maybe your file has \r\n line endings and that interferes? I do not know if there is an option for that in File.stream!/3.

Anyway, you can always debug:

File.stream!("a.txt", [], :line)
|> Stream.take(250_000)
|> Stream.each(fn line ->
  # do whatever you want with line
  line |> IO.inspect(label: "line")
end)
|> Stream.into(File.stream!("b.txt"))
|> Stream.run()
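Printing 250,000 lines can be a lot to read through, so another way to check is to count instead. This is only a sketch; count_lines and count_crlf are hypothetical helper names:

```elixir
# Hypothetical helper: counts lines by streaming, without loading the whole file.
count_lines = fn path ->
  path
  |> File.stream!()
  |> Enum.count()
end

# Hypothetical helper: counts lines that end in "\r\n",
# which would point to Windows-style line endings.
count_crlf = fn path ->
  path
  |> File.stream!()
  |> Enum.count(&String.ends_with?(&1, "\r\n"))
end
```

Calling count_lines.("a.txt") and count_lines.("b.txt") should show whether the output really has more rows than expected, and count_crlf.("a.txt") whether the \r\n guess applies at all.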

Thank you for the suggestion. To be honest, this sounds more complicated but I am eager to try this and have a couple of questions as well if you don’t mind answering.

1- Why do we need to open the file first? Isn’t it costly, or am I misunderstanding this operation?
2- What if I drop Stream.chunk_every, since I only need the first 250K? Is Stream.take enough for the task?
3- Why do I need to use Enum.to_list? I want the data just as it is; wouldn’t Enum.to_list change the returned data?
4- Why do I need a timeout? Does it mean the task succeeds as long as the 250K lines are processed within 30s?
5- What happens to the second chunk? What if I want to have 2 separate files, one for each chunk?

Thanks in advance!
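
On question 5, one possible approach is to keep Stream.chunk_every and write each chunk to its own file. The sketch below is untested against the real data; the module name, the file naming scheme, and the choice to write each chunk with a single File.write!/2 call are all assumptions for illustration:

```elixir
defmodule SplitChunksSketch do
  # Hypothetical module: writes every chunk of `chunk_size` lines to its own
  # file, named "<prefix>1.txt", "<prefix>2.txt", and so on.
  def split(source, prefix, chunk_size) do
    source
    |> File.stream!()
    |> Stream.chunk_every(chunk_size)
    |> Stream.with_index(1)
    |> Enum.map(fn {lines, index} ->
      path = "#{prefix}#{index}.txt"
      # File.write!/2 accepts iodata, so a list of lines is written in one call.
      File.write!(path, lines)
      path
    end)
  end
end
```

With a 480K-line input, SplitChunksSketch.split("NeedsToSplit.txt", "destination", 250_000) would be expected to produce destination1.txt with 250,000 lines and destination2.txt with the remainder. Each chunk is held in memory while it is written, which seems acceptable at this size.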