Stream a file with dynamic chunks

I have a file to process, and I'd like to do it with Stream.

Here is an example of such a file:

Start 1
foo
Start 2
foo
bar
Start 3
lorem
ipsum
hello world

I want to split it into 3 chunks:

["Start 1", "foo"]
["Start 2", "foo", "bar"]
["Start 3", "lorem", "ipsum", "hello world"]

That seems to need some kind of look-ahead, though, and I'm a bit lost on how to do it efficiently with streams.

With Enum I would probably reduce over the lines with an accumulator like {all_chunks, current_chunk}, where current_chunk is the chunk that has not ended yet. When I hit a line with Start X, current_chunk is pushed onto all_chunks and a new current_chunk starts.
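For reference, the eager Enum version described above might look roughly like this. It is only a sketch: the module name ReduceChunker is made up for illustration, and it assumes a chunk begins at any line that starts with "Start".

```elixir
defmodule ReduceChunker do
  # Hypothetical helper: splits a list of lines into chunks,
  # each beginning at a line that starts with "Start".
  def chunk(lines) do
    {all_chunks, current_chunk} =
      Enum.reduce(lines, {[], []}, fn line, {all_chunks, current_chunk} ->
        if String.starts_with?(line, "Start") and current_chunk != [] do
          # A new "Start" line closes the chunk in progress.
          {[Enum.reverse(current_chunk) | all_chunks], [line]}
        else
          # Lines are prepended, so chunks are built in reverse order.
          {all_chunks, [line | current_chunk]}
        end
      end)

    # The last chunk is never followed by a "Start" line, so flush it here.
    all_chunks =
      case current_chunk do
        [] -> all_chunks
        _ -> [Enum.reverse(current_chunk) | all_chunks]
      end

    Enum.reverse(all_chunks)
  end
end
```

Note that this builds every chunk in memory before returning anything, which is what a Stream-based solution would avoid.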

Look at Stream.chunk_while/4. It should do exactly what you describe.


Thanks for the tip. Seems to work.

defmodule Chunker do

  @source ~w/
    Start_1
    foo
    Start_2
    foo
    bar
    Start_3
    lorem
    ipsum
    hello/

  def run do
    @source
    |> Stream.map(&String.trim/1)
    |> Stream.chunk_while([], &chunk_fun/2, &after_fun/1)
  end

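  # Called for every line; acc holds the current chunk in reverse order.
  defp chunk_fun(line, acc) do
    # starts_with?/2 avoids treating a line that merely contains
    # "Start" somewhere in the middle as the beginning of a chunk.
    if String.starts_with?(line, "Start") do
      process_start_line(line, acc)
    else
      process_line(line, acc)
    end
  end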

  # Called once the input is exhausted: emit the last, still-open chunk.
  defp after_fun(acc) do
    case acc do
      [] -> {:cont, []}
      current_chunk -> {:cont, Enum.reverse(current_chunk), []}
    end
  end

  defp process_start_line(line, []) do
    {:cont, [line]}
  end

  defp process_start_line(line, current_chunk) do
    {:cont, Enum.reverse(current_chunk), [line]}
  end

  defp process_line(line, current_chunk) do
    {:cont, [line | current_chunk]}
  end
end

Result:

iex> Chunker.run |> Enum.to_list
[
  ["Start_1", "foo"],
  ["Start_2", "foo", "bar"],
  ["Start_3", "lorem", "ipsum", "hello"]
]

Is this what you had in mind?
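To run the same idea against an actual file instead of the @source list, a variation along these lines might work. It is a sketch only: FileChunker and the path argument are made-up names, and it assumes the file is laid out like the sample in the question, with chunks beginning at lines that start with "Start".

```elixir
defmodule FileChunker do
  # Hypothetical module; `path` points to a text file like the sample.
  def stream(path) do
    path
    |> File.stream!()
    |> Stream.map(&String.trim/1)
    |> Stream.chunk_while([], &chunk_fun/2, &after_fun/1)
  end

  # A "Start" line closes the open chunk (if any) and begins a new one.
  defp chunk_fun(line, acc) do
    if String.starts_with?(line, "Start") and acc != [] do
      {:cont, Enum.reverse(acc), [line]}
    else
      {:cont, [line | acc]}
    end
  end

  # Emit whatever chunk is still open when the file ends.
  defp after_fun([]), do: {:cont, []}
  defp after_fun(acc), do: {:cont, Enum.reverse(acc), []}
end
```

Since File.stream!/1 reads lazily line by line, nothing is read until the result is consumed, for example with FileChunker.stream("input.txt") |> Enum.each(&IO.inspect/1), and only the chunk currently being built is accumulated.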
