Seeking code review on how to increase readability, performance, etc.

defmodule AsyncProcessor do
  @max_concurrency System.schedulers_online() * 30

  def format(sequence, :flat_map_stream),
    do: Stream.flat_map(sequence, fn {:ok, result} -> result end)

  def format(sequence, :stream), do: Stream.map(sequence, fn {:ok, result} -> result end)

  def format(sequence, :flat_map),
    do: Enum.reduce(sequence, [], fn {:ok, [_ | _] = result}, acc -> result ++ acc end)

  def format(sequence, :enum_map), do: Enum.map(sequence, fn {:ok, result} -> result end)

  def execute(enumerable, lambda, output_config \\ :enum_map, timeout \\ 20_000) do
    Task.async_stream(enumerable, &lambda.(&1),
      max_concurrency: @max_concurrency,
      timeout: timeout
    )
    |> format(output_config)
  end
end

defmodule Print do
  @reset IO.ANSI.reset()
  @blue IO.ANSI.color(33)
  @fuscia IO.ANSI.color(161)
  @light_green IO.ANSI.color(44)
  @mid_green IO.ANSI.color(76)
  @yellow IO.ANSI.color(226)

  def utc_time, do: DateTime.utc_now()

  def text(message) do
    timestamp = @blue <> "#{utc_time()} UTC-" <> @mid_green

    IO.puts("#{timestamp} #{message} \n")
  end

  def text(details, module, line_number) do
    timestamp = @blue <> "#{utc_time()} UTC-" <> @mid_green

    IO.puts("#{timestamp} #{module} - line #{line_number}: #{details} \n")
  end

  def error(details, module, line_number) do
    timestamp = @blue <> "#{utc_time()} UTC-"
    formatted_details = @fuscia <> "#{details}" <> @reset

    IO.puts("#{timestamp} #{module} - line #{line_number}: #{formatted_details} \n")
  end

  def warning(details, module, line_number) do
    timestamp = @blue <> "#{utc_time()} UTC-"
    warning_message = @yellow <> "#{details}" <> @reset

    IO.puts("#{timestamp} #{module} - line #{line_number}: #{warning_message} \n")
  end

  def highlight(details, module, line_number) do
    timestamp = @blue <> "#{utc_time()} UTC-" <> @light_green

    IO.puts("#{timestamp} #{module} - line #{line_number}: #{details} \n")
  end

  def highlight(message) do
    timestamp = @blue <> "#{utc_time()} UTC-"
    text = @light_green <> "#{message}"

    IO.puts("#{timestamp} #{text} \n")
  end
end

defmodule LogBook do
  require Logger

  def write_to_log(data, module, line_number) do
    data_eval = inspect(data, limit: :infinity)
    Logger.info("#{module} - line #{line_number}: #{data_eval}")

    {:ok, data_eval}
  end

  def write_to_console(:quiet),
    do: {:ok, :message_logged}

  def write_to_console(_, _, _, :quiet),
    do: write_to_console(:quiet)

  def write_to_console(data_eval, module, line_number, :error),
    do: Print.error(data_eval, module, line_number)

  def write_to_console(data_eval, module, line_number, :warning),
    do: Print.warning(data_eval, module, line_number)

  def write_to_console(data_eval, module, line_number, message_type)
      when message_type == :trace or message_type == nil,
      do: Print.highlight(data_eval, module, line_number)

  def write_to_console(_, :quiet),
    do: write_to_console(:quiet)

  def write_to_console(message, :print_to_screen),
    do: Print.text(message)

  def write_to_console(message, :highlight),
    do: Print.highlight(message)

  def main(data, module, line_number, message_type \\ :trace, list? \\ false)

  def main(message, module, line_number, message_type, false)
      when message_type == :print_to_screen or message_type == :highlight do
    with {:ok, _} <- write_to_log(message, module, line_number) do
      write_to_console(message, message_type)
    else
      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  def main(data, module, line_number, message_type, false) do
    with {:ok, data_eval} <- write_to_log(data, module, line_number) do
      write_to_console(data_eval, module, line_number, message_type)
    else
      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  def main([_ | _] = data, module, line_number, message_type, true) do
    with {:ok, _data_eval} <- write_to_log(data, module, line_number) do
      AsyncProcessor.execute(
        data,
        &(inspect(&1, limit: :infinity)
          |> write_to_console(module, line_number, message_type))
      )
    else
      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  def testing(data, module, line_number),
    do: main(data, module, line_number)

  def testing(data, module, line_number, opts) do
    list? = opts[:list] || false
    message_type = opts[:message_type] || :trace
    main(data, module, line_number, message_type, list?)
  end
end

defmodule Ventris do
  def update_keys_list(keys_list, acc) when is_list(acc) do
    case is_list(keys_list) do
      true -> keys_list ++ acc
      false -> [keys_list] ++ acc
    end
  end

  def extract_nested_keys(nested_api_data, root_key) when is_map(nested_api_data) do
    Map.keys(nested_api_data)
    |> AsyncProcessor.execute(
      fn branch_key ->
        lower_level_api_data = Map.get(nested_api_data, branch_key)
        updated_keys_list = update_keys_list(root_key, [branch_key])

        extract_nested_keys(lower_level_api_data, updated_keys_list)
        |> List.flatten()
      end,
      :flat_map
    )
  end

  def extract_nested_keys(nested_api_data, key) when not is_map(nested_api_data), do: key

  def extract_keys(api_response_json, target_data_key) when is_atom(target_data_key) do
    {:ok, target_data} = extract_target_data(api_response_json, target_data_key)
    {:ok, decoded_api_response_json} = Jason.decode(target_data)

    Map.keys(decoded_api_response_json)
    |> AsyncProcessor.execute(
      fn key ->
        nested_keys_list =
          Map.get(decoded_api_response_json, key)
          |> extract_nested_keys(key)

        update_keys_list(nested_keys_list, [])
      end,
      :flat_map
    )
    |> Enum.uniq()
    |> AsyncProcessor.execute(&String.to_atom(&1))
  end

  def decypher(raw_json, target_data, api_url, target_data_key) do
    try do
      Jason.decode(target_data, keys: :atoms!)
    rescue
      _ ->
        LogBook.main("Lack of atomized versions of API's keys for #{api_url} prevents the safe conversion of this JSON.  Creating missing key versions now . . . ", __MODULE__, 49, :warning)
        extract_keys(raw_json, target_data_key)

        LogBook.main(
          "JSON key atomization complete. Getting fresh data from #{api_url} . . .",
          __MODULE__,
          55,
          :warning
        )

        main(api_url, target_data_key)
    end
  end

  def extract_target_data(api_response_json, target_data_key) when is_atom(target_data_key) do
    target_data =
      case target_data_key do
        :none -> api_response_json
        _ -> Map.get(api_response_json, target_data_key)
      end

    {:ok, target_data}
  end

  def update_log(api_url, error_message, {sleep_time, interval}, line_no) do
    LogBook.main(
      "Error: #{inspect(error_message)} - Retrying #{api_url} in #{sleep_time} #{interval} . . . ",
      __MODULE__,
      line_no,
      :warning
    )
  end

  def retry_api_url(api_url, error_message, attempt_count) do
    cond do
      attempt_count == 1 ->
        update_log(api_url, error_message, {200, "ms"}, 14)

        Process.sleep(200)
        main(api_url, attempt_count)

      attempt_count == 2 ->
        update_log(api_url, error_message, {500, "ms"}, 20)

        Process.sleep(500)
        main(api_url, attempt_count)

      attempt_count <= 29 ->
        update_log(api_url, error_message, {1.5, "seconds"}, 26)

        Process.sleep(1_500)
        main(api_url, attempt_count)

      attempt_count > 29 ->
        raise "Error after #{attempt_count} failed attempts: #{inspect(error_message)}"
    end
  end

  def process_api_response(
        {:error, %HTTPoison.Error{id: nil, reason: _} = error_message},
        api_url,
        attempt_count
      ),
      do: retry_api_url(api_url, error_message, attempt_count + 1)

  def process_api_response({:ok, response}, _, _), do: {:ok, response}

  def connect_to_api(api_url, attempt_count \\ 0) do
    response = HTTPoison.get(api_url)
    process_api_response(response, api_url, attempt_count)
  end

  def fetch_data(api_url) do
    case connect_to_api(api_url) do
      {:ok, _} = good_response ->
        good_response

      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  def main(api_url, target_data_key) do
    with {:ok, api_response_json} <- fetch_data(api_url),
         {:ok, target_data} <- extract_target_data(api_response_json, target_data_key) do
      decypher(api_response_json, target_data, api_url, target_data_key)
    end
  end
end

Any particular advice on how to increase readability, efficiency, execution speed, conformance to best practices, etc. would be greatly appreciated.


Hey @Maxximiliann! This is a really broad request with very little context; could you elaborate further?

It would be helpful to give a little more context for your code snippet, like what purpose this code serves and what you are trying to accomplish with it. It would also be very nice if you could describe what you have tried so far and whether there are any particular parts you are seeking to improve.

This is the real combo, isn’t it :sweat_smile:!? Jokes aside, it’s very hard to answer an open-ended question like that. So if possible, it would be better to be a little bit more specific.


Functionality observations:

  • the :flat_map head will crash if any task returns {:ok, []}.
  • the :flat_map head returns results in reverse order compared to the others (see the example after this list)
  • some code paths through execute return a list, while others return a yet-to-be-evaluated Stream
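
For example (hypothetical calls, using an identity callback):

AsyncProcessor.execute([[1], [2]], fn x -> x end, :flat_map)
#=> [2, 1]  (reversed: result ++ acc prepends later results)

AsyncProcessor.execute([[], [1]], fn x -> x end, :flat_map)
#=> raises FunctionClauseError ({:ok, []} does not match {:ok, [_ | _]})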

IMO this design adds a lot of complexity in trying to cover every use case in one function. For instance, what would the @spec for execute’s lambda look like, given all the shapes it can take?

Consider splitting execute into multiple functions that each expect a callback with a specific shape.
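
For example, something along these lines (a sketch; the names are illustrative). Enum.flat_map copes with {:ok, []} and preserves input order, which addresses the crash, the ordering, and the mixed return types noted above:

defmodule AsyncProcessor do
  # Computed at call time rather than baked in at compile time.
  defp max_concurrency, do: System.schedulers_online() * 30

  # Callback returns a single value; results come back as a list, in input order.
  def execute_map(enumerable, fun, timeout \\ 20_000) do
    enumerable
    |> Task.async_stream(fun, max_concurrency: max_concurrency(), timeout: timeout)
    |> Enum.map(fn {:ok, result} -> result end)
  end

  # Callback returns a list (possibly empty); results are flattened, in input order.
  def execute_flat_map(enumerable, fun, timeout \\ 20_000) do
    enumerable
    |> Task.async_stream(fun, max_concurrency: max_concurrency(), timeout: timeout)
    |> Enum.flat_map(fn {:ok, result} -> result end)
  end
end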


write_to_log will always return {:ok, _}, so this entire with statement is useless.


There is significant overhead involved in Task.async_stream, especially compared to literally fetching a single atom from the atom table. This is not an efficient way to do things.

Furthermore, the only things you should be doing in Task.async_stream with @max_concurrency System.schedulers_online() * 30 are wait-heavy IO operations, since there are so many more processes than available CPUs.
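
For example, the final step of extract_keys could be a plain Enum.map, with keys standing in for the result of the earlier pipeline:

# String.to_atom/1 is a cheap lookup; plain Enum.map avoids all of the task overhead.
keys
|> Enum.uniq()
|> Enum.map(&String.to_atom/1)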


  def write_to_log(data, module, line_number) do
    data_eval = inspect(data, limit: :infinity)
    Logger.info("#{module} - line #{line_number}: #{data_eval}")

    {:ok, data_eval}
  end

This function wears two hats at once: it formats the data with inspect and it logs it. Consider splitting those responsibilities.


IMO functions that dispatch on an argument (like format above, or write_to_console) are fine, but it becomes a design smell when callers of those functions are also special-casing those arguments. Refactoring can tidy this up by aligning the structures and keeping each behavior in a single function. For instance:

defmodule LogBook do
  require Logger

  # NOTE: as suggested above, splits write_to_log into separate parts
  def format_log(data, message_type) 
      when message_type == :print_to_screen or message_type == :highlight, do: data

  def format_log(data, _), do: inspect(data, limit: :infinity)

  def write_to_log(message, module, line_number) do
    Logger.info("#{module} - line #{line_number}: #{message}")
  end

  # NOTE: write_to_console is now consistently arity 4
  def write_to_console(_, _, _, :quiet),
    do: {:ok, :message_logged}

  def write_to_console(data_eval, module, line_number, :error),
    do: Print.error(data_eval, module, line_number)

  def write_to_console(data_eval, module, line_number, :warning),
    do: Print.warning(data_eval, module, line_number)

  def write_to_console(data_eval, module, line_number, message_type)
      when message_type == :trace or message_type == nil,
      do: Print.highlight(data_eval, module, line_number)

  def write_to_console(message, _module, _line_number, :print_to_screen),
    do: Print.text(message)

  def write_to_console(message, _module, _line_number, :highlight),
    do: Print.highlight(message)

  def main(data, module, line_number, message_type \\ :trace, list? \\ false)

  # NOTE: no more branching on message_type here!
  # NOTE: removed useless with statements
  def main(data, module, line_number, message_type, false) do
    message = format_log(data, message_type)
    write_to_log(message, module, line_number)
    write_to_console(message, module, line_number, message_type)
  end

  # NOTE: removed async executor because
  #    * inspect is fast
  #    * writing to console involves every process trying to lock the same resource (the console)
  #
  def main([_ | _] = data, module, line_number, message_type, true) do
    message = format_log(data, message_type)
    write_to_log(message, module, line_number)
    Enum.map(data, fn el ->
      el
      |> format_log(message_type)
      |> write_to_console(module, line_number, message_type)
    end)
  end
end
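
Call sites stay the same, e.g. (hypothetical values):

LogBook.main(%{id: 1}, __MODULE__, 42)
LogBook.main(["a", "b", "c"], __MODULE__, 42, :warning, true)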

First of all, I want to thank you so much for all your astute observations; this is exactly the kind of feedback I was searching for. I’ve learned a lot from them! :slight_smile:

Please elaborate on what you mean exactly by “overhead,” if you kindly would.

Thank you for this insight. How exactly can I tell whether an operation is IO wait-heavy or not?


For the record, I can’t exactly agree with @al2o3cr here, because parallelizing work (work that’s inherently parallel, of course) still yields huge wins compared to trying to do single-threaded async switching. I am not aware of the significant overhead he’s talking about, but on a higher level I have never regretted parallelizing things with Elixir. Of course some limitations apply (perhaps not obvious to everyone): if you have a mere 10_000 items to process and each item does not touch the network or disk, then doing this in parallel is highly unlikely to be worth it. Which brings me to…

Modern CPUs spend most of their lives waiting on network or disk. If your parallel tasks involve I/O (so network or disks or any slow periphery) then they are a perfect candidate for high parallelization (meaning parallel task amount that’s bigger than your CPU threads).

…Although that has limitations as well e.g. it’s not very useful to spawn 100 tasks that all read/write from/to the same slow hard drive. As always, you should measure and take these factors into account.

@al2o3cr’s remark is valid as long as your tasks are actually CPU-bound; if they are, then running more parallel tasks than you have CPU threads (System.schedulers_online() is usually the same number) will not achieve any performance improvement. He’s right about that.

The general rule of parallelization is:

  • If your tasks are CPU-bound (i.e. don’t depend on any I/O, which is orders of magnitude slower than the CPU), don’t exceed the number of CPU threads as your maximum concurrency.
  • If your tasks are I/O-bound then you can multiply the CPU thread count by any single- or two-digit number. Here, however, you’ll have to measure, and you can afford to be a bit more liberal, at the risk of overloading whatever remote service or local disk you are working with, because your hosted app will likely reside on servers that are stronger than your laptop. (A sketch follows this list.)
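
A minimal sketch of both rules (the workloads and URLs are placeholders, and the multiplier of 10 is just a starting point to measure from):

schedulers = System.schedulers_online()

# CPU-bound work: don't exceed the scheduler count.
1..1_000
|> Task.async_stream(fn i -> i * i end, max_concurrency: schedulers)
|> Enum.to_list()

# I/O-bound work: a single- or two-digit multiple of the scheduler count.
urls = ["https://example.com/a", "https://example.com/b"]

urls
|> Task.async_stream(&HTTPoison.get/1, max_concurrency: schedulers * 10, timeout: 20_000)
|> Enum.to_list()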

Thanks so much for your keen insights. Could you kindly elaborate on how one goes about measuring for this?


That’s going to set @max_concurrency to the number of processors of the computer that compiled the code, not the number of processors of the computer the application runs on.

See compile time vs runtime config.
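
One way to fix it (a sketch against the original module) is to compute the value in a function body, which runs on the machine the application is deployed to:

defmodule AsyncProcessor do
  # Evaluated at runtime, on the machine actually running the code.
  defp max_concurrency, do: System.schedulers_online() * 30

  def execute(enumerable, lambda, output_config \\ :enum_map, timeout \\ 20_000) do
    enumerable
    |> Task.async_stream(lambda, max_concurrency: max_concurrency(), timeout: timeout)
    |> format(output_config)
  end

  # format/2 heads unchanged from the original
end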


“Overhead” as in “starting Tasks and awaiting the results isn’t free”. A quick benchmark:

Mix.install([:benchee])

list = Enum.to_list(1..10_000)
map_fun = fn i -> i + 1 end

Benchee.run(
  %{
    "map" => fn -> Enum.map(list, map_fun) end,
    "stream_map" => fn -> list |> Stream.map(map_fun) |> Enum.to_list() end,
    "async_stream" => fn -> list |> Task.async_stream(map_fun) |> Enum.to_list() end,
    "too_many_async_stream" => fn -> list |> Task.async_stream(map_fun, max_concurrency: 1000) |> Enum.to_list() end
  },
  time: 10,
  memory_time: 2
)

gives results like:

Operating System: macOS
CPU Information: Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
Number of Available Cores: 8
Available memory: 16 GB
Elixir 1.13.4
Erlang 25.0

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 10 s
memory time: 2 s
reduction time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 56 s

Benchmarking async_stream ...
Benchmarking map ...
Benchmarking stream_map ...
Benchmarking too_many_async_stream ...

Name                            ips        average  deviation         median         99th %
stream_map                   244.05        4.10 ms     ±9.88%        4.06 ms        5.17 ms
map                          239.78        4.17 ms    ±11.39%        4.05 ms        5.48 ms
async_stream                   9.46      105.76 ms     ±6.72%      104.05 ms      128.99 ms
too_many_async_stream          3.43      291.42 ms    ±14.69%      276.23 ms      409.43 ms

Comparison: 
stream_map                   244.05
map                          239.78 - 1.02x slower +0.0730 ms
async_stream                   9.46 - 25.81x slower +101.66 ms
too_many_async_stream          3.43 - 71.12x slower +287.32 ms

Memory usage statistics:

Name                          average  deviation         median         99th %
stream_map                    4.73 MB     ±0.00%        4.73 MB        4.73 MB
map                           4.12 MB     ±0.00%        4.12 MB        4.12 MB
async_stream                 12.72 MB     ±0.15%       12.73 MB       12.73 MB
too_many_async_stream        17.58 MB     ±0.40%       17.61 MB       17.63 MB

Comparison: 
stream_map                    4.73 MB
map                           4.12 MB - 0.87x memory usage -0.61307 MB
async_stream                 12.72 MB - 2.69x memory usage +7.99 MB
too_many_async_stream        17.58 MB - 3.71x memory usage +12.84 MB

Using async_stream is 25x slower than map, and it gets EVEN SLOWER (70x map!) when trying to use too much concurrency. This is particularly notable here because map_fun does very little work, so nearly all of the measured time is overhead.

Slowing map_fun down by adding a Process.sleep(2) changes the situation:
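
That is, the only change to the suite above is the callback, spelled out here:

map_fun = fn i ->
  Process.sleep(2)
  i + 1
end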

Operating System: macOS
CPU Information: Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
Number of Available Cores: 8
Available memory: 16 GB
Elixir 1.13.4
Erlang 25.0

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 10 s
memory time: 2 s
reduction time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 56 s

Benchmarking async_stream ...
Benchmarking map ...
Benchmarking stream_map ...
Benchmarking too_many_async_stream ...

Name                            ips        average  deviation         median         99th %
too_many_async_stream          1.93         0.52 s     ±8.79%         0.51 s         0.63 s
async_stream                   0.27         3.75 s     ±0.04%         3.75 s         3.75 s
stream_map                   0.0332        30.09 s     ±0.00%        30.09 s        30.09 s
map                          0.0332        30.15 s     ±0.00%        30.15 s        30.15 s

Comparison: 
too_many_async_stream          1.93
async_stream                   0.27 - 7.24x slower +3.23 s
stream_map                   0.0332 - 58.08x slower +29.57 s
map                          0.0332 - 58.19x slower +29.63 s

Memory usage statistics:

Name                          average  deviation         median         99th %
too_many_async_stream        17.75 MB     ±0.41%       17.75 MB       17.83 MB
async_stream                 12.80 MB     ±0.00%       12.80 MB       12.80 MB
stream_map                    9.43 MB     ±0.00%        9.43 MB        9.43 MB
map                           8.77 MB     ±0.00%        8.77 MB        8.77 MB

Comparison: 
too_many_async_stream        17.75 MB
async_stream                 12.80 MB - 0.72x memory usage -4.95177 MB
stream_map                    9.43 MB - 0.53x memory usage -8.31817 MB
map                           8.77 MB - 0.49x memory usage -8.97483 MB

Now every invocation of map_fun takes a lot more “wall-clock time” than before, but approximately the same amount of “running on the CPU” time since it’s mostly asleep.

Because of that, async_stream is faster (by about a factor of System.schedulers_online), and too_many_async_stream is faster still, though with diminishing returns above a max_concurrency of about 64 or so.


Yep, parallelization should only be used when the tasks are either non-trivial (take some CPU time) or involve some waiting (I/O). In all other scenarios, and with most list sizes, you’re better off just doing Enum.map over the list.
