Question regarding improving fizzbuzz IO performance

Hi everyone, posting for the first time here, so please bear with me.

I came across an HN thread that mentioned a code-golf problem for high-throughput FizzBuzz. Link here: fastest code - High throughput Fizz Buzz - Code Golf Stack Exchange.

The naive single-threaded implementation in C gets about 70 MiB/s throughput on my Mac machine.
Here’s the C program for reference:

#include <stdio.h>

int main() {
    for (int i = 1; i < 1000000000; i++) {
        if ((i % 3 == 0) && (i % 5 == 0)) {
            printf("FizzBuzz\n");
        } else if (i % 3 == 0) {
            printf("Fizz\n");
        } else if (i % 5 == 0) {
            printf("Buzz\n");
        } else {
            printf("%d\n", i);
        }
    }
}

My understanding from learning Elixir so far is that pure Elixir is not a good choice for CPU-intensive work. But since this is a simple problem (and mostly IO-bound, as I’ve come to believe), I thought I’d test the waters a bit. I came up with the approach of dividing the range 1..1000000000 into System.schedulers_online() chunks and using Task.async_stream to process each chunk in its own task.

I made the following Fizzbuzz module:

defmodule Fizzbuzz do
  def fizzbuzz_with_io(enumerable) do
    # I am printing here because, as I understand it, moving data from one process to another takes time.
    Stream.map(enumerable, &reply/1)
    |> Stream.chunk_every(5000)
    |> Enum.into(IO.stream())
  end

  def fizzbuzz_no_io(enumerable) do
    Stream.map(enumerable, &reply/1) |> Stream.run()
  end

  def reply(n) do
    case {rem(n, 3), rem(n, 5)} do
      {0, 0} -> "FizzBuzz\n"
      {0, _} -> "Fizz\n"
      {_, 0} -> "Buzz\n"
      {_, _} -> "#{n}\n"
    end
  end
end

My main CLI driver code (I acknowledge that the below code is not 100% correct and there are some numbers I might be missing in my quest to divide work, but it is OK enough to roughly illustrate the problem):

defmodule Fizzbuzz.Cli do
  def main([lower, upper]) do
    {lower, upper} = {String.to_integer(lower), String.to_integer(upper)}
    chunk_size = div(upper - lower, System.schedulers_online)
    input_enumerable = get_input_ranges(lower, upper, chunk_size)
    IO.inspect(input_enumerable)
    input_enumerable
    # Swap in :fizzbuzz_no_io to measure the computation without any IO.
    |> Task.async_stream(Fizzbuzz, :fizzbuzz_with_io, [], timeout: :infinity, max_concurrency: 64)
    |> Stream.run()
  end

  def main(_), do: IO.puts("Usage: fizzbuzz 1 10000")

  defp get_input_ranges(lower, upper, chunk_size) do
    if chunk_size >= 10 do
      if lower >= upper, do: [], else: [lower..min(lower+chunk_size, upper) | get_input_ranges(min(lower+chunk_size, upper) + 1, upper, chunk_size)]
    else
      [lower..upper]
    end
  end
end
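
Since the driver above admits that its range splitting may miss some numbers, here is a minimal sketch of one way to cut lower..upper into contiguous, non-overlapping chunks. RangeSplit and split/3 are hypothetical names that do not appear in the original post, and the sketch assumes chunk_size counts the numbers in each chunk.

```elixir
defmodule RangeSplit do
  # Hypothetical helper (not from the original post): returns ranges that
  # cover lower..upper exactly once, each holding at most chunk_size numbers.
  def split(lower, upper, chunk_size) when chunk_size > 0 and lower <= upper do
    lower
    |> Stream.iterate(&(&1 + chunk_size))
    |> Stream.take_while(&(&1 <= upper))
    |> Enum.map(fn start -> start..min(start + chunk_size - 1, upper) end)
  end
end

IO.inspect(RangeSplit.split(1, 10, 4))
```

Each chunk starts right after the previous one ends, so no number is skipped or emitted twice; only the last chunk may be shorter.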

Compiling above with MIX_ENV=prod mix escript.build, I have observed the following:

  • If I merely do the computation (with no IO at all, using :fizzbuzz_no_io), the program finishes in 31 seconds on my machine. That is much faster than the 3 minutes and 5 seconds of single-threaded execution in Elixir, but still slower than single-threaded C’s 7 seconds.
  • If I use :fizzbuzz_with_io, the running time exceeds 5 minutes and I get a throughput of 2.5-2.7 MiB/s. Here, C gives a throughput of 70 MiB/s on my machine and a total running time of 1 minute and 43 seconds.

UPDATE-1:

  • I have updated the driver code to handle all cases properly.
  • I have updated the fizzbuzz_with_io function to use Stream chunking, which has improved throughput to around 70 MiB/s on my system. That’s roughly a 26-28x improvement over the earlier 2.5-2.7 MiB/s. It was just a hunch that drove me to try this: I thought chunking might feed small batches to IO.stream incrementally, rather than overloading it with everything at once. Could somebody please explain it better?
  • There are throughputs that still go in order of _ GiB/s. So, is there any further scope for improvement?

So, here’s my question: How do I improve the throughput of the Elixir version of Fizzbuzz program?
Thanks :smiley:

Should this use ordered: true on Task.async_stream? I’d expect the result should still be printed in order and not just randomly.

Two related gotchas with IO.stream:

  • it’s represented by a “file server” process, so “chunking” into bigger pieces helps by reducing the number of messages sent to that process (chunks vs individual lines)

  • it can still only print one message at a time, no matter how many workers are sending messages

You could likely boost performance by building bigger iolists and sending those to IO in one go.
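
A minimal sketch of the "bigger iolists" idea, assuming one iolist per chunk is acceptable: build all the lines of a range first, then hand them to IO in a single call. IolistDemo and its functions are hypothetical names, not code from the thread.

```elixir
defmodule IolistDemo do
  # Hypothetical module: builds the output for a whole range as one iolist.
  def build(range), do: Enum.map(range, &line/1)

  defp line(n) when rem(n, 15) == 0, do: "FizzBuzz\n"
  defp line(n) when rem(n, 3) == 0, do: "Fizz\n"
  defp line(n) when rem(n, 5) == 0, do: "Buzz\n"
  # A nested list of binaries and bytes is valid iodata; no concatenation needed.
  defp line(n), do: [Integer.to_string(n), ?\n]
end

# One write for the whole chunk instead of one write per line.
IO.binwrite(IolistDemo.build(1..15))
```

How large the chunk should be is something to measure; the thread's numbers suggest a few thousand lines per write already helps.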

@LostKobrakai ordered: true is the default for Task.async_stream. From the documentation:

:ordered - whether the results should be returned in the same order as the input stream. When the output is ordered, Elixir may need to buffer results to emit them in the original order. Setting this option to false disables the need to buffer at the cost of removing ordering. This is also useful when you’re using the tasks only for the side effects. Note that regardless of what :ordered is set to, the tasks will process asynchronously. (NOTE: This is what I’m doing in the original fizzbuzz_with_io approach, that’s why the results are unordered) If you need to process elements in order, consider using Enum.map/2 or Enum.each/2 instead. Defaults to true.
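
To illustrate the quoted behaviour, here is a small sketch in which later inputs finish first, yet the returned results keep the input order. The delays are arbitrary values chosen for the demonstration.

```elixir
# Each task sleeps for the given number of milliseconds and returns it.
# The first input is the slowest, so the tasks complete out of order.
delays_ms = [60, 10, 30]

results =
  delays_ms
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    ordered: true,
    max_concurrency: 3
  )
  |> Enum.map(fn {:ok, ms} -> ms end)

# Results are emitted in input order, not completion order.
IO.inspect(results)
```

The ordering applies only to the values the stream returns. A side effect performed inside a task, such as printing, still happens whenever that task gets to it.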

If I try to print the results in order, I would have to go with fizzbuzz_no_io approach and turn my driver module into the following:

defmodule Fizzbuzz.Cli do
  def main([lower, upper]) do
    {lower, upper} = {String.to_integer(lower), String.to_integer(upper)}
    chunk_size = min(div(upper - lower, System.schedulers_online), 5000)
    input_enumerable = get_input_ranges(lower, upper, chunk_size)
    IO.inspect(input_enumerable)
    stream = Task.async_stream(input_enumerable, fn range -> Fizzbuzz.fizzbuzz_no_io(range) end, timeout: :infinity)
    Enum.reduce(stream, :ok, fn {:ok, res}, _acc -> res |> Enum.into(IO.stream) end)
  end

  def main(_), do: IO.puts("Usage: fizzbuzz 1 10000")

  defp get_input_ranges(lower, upper, chunk_size) do
    if chunk_size >= 10 do
      if lower >= upper, do: [], else: [lower..min(lower+chunk_size, upper) | get_input_ranges(min(lower+chunk_size, upper) + 1, upper, chunk_size)]
    else
      [lower..upper]
    end
  end
end

Fizzbuzz code:

def fizzbuzz_no_io(enumerable) do
  Stream.map(enumerable, &reply/1)
  |> Stream.chunk_every(6000)
  |> Enum.to_list()
end

Which brings me to my next order of business…

Doing so prints the results in order, but drops the IO speed. @al2o3cr’s answer is a great start for me, but I feel I’ll need to dive deeper into IO internals to see if there’s any scope for improvement.

I’ve been playing with chunk_size in my driver program and the chunk_every value in fizzbuzz_no_io, and different values yield different speeds. The maximum IO throughput I’ve managed so far is about 40 MiB/s on my machine, found purely by trial and error, using a chunk_size of 5000 and a chunk_every of 6000.

One last thing I still don’t understand, and it seems weird when I think about it: I’m dividing the problem into chunks of size 5000 each and processing each chunk in a Task, which returns a list of size 5000. Why then does removing the chunk_every line drop my IO speed to 2.5-3 MiB/s, when the input coming from Stream.map already has size 5000?

To put it simply, my remaining question is this: Why does chunking a stream of size 5000 into a chunk of size 6000 speed up the IO drastically from 2.5-3 MiB/s to 40 MiB/s?

Thank you for the insightful answers once again. Apologies if I missed something.

I am on my phone, but my blind bet is iolist concatenation. Smaller lists are kept as is, while larger ones are concatenated into a large binary blob that is then sent to the OS.

Thanks to some learning on this thread and some findings regarding cost of passing data between different processes, I’ve made some further improvements:

  • Using GenServer instead of Task - divide the input into chunks and spin up a GenServer for every chunk, which computes the results for that chunk in parallel.
  • Printing the results by sending a print call to every GenServer in order. With a Task, the output is returned to the main process and then passed on to IO.puts. Here the data never moves back to the main process: the GenServer writes to IO directly, which saves one data move, and I think that’s the reason for the speedup.

Fizzbuzz.Worker module:

defmodule Fizzbuzz.Worker do
  use GenServer

  def init([range]) do
    send(self(), {:calculate, range})
    {:ok, []}
  end

  def handle_info({:calculate, range}, _state) do
    res = Fizzbuzz.fizzbuzz_no_io(range)
    {:noreply, res}
  end

  def handle_call(:print, _from, results) do
    IO.binwrite(results)
    {:reply, :ok, []}
  end
end

Main module:

defmodule Fizzbuzz.Cli do
  def main([lower, upper]) do
    {lower, upper} = {String.to_integer(lower), String.to_integer(upper)}
    chunk_size = min(div(upper - lower, System.schedulers_online), 6000)
    input_enumerable = get_input_ranges(lower, upper, chunk_size)
    IO.inspect(input_enumerable)
    Task.async_stream(input_enumerable, fn input -> elem(GenServer.start_link(Fizzbuzz.Worker, [input]), 1) end, timeout: :infinity)
    |> Stream.map(fn {:ok, res} -> res end)
    |> Stream.each(
      fn pid ->
        GenServer.call(pid, :print)
        Process.exit(pid, :kill)
      end)
    |> Stream.run()
  end

  def main(_), do: IO.puts("Usage: fizzbuzz 1 10000")

  defp get_input_ranges(lower, upper, chunk_size) do
    if chunk_size >= 10 do
      if lower >= upper, do: [], else: [lower..min(lower+chunk_size, upper) | get_input_ranges(min(lower+chunk_size, upper) + 1, upper, chunk_size)]
    else
      [lower..upper]
    end
  end
end

As a result, I’m getting speeds of 80+ MiB/s on my system, and everything is getting printed in order. It’s not in the order of GiB/s, but I’m happy with my gains for now.

Thanks for all the help once again. Any suggestions/improvements to the above code are welcome.

You can likely optimize it further if :print is not a call but a cast (does not require a response).

@dimitarvp I tried that, but I face two problems: 1) I have to wait for all GenServers to finish (I don’t know how to do that properly, so I just added a Process.sleep at the end of my driver module for the time being). 2) It breaks the order in which numbers are printed.
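
For the first problem only, one possible sketch replaces the Process.sleep with monitors: each worker stops after printing, and the driver waits for one :DOWN message per worker. CastWorker is a hypothetical module for illustration, not the Fizzbuzz.Worker from the thread, and this does nothing to restore the printing order.

```elixir
defmodule CastWorker do
  use GenServer

  def init(text), do: {:ok, text}

  # After printing, stop normally so that any monitor receives a :DOWN message.
  def handle_cast(:print, text) do
    IO.binwrite(text)
    {:stop, :normal, text}
  end
end

pids =
  for text <- ["a\n", "b\n", "c\n"] do
    {:ok, pid} = GenServer.start(CastWorker, text)
    pid
  end

# Set up the monitors before casting so every exit is observed.
refs = Enum.map(pids, &Process.monitor/1)
Enum.each(pids, &GenServer.cast(&1, :print))

# Block until every worker has exited; the output order is still not guaranteed.
for ref <- refs do
  receive do
    {:DOWN, ^ref, :process, _pid, _reason} -> :ok
  end
end
```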

Oh I must have missed that: is the right printing order a hard requirement?

No worries @dimitarvp, I missed that the first time too. But the problem does mention that the output should be a valid Fizzbuzz output, so I believe the answer is yes. Otherwise I’d happily go all out with the handle_cast approach, since I’m easily getting 110 MiB/s throughput with that. :sweat_smile:

I’m happy to report that I’m now able to achieve 150+ MiB/s throughput on my system by making a small change to Fizzbuzz.reply function:

defp reply(n) do
  case {rem(n, 3), rem(n, 5)} do
    {0, 0} -> "FizzBuzz\n"
    {0, _} -> "Fizz\n"
    {_, 0} -> "Buzz\n"
    {_, _} -> [Integer.to_string(n), "\n"]
  end
end
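
A small check of what this change relies on, as a sketch: the iodata form carries the same bytes as the interpolated string, so IO functions that accept iodata produce identical output. The value of n is arbitrary.

```elixir
n = 12_345

as_string = "#{n}\n"
as_iodata = [Integer.to_string(n), "\n"]

# Flattening the iodata yields exactly the interpolated string.
IO.inspect(IO.iodata_to_binary(as_iodata) == as_string)

# The byte length can be computed without flattening.
IO.inspect(IO.iodata_length(as_iodata))
```

The likely gain is that the newline is no longer concatenated onto every number, although the thread does not isolate that as the cause.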

Two micro optimizations that you can try:

  1. Try using guards instead of case + tuples to avoid tuple allocations.
This one doesn't make sense I realized there's only one proc per range.

You’re sending a message to initiate the calculation. If you spawned a process that computes right away and then waits to receive a print message, you could eliminate the scheduling time needed for the self send. It could be splitting hairs, but it’d make starting the calculation slightly faster.

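def spawner(range) do
  spawn(fn ->
    # Compute eagerly, then block until someone asks for the output.
    results = Fizzbuzz.fizzbuzz_no_io(range)

    receive do
      {:print, pid} ->
        IO.binwrite(results)
        # Tell the caller we are done (Elixir uses send/2, not Erlang's `!`).
        send(pid, :done)
    end
  end)
end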

Sadly you’ll have to make a poor man’s GenServer.call with plain procs, but since it’s a microbenchmark it might be worth trying.

  3. Try doubling the number of processes.

Thanks for your answer @mpope. Regarding all the points 1-3:

  1. Try using guards instead of case + tuples to avoid tuple allocations: I tried the following version of reply, but over multiple runs it was sometimes a second faster than my original function and sometimes slower. So I believe it won’t get us any speed improvement:
def reply(n) when rem(n, 15) == 0, do: "FizzBuzz\n"
def reply(n) when rem(n, 3) == 0, do: "Fizz\n"
def reply(n) when rem(n, 5) == 0, do: "Buzz\n"
def reply(n), do: [Integer.to_string(n), "\n"]

Still I’m keeping this, because I think reducing tuple allocations is a good idea. Thanks for this one!

  2. I believe this is similar to using a cast instead of a call, as suggested in an earlier comment. And I believe a call is the better option here, because when we’re using multiple processes on the BEAM, a process can be pre-empted at any instant. Take this case: I have two processes, P1 and P2, and I send a :print message to P1 and then to P2. It is quite possible that P1 (which was supposed to print first) gets pre-empted at that instant, while P2 does not. As a result, P2 could end up processing its :print message first and thus make its IO call before P1. Just try changing the call to a cast and you’ll observe that the output is no longer printed in order. Even if both processes were running on different cores of the CPU, it’s quite possible that there are speed differences between cores (ARM’s big.LITTLE processors come to mind here).

  3. Try doubling the number of processes: I tried that by doubling max_concurrency in Task.async_stream, and as with point 1 I got inconsistent results on repeated runs. Sometimes it’s marginally faster and sometimes marginally slower. So I believe this won’t get us any improvement either.

Thanks for your help.

Back here with another optimization: setting the Fizzbuzz.Worker GenServer’s state to a binary instead of an iolist in handle_info. Ultimately, we’ll be passing a binary to our IO process, and since our binary is quite large (a refc binary), this should reduce message-passing overhead. I made this change after going through Erlang’s efficiency guide here. According to it:

All data in messages sent between Erlang processes is copied, except for refc binaries and literals on the same Erlang node.

This change improves the throughput to 225 MiB/s.

Can you show the exact code? This sounds intriguing.

Oops, my bad. I meant that I have changed the state of my Fizzbuzz.Worker module from an iolist to a binary. Since the resultant binary is definitely greater than 64 bytes for large enough inputs, it’s safe to say I’m creating a refc binary (and passing that to IO.binwrite when calling print). Here’s the changed module:

defmodule Fizzbuzz.Worker do
  use GenServer

  def init([range]) do
    send(self(), {:calculate, range})
    {:ok, []}
  end

  def handle_info({:calculate, range}, _state) do
    res = Fizzbuzz.fizzbuzz_no_io(range)
    {:noreply, res |> :erlang.iolist_to_binary} # <-- changed the state here
  end

  def handle_call(:print, _from, results) do
    IO.binwrite(results) # <-- refc binary gets passed here.
    {:reply, :ok, []}
  end
end

Passing a large binary instead of an iolist to IO reduces message-passing overhead, which improves the overall throughput.

OK, I’ve got one more trick for a further performance boost on Unix-based systems: writing directly to /dev/stdout, which brings the throughput to around 500 MiB/s on my MacBook Pro. It’s a shame there’s no alternative for Windows. :slightly_frowning_face:

def handle_call(:print, _from, results) do
  # /dev/stdout is UNIX only, so this won't work on windows
  case :os.type() do
    {:unix, _} ->
      {:ok, io_device} = :file.open("/dev/stdout", [:append, :raw])
      :ok = :file.write(io_device, results)
      :file.close(io_device)
    _ -> IO.binwrite(results)
  end
  {:reply, :ok, []}
end

The speed is quite unstable though, ranging between 400-700 MiB/s. But the 5-minute average speed comes around to 500 MiB/s.

Does IO.binwrite send a message to a process? Using raw file access means that no process communication happens, which could be saving time. Maybe try this raw stdout access without flattening to a binary, because IIRC setting the state in handle_info isn’t a message pass to itself. I think state transitions are optimized.
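
A minimal sketch of that suggestion, showing that :file.write on a raw device accepts iodata directly with no flattening step. It writes to a temporary file rather than /dev/stdout so the result can be read back and checked; the file name is an arbitrary assumption.

```elixir
# Temporary path chosen for the demonstration only.
path = Path.join(System.tmp_dir!(), "fizzbuzz_raw_demo.txt")

# Nested iodata, similar in shape to what the reply function produces.
iodata = [["1", "\n"], ["2", "\n"], "Fizz\n"]

# :raw bypasses the file server process; writes go straight to the descriptor.
{:ok, device} = :file.open(path, [:write, :raw])
:ok = :file.write(device, iodata)
:ok = :file.close(device)

written = File.read!(path)
File.rm!(path)
IO.inspect(written)
```

Whether skipping :erlang.iolist_to_binary is faster for the real workload would need to be measured against the earlier /dev/stdout version.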

What is the reason for having separate process for printing?

I couldn’t run your code. It would be handy to have it as a gist or repo somewhere so it would be easier to test against. My approach was to reduce the number of messages being sent as much as possible, so I use a port directly instead of going through additional hops. It also avoids the congestion on a single process that happens with IO.binwrite/1 or IO.stream (though it can cause some artifacts, like output being printed out of order):

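defmodule FizzBuzz do
  @compile {:inline, fb_fast: 1}

  # Each port command carries @blocks blocks of 15 lines, i.e. @span numbers.
  @blocks 3000
  @span @blocks * 15

  def main do
    schedulers = System.schedulers_online()

    # One writer per scheduler; writer i starts at its own slice and then
    # jumps over the slices owned by the other writers.
    for i <- 0..(schedulers - 1) do
      Process.spawn(fn ->
        port = :erlang.open_port({:fd, 0, 1}, [:binary, :out])

        chunk(port, 1 + i * @span, schedulers * @span)
      end, priority: :high)
    end

    Process.sleep(:timer.minutes(1))
  end

  defp chunk(port, n, step) do
    send(port, {self(), {:command, fb_fast(n)}})
    chunk(port, n + step, step)
  end

  # n must be congruent to 1 modulo 15 so the fixed pattern lines up.
  defp fb_fast(n) do
    for k <- 0..(@blocks - 1) do
      b = n + 15 * k

      """
      #{b}
      #{b + 1}
      Fizz
      #{b + 3}
      Buzz
      Fizz
      #{b + 6}
      #{b + 7}
      Fizz
      Buzz
      #{b + 10}
      Fizz
      #{b + 12}
      #{b + 13}
      FizzBuzz
      """
    end
  end
end

FizzBuzz.main()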

I haven’t tested it extensively, but it is fast enough IMHO.
