Compare 2 big sorted files using streams

I need to compare 2 big sorted files line by line and output only lines unique to first file. Just like linux command comm would do. I cannot load whole lists in memory, because they have size 5GB each.

Is there an efficient way to compare these two files using streams?

Thanks!

I don’t think it’s easy to do in elixir, but i would do it somehow like that:

  1. Build a Task for each file, which waits for command to give a next line
defmodule FileStreamTask do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    comparator = Keyword.fetch!(arg, :comparator)
    id = Keyword.fetch!(arg, :id)
    path = Keyword.fetch!(arg, :path)

    :ok = FilesComparator.register(comparator, self())

    File.stream!(path)
    |> Stream.each(&wait_and_send(&1, comparator, id))
    |> Stream.run()
  end

  defp wait_and_send(line, comparator, id) do
    receive do
      :next_line -> FilesComparator.send_line(comparator, id, line)
    end

  end
end
  1. Build a comparator which synchronizes every line inside state and request for new one, when current line is handled.
defmodule FilesComparator do
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, :start_link, [arg])
  end

  def register(comparator, stream) do
    GenServer.call(comparator, {:add_stream, stream})
  end

  def send_line(comparator, stream_id, line) do
    GenServer.cast(comparator, {:line, stream_id, line})
  end

  def init(_arg) do
    {:ok, %{streams: [], line_to_compare: nil}}
  end

  def handle_call({:add_stream, stream}, _caller, %{streams: []} = state) do
    {:reply, :ok, %{state | streams: [stream]}}
  end

  def handle_call({:add_stream, stream}, _caller, state) do
    streams = [stream | state.streams]
    streams |> request_next_line()

    {:reply, :ok, %{state | streams: streams }}
  end

  def handle_cast({:line, stream_id, line}, %{line_to_compare: nil} = state) do
    {:noreply, %{state | line_to_compare: {stream_id, line}}}
  end

  def handle_cast({:line, stream_id, line}, state) do
    [{stream_id, line}, state.line_to_compare]
    |> Enum.sort_by(&elem(&1, 0))
    |> Enum.map(&elem(&1, 1))
    |> compare_lines()

    request_next_line(state.streams)

    {:noreply, %{state | line_to_compare: nil}}
  end

  defp compare_lines([line1, line2]), do: if(line1 != line2, do: do_smthg(line1))

  defp do_smthg(line), do: IO.puts(line)

  defp request_next_line(streams) do
    streams |> Enum.each(fn stream -> send(stream, :next_line) end)
  end
end

And run it

{:ok, comparator} = FilesComparator.start_link([])
{:ok, _task1} = FileStreamTask.start_link(comparator: comparator, path: "path/to/big/one.csv", id: 1)
{:ok, _task2} = FileStreamTask.start_link(comparator: comparator, path: "path/to/big/two.csv", id: 2)

Code is just a sketch, but should give a direction. May be it could be done easer, but i don’t know how.

1 Like

Are the files sorted on the uniqueness you are searching for? for example, sorted from A...Z and it won’t repeat again?

If so you could divide and conquer in smaller chunks with a parallel cursor?

B_LINES = File.stream!(FILE_B) |> Stream.chunk_every(SOME_MANAGABLE_NUMBER) |> take_all()
File.stream!(FILE_A) |> Stream.chunk_every(SOME_MANAGABLE_NUMBER) |> Enum.each(fn chunk ->
  if not chunk in B_LINES do
    # append to your list
    # start writing to a stream to keep memory clean?
  end
end)

My streamfoo is weak so pseudocode is not very neat

You could then spin up workers, agents and concurrently execute the comparison also if you need the speed for comparison something along the lines of @dmitrykleymenov suggestion.

1 Like

Given they entries are sorted, if you read them in chunks, once there is a match in both files, you can store in an accumulator the previous unmatched in the first file as unique, and keep on reading starting with fresh lists on both sides. Once the accumulator reaches certain length, append to a file or wherever you want to store it.