Task.async_stream not using all cores (solved-ish)

I’m running a job that takes a long time and should be CPU intensive, and I want to run it many, many times concurrently. I assumed that Task.async_stream would manage this, but it doesn’t seem to.

pool_size = :erlang.system_info(:logical_processors) * 5
workers = 1..pool_size
data = File.stream!("my/file")

data
|> Stream.chunk_every(1000)
|> Stream.zip(Stream.cycle(workers))
|> Task.async_stream(
  fn {chunk, worker} -> process_chunk(chunk, worker) end,
  max_concurrency: pool_size,
  timeout: :infinity
)
|> Enum.to_list()

The process_chunk function should take a while, but even if it doesn’t, there are thousands of chunks. When I run this on my measly 4-core machine, it appears that at any given time only one core is close to maxed out while the rest are barely touched. What can I do to make it actually use all cores?


EDIT: I guess it’s basically down to the processing step not taking as much time as I thought and the chunks being too small. By increasing the size of each chunk (making the chunk processing step take longer) CPU utilization increased and overall speed increased as a result. No permutation of chunk size or worker pool size got me to max out the cores, though, as it topped out at about 80-85% no matter what I did.
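
A rough way to compare chunk sizes is to time the same pipeline with different values. This is only a sketch: ChunkBench and its process_chunk/1 are hypothetical stand-ins for the real processing step (here it just sums line lengths), and the input is an in-memory list rather than a file stream.

```elixir
defmodule ChunkBench do
  # Hypothetical stand-in for the thread's process_chunk/2: sums the byte size of each line.
  def process_chunk(chunk) do
    Enum.reduce(chunk, 0, fn line, acc -> acc + byte_size(line) end)
  end

  # Runs the chunked pipeline once and returns the combined result.
  def run(lines, chunk_size) do
    lines
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(&process_chunk/1,
      max_concurrency: System.schedulers_online(),
      timeout: :infinity
    )
    |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
  end

  # Returns {chunk_size, elapsed_microseconds, result} for one run.
  def time(lines, chunk_size) do
    {micros, result} = :timer.tc(fn -> run(lines, chunk_size) end)
    {chunk_size, micros, result}
  end
end
```

Something like `for size <- [1_000, 100_000], do: ChunkBench.time(lines, size)` then shows how elapsed time moves with chunk size for a given workload. With very small chunks, the per-task overhead (spawning, message passing) can outweigh the work itself.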

Try using raw files, that should skip sending copies of data between processes.

Am I misinterpreting the docs for File.stream!/3, which suggest it opens the file in :raw mode by default?

Elixir will open streams in :raw mode with the :read_ahead option if the stream is open in the same node as it is created and no encoding has been specified.

I have wondered that myself but never checked the source and just made my own code explicit.
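
For reference, being explicit about raw mode might look like the sketch below. RawRead is a hypothetical module name, and counting lines is just a placeholder for real work; the point is only the mode list passed to File.open!/3.

```elixir
defmodule RawRead do
  # Opens the file explicitly in raw, binary, read-ahead mode and counts its lines.
  # File.open!/3 closes the file once the function returns.
  def count_lines(path) do
    File.open!(path, [:read, :binary, :raw, :read_ahead], fn fd -> count(fd, 0) end)
  end

  defp count(fd, n) do
    case :file.read_line(fd) do
      {:ok, _line} -> count(fd, n + 1)
      :eof -> n
    end
  end
end
```

In raw mode the read goes through the opening process directly instead of through a separate file server process, which is what avoids copying data between processes.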

That plays a role, but you also have a serial step (data |> chunk_every |> zip) feeding the parallel part of your pipeline, so overall performance can be largely dictated by the serial part, which is also IO-bound in your case (see Amdahl's law on Wikipedia).
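
To put a number on that, Amdahl's law gives the best possible speedup as 1 / ((1 - p) + p / n), where p is the fraction of the runtime that can run in parallel and n is the number of cores. A minimal sketch (the Amdahl module is just an illustration, and the 80% figure below is an assumed value, not a measurement from this thread):

```elixir
defmodule Amdahl do
  # parallel_fraction: share of the total runtime that can be parallelised (0.0 to 1.0).
  # cores: number of cores the parallel part is spread across.
  def speedup(parallel_fraction, cores) do
    1.0 / (1.0 - parallel_fraction + parallel_fraction / cores)
  end
end
```

If 80% of the work is parallel, `Amdahl.speedup(0.8, 4)` comes out at roughly 2.5x rather than 4x, and no number of cores gets past 5x, because the serial 20% never shrinks.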

In such cases, a better approach could be to compute offsets into the file, and have each process open their own offsets and chunks. But doing so is highly dependent on the file format.
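
A rough sketch of that idea for a newline-delimited text file follows. OffsetChunks and its functions are hypothetical names, and it assumes every record ends with "\n". The only serial work is finding line-aligned byte ranges; each task then opens its own handle and reads its own range.

```elixir
defmodule OffsetChunks do
  # Returns a list of {offset, length} byte ranges covering the whole file.
  # Each range ends right after a "\n" or at end of file.
  def ranges(path, target_size) when target_size > 0 do
    %File.Stat{size: size} = File.stat!(path)
    {:ok, fd} = :file.open(path, [:read, :binary, :raw])

    try do
      build(fd, size, target_size, 0, [])
    after
      :file.close(fd)
    end
  end

  defp build(_fd, size, _target, start, acc) when start >= size, do: Enum.reverse(acc)

  defp build(fd, size, target, start, acc) do
    stop = next_line_end(fd, size, min(start + target, size))
    build(fd, size, target, stop, [{start, stop - start} | acc])
  end

  # Scans forward from pos in small probes and returns the position just past the next "\n".
  defp next_line_end(_fd, size, pos) when pos >= size, do: size

  defp next_line_end(fd, size, pos) do
    case :file.pread(fd, pos, 4096) do
      {:ok, probe} ->
        case :binary.match(probe, "\n") do
          {idx, 1} -> pos + idx + 1
          :nomatch -> next_line_end(fd, size, pos + byte_size(probe))
        end

      :eof ->
        size
    end
  end

  # Runs fun on each range's bytes in parallel; every task opens its own file handle.
  def process(path, target_size, fun) do
    path
    |> ranges(target_size)
    |> Task.async_stream(
      fn {offset, len} ->
        {:ok, fd} = :file.open(path, [:read, :binary, :raw])
        {:ok, data} = :file.pread(fd, offset, len)
        :file.close(fd)
        fun.(data)
      end,
      max_concurrency: System.schedulers_online(),
      timeout: :infinity
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

For example, `OffsetChunks.process("my/file", 8_000_000, &process_lines/1)` would hand each task roughly 8 MB of whole lines, where process_lines/1 stands for whatever per-chunk function you already have. Formats without a simple record delimiter need their own way of finding safe split points.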

Thanks, that was very helpful. By using a recursive function to generate a chunk and send it off on an async processing step I was able to make full use of the CPU cores and speed things up quite a bit.

In the course of my experimenting I also inadvertently demonstrated the robustness of the BEAM by generating a process for every line of a 50 million line file (not all at once so the process limit was never hit). It wasn’t fast but it didn’t crash.

Can you share the relevant parts of your code if that’s okay? I’m curious as to the recursive generation of chunks.

I’m using :prim_file, which is not thread-safe and which I’m probably not wise enough to touch normally, but in this instance I know nothing else is going to touch the file, so it’s fine, and it’s apparently faster because it calls the C NIF more or less directly. It reads a big chunk of bytes, then reads one more line so the chunk ends at a line break (or at end of file), then sends the chunk off to an async Task to process the lines. While that task runs, it recurses to produce the next chunk.

  def run_file(filename) do
    # One named registry per worker slot; chunks are assigned to them round-robin.
    workers = 1..@pool_size |> Enum.map(fn w -> String.to_atom("BrcRegistry#{w}") end)

    workers |> Enum.each(fn w -> BrcRegistry.start(w) end)

    {:ok, file} = :prim_file.open(to_string(filename), [:binary, :read, :read_ahead])

    process_file(file, Stream.cycle(workers))

    print_tables(workers)
  end

  def process_file(file, worker_queue) do
    # Take the next worker and advance the round-robin queue.
    worker = Enum.take(worker_queue, 1) |> hd()
    w_q = Stream.drop(worker_queue, 1)

    case :prim_file.read(file, 80_000_000) do
      :eof ->
        :ok

      {:ok, buffer} ->
        # Read up to the next line break so the chunk never ends mid-line.
        buffer =
          case :prim_file.read_line(file) do
            :eof ->
              buffer

            {:ok, line} ->
              <<buffer::binary, line::binary>>
          end

        t = Task.async(fn -> process_lines(buffer, worker) end)

        # Keep reading further chunks while this task runs, then wait for it.
        process_file(file, w_q)
        Task.await(t, :infinity)
    end
  end

  def process_lines("", _worker), do: :ok

  def process_lines(lines, worker) do
    case parse_line(lines) do
      {station, {temp, rest}} ->
        BrcRegistry.register(worker, station, temp)
        process_lines(rest, worker)

      :ok ->
        :ok
    end
  end

  def parse_line(""), do: :ok
  def parse_line(<<"\n", rest::binary>>), do: parse_line(rest)

  def parse_line(chunk) do
    Parse.parse_line(chunk)
  end
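
Parse.parse_line/1 isn’t shown above. Purely as an illustration of the return shape that process_lines/2 expects, here is a hypothetical stand-in. It assumes each line looks like `station;temperature\n` with a decimal temperature; that format is an assumption, and the real parser is likely more optimised.

```elixir
defmodule ParseSketch do
  # Hypothetical stand-in for Parse.parse_line/1, not the code used in the thread.
  # Assumes the input starts with a "station;temperature" line, e.g. "Oslo;12.3\n".
  # Returns {station, {temperature, rest_of_chunk}}.
  def parse_line(chunk) do
    [station, after_station] = :binary.split(chunk, ";")

    {temp_text, rest} =
      case :binary.split(after_station, "\n") do
        [temp_text, rest] -> {temp_text, rest}
        # Last line of the file without a trailing newline.
        [temp_text] -> {temp_text, ""}
      end

    {station, {String.to_float(temp_text), rest}}
  end
end
```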

I should probably point out that I more or less lifted that wholesale from 1brc_ex/src/1brc.workers.blob.maps.chunk_to_worker.exs (master branch) in the IceDragon200/1brc_ex repository on GitHub.
@icedragon200 has been incredibly generous with their code, hardware, and time.

The whole thing is here if you’re so inclined:
