I don't think it's easy to do in Elixir, but I would do it somewhat like this:
- Build a Task for each file, which waits for a command before sending its next line:
defmodule FileStreamTask do
  @moduledoc """
  Task that reads a file line by line, releasing one line per `:next_line` message.

  The task registers its own pid with the comparator, then for every line of
  the file blocks until it receives `:next_line` and forwards that line
  (tagged with its id) through `FilesComparator.send_line/3`.
  """

  use Task

  @doc """
  Starts the task linked to the caller.

  `arg` is a keyword list that must contain `:comparator`, `:id` and `:path`.
  """
  def start_link(arg), do: Task.start_link(__MODULE__, :run, [arg])

  @doc """
  Task body: registers with the comparator and feeds it the file on demand.

  Raises `KeyError` if `:comparator`, `:id` or `:path` is missing from `arg`.
  """
  def run(arg) do
    # Fetched in this order so the first missing key is the one reported.
    [server, stream_id, file] = Enum.map([:comparator, :id, :path], &Keyword.fetch!(arg, &1))

    :ok = FilesComparator.register(server, self())

    file
    |> File.stream!()
    |> Enum.each(fn line ->
      # Block until the comparator asks for the next line.
      receive do
        :next_line -> FilesComparator.send_line(server, stream_id, line)
      end
    end)
  end
end
- Build a comparator which keeps the pending line in its state and requests the next lines once the current pair has been handled:
defmodule FilesComparator do
  @moduledoc """
  Lock-step comparator for lines produced by registered stream processes.

  Stream processes register themselves with `register/2`. From the second
  registration on, every registered process is sent a `:next_line` message and
  is expected to answer through `send_line/3`. The first line of a round is
  parked in the state; when the second one arrives the pair is ordered by
  stream id, compared, and the next round is requested.

  Nothing here handles a stream process that stops sending lines (for example
  when its file ends): the comparator then just keeps waiting.
  """

  use GenServer

  # Client API

  @doc """
  Starts the comparator linked to the caller.

  `arg` is passed to `init/1`, which currently ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(arg) do
    # GenServer.start_link/3 takes (module, init_arg, options); unlike
    # Task.start_link/3 there is no function name or argument list.
    GenServer.start_link(__MODULE__, arg)
  end

  @doc """
  Registers the process `stream` with `comparator`. Returns `:ok`.
  """
  @spec register(GenServer.server(), pid()) :: :ok
  def register(comparator, stream) do
    GenServer.call(comparator, {:add_stream, stream})
  end

  @doc """
  Asynchronously hands `line`, tagged with `stream_id`, to `comparator`.
  """
  @spec send_line(GenServer.server(), term(), term()) :: :ok
  def send_line(comparator, stream_id, line) do
    GenServer.cast(comparator, {:line, stream_id, line})
  end

  # Server callbacks

  @impl true
  def init(_arg) do
    {:ok, %{streams: [], line_to_compare: nil}}
  end

  @impl true
  # First registration: remember the stream, nothing to compare against yet.
  def handle_call({:add_stream, stream}, _caller, %{streams: []} = state) do
    {:reply, :ok, %{state | streams: [stream]}}
  end

  # Later registrations: start a round by asking every stream for a line.
  def handle_call({:add_stream, stream}, _caller, state) do
    streams = [stream | state.streams]
    request_next_line(streams)
    {:reply, :ok, %{state | streams: streams}}
  end

  @impl true
  # First line of a round: park it until its counterpart arrives.
  def handle_cast({:line, stream_id, line}, %{line_to_compare: nil} = state) do
    {:noreply, %{state | line_to_compare: {stream_id, line}}}
  end

  # Second line of a round: compare in stream-id order, then start a new round.
  def handle_cast({:line, stream_id, line}, state) do
    [{stream_id, line}, state.line_to_compare]
    |> Enum.sort_by(&elem(&1, 0))
    |> Enum.map(&elem(&1, 1))
    |> compare_lines()

    request_next_line(state.streams)
    {:noreply, %{state | line_to_compare: nil}}
  end

  # Runs the mismatch action on the line of the lower stream id when they differ.
  defp compare_lines([line1, line2]), do: if(line1 != line2, do: do_smthg(line1))

  # Placeholder mismatch action: prints the line.
  defp do_smthg(line), do: IO.puts(line)

  # Tells every registered stream process to emit its next line.
  defp request_next_line(streams) do
    Enum.each(streams, fn stream -> send(stream, :next_line) end)
  end
end
And run it:
# Start the comparator first so both file tasks can register with it.
{:ok, comparator} = FilesComparator.start_link([])

# One task per file; the id fixes the order in which the two lines are compared.
for {path, id} <- [{"path/to/big/one.csv", 1}, {"path/to/big/two.csv", 2}] do
  {:ok, _task} = FileStreamTask.start_link(comparator: comparator, path: path, id: id)
end
The code is just a sketch, but it should give a direction. Maybe it could be done more easily, but I don't know how.