I’m running a process that will take a long time and should be CPU-intensive, and I want to do it many, many times concurrently. I assumed that Task.async_stream would manage this, but it really isn’t.
The process_chunk function should take a while, but even if it doesn’t there are thousands of chunks. When I run this on my measly 4-core machine, at any given time only one core is even marginally busy while the rest are barely touched. What can I do to make it actually use all the cores?
EDIT: I guess it basically comes down to the processing step not taking as much time as I thought and the chunks being too small. Increasing the size of each chunk (making the per-chunk processing step take longer) improved CPU utilization and overall speed as a result. No permutation of chunk size or worker-pool size let me max out the cores, though; utilization topped out at about 80–85% no matter what I did.
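Roughly the shape of what I mean, for reference — a simplified sketch where `heavy_sum/1` stands in for the real per-chunk work, and the chunk size is the knob that made the difference:

```elixir
defmodule ChunkDemo do
  # Bigger chunks give each async_stream task more work per message,
  # which is what improved CPU utilization for me.
  @chunk_size 10_000

  # Stand-in for the real process_chunk work.
  def heavy_sum(chunk), do: Enum.reduce(chunk, 0, &+/2)

  def run(numbers) do
    numbers
    |> Stream.chunk_every(@chunk_size)
    |> Task.async_stream(&heavy_sum/1,
      max_concurrency: System.schedulers_online(),
      ordered: false
    )
    |> Enum.reduce(0, fn {:ok, partial}, acc -> acc + partial end)
  end
end
```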
Am I misinterpreting the docs for File.stream!/3, which suggest it opens in :raw mode by default?
Elixir will open streams in :raw mode with the :read_ahead option if the stream is open in the same node as it is created and no encoding has been specified.
That plays a role, but you also have a serial step (the data |> chunk_every |> zip) feeding the parallel part of your pipeline, so overall performance can be largely dictated by the serial part — which in your case is also IO-bound (see Amdahl's law on Wikipedia).
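To put rough numbers on that (a back-of-the-envelope sketch, where `p` is the fraction of the work that can actually run in parallel):

```elixir
defmodule Amdahl do
  # Amdahl's law: best-case speedup on n cores when only a fraction p
  # of the work is parallelizable. Even with p = 0.8, four cores give
  # at most about 2.5x — consistent with cores that never fully max out.
  def speedup(p, n), do: 1 / (1 - p + p / n)
end
```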
In such cases, a better approach can be to compute byte offsets into the file and have each process open the file itself and read its own chunk at its own offset. But doing so is highly dependent on the file format.
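A minimal sketch of the offset idea (names are mine; each task opens its own file handle and reads its byte range with :file.pread/3 — aligning ranges to record/line boundaries is the format-dependent part and is omitted here, so treat this as a starting point, not a finished reader):

```elixir
defmodule OffsetRead do
  # Split a file into `parts` byte ranges and read them concurrently,
  # each task with its own handle. NOTE: ranges are NOT aligned to line
  # boundaries; a real reader must extend/trim each range to records.
  def parallel_read(path, parts) do
    size = File.stat!(path).size
    chunk = div(size, parts) + 1

    0..(parts - 1)
    |> Enum.map(fn i ->
      Task.async(fn ->
        {:ok, fd} = :file.open(path, [:read, :binary, :raw])

        # pread reads at an absolute offset, so tasks never contend
        # over a shared file position.
        data =
          case :file.pread(fd, i * chunk, chunk) do
            {:ok, bin} -> bin
            :eof -> ""
          end

        :ok = :file.close(fd)
        data
      end)
    end)
    |> Task.await_many(:infinity)
  end
end
```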
Thanks, that was very helpful. By using a recursive function to generate a chunk and send it off on an async processing step I was able to make full use of the CPU cores and speed things up quite a bit.
In the course of my experimenting I also inadvertently demonstrated the robustness of the BEAM by spawning a process for every line of a 50-million-line file (not all at once, so the process limit was never hit). It wasn’t fast, but it didn’t crash.
I’m using :prim_file, which is not thread-safe and which I’m probably not wise enough to touch normally, but in this instance I know nothing else is going to touch the file, so it’s fine — and apparently faster, since it essentially calls the C NIF directly. It reads a big chunk of bytes, reads ahead to the next line break (or end of file) so the chunk ends on a line boundary, then sends the chunk off to an async Task to process the lines. It recurses to produce the next chunk before awaiting the result.
def run_file(filename) do
  # Start one named worker per pool slot; chunks are assigned
  # round-robin via Stream.cycle below.
  workers = 1..@pool_size |> Enum.map(fn w -> String.to_atom("BrcRegistry#{w}") end)
  workers |> Enum.each(fn w -> BrcRegistry.start(w) end)
  {:ok, file} = :prim_file.open(to_string(filename), [:binary, :read, :read_ahead])
  process_file(file, Stream.cycle(workers))
  print_tables(workers)
end

def process_file(file, worker_queue) do
  worker = Enum.take(worker_queue, 1) |> hd()
  w_q = Stream.drop(worker_queue, 1)

  case :prim_file.read(file, 80_000_000) do
    :eof ->
      :ok

    {:ok, buffer} ->
      # The 80 MB read almost certainly ends mid-line; append the rest
      # of that line so the chunk ends on a line boundary.
      buffer =
        case :prim_file.read_line(file) do
          :eof -> buffer
          {:ok, line} -> <<buffer::binary, line::binary>>
        end

      # Hand the chunk to a Task, then recurse to read the next chunk
      # before awaiting, so reading and processing overlap.
      t = Task.async(fn -> process_lines(buffer, worker) end)
      process_file(file, w_q)
      Task.await(t, :infinity)
  end
end

def process_lines("", _worker), do: :ok

def process_lines(lines, worker) do
  case parse_line(lines) do
    {station, {temp, rest}} ->
      BrcRegistry.register(worker, station, temp)
      process_lines(rest, worker)

    :ok ->
      :ok
  end
end

def parse_line(""), do: :ok
def parse_line(<<"\n", rest::binary>>), do: parse_line(rest)
def parse_line(chunk), do: Parse.parse_line(chunk)