FLOW - How to Enhance performance from a series of flows?

I’m migrating some functions from parallel_stream to flow.
This function loads a xlsx file and generate data to import.
The ideia is to increase performance overall. So far so good, specially with files over 50k lines.
In some tests, went from 106 seconds (parallel_stream) to 62 seconds (flow).
Any tips to increase performance or do it in a few flows?

flow pipeline

  def flow_it(path, header, accepted, id) do
    final_header =
      header_filter(header, accepted)
    final_id =
      id
      |> String.to_integer()
    {_status, _header, lines} =
      path
      |> Sheet.load_all()
    lines
    |> flow_1(final_header)
    |> flow_2()
    |> flow_3(final_id)
  end

reduce

  def flow_1(enum, final_header) do
    enum
    |> Flow.from_enumerable()
    |> Flow.partition()
    |> Flow.reduce(fn -> [] end, fn {_x, y}, acc ->
      temp =
        final_header
        |> Enum.reduce(%{}, fn {a, b}, acc ->
          %{
            b => y |> Map.get(a)
          }
          |> Map.merge(acc)
        end)
      [temp]
      |> Enum.concat(acc)
    end)
    |> Enum.to_list()
  end

only unique rows

  def flow_2(enum) do
    enum
    |> Flow.from_enumerable()
    |> Flow.partition()
    |> Flow.uniq_by(fn x ->
      x["email"]
    end)
    |> Enum.to_list()
  end

some cleaning and calculation

  def flow_3(enum, final_id) do
    enum
    |> Flow.from_enumerable()
    |> Flow.partition()
    |> Flow.reduce(fn -> [] end, fn x, acc ->
      check_email =
        x["email"]
        |> email_fix()
      temp =
        case is_nil(check_email) do
          true ->
            nil
          false ->
            data =
              x
              |> Map.pop("email")
              |> elem(1)
            domain =
              check_email
              |> domain_from_email()
            final_data =
              data
              |> Map.merge(%{"domain" => domain})
              |> Sheet.attrs_validation(["country", "phone"])
            %{
              email: check_email,
              enabled: true,
              list_id: final_id,
              data: final_data
            }
        end
      [temp]
      |> Enum.concat(acc)
    end)
    |> Enum.to_list()
  end
2 Likes

Check this out: http://teamon.eu/2016/tuning-elixir-genstage-flow-pipeline-processing/

The graph stuff he explains here http://teamon.eu/2016/measuring-visualizing-genstage-flow-with-gnuplot/ could help you visualize and tweak your setup (number of stages, demand size, etc.).

1 Like

Thanks.

After some experimentation, performance increased (and other thing emerged, like uniq_by partition != uniq_by total).

My current single fun is:

  def flow_2_1(enum, final_header, final_id, list) do
    {max_demand,stages} =
      {100, 50}
    now =
      Timex.now()
      |> Timex.to_naive_datetime()
      |> NaiveDateTime.truncate(:second)
    enum
    |> Flow.from_enumerable(max_demand: max_demand)
    |> Flow.partition(max_demand: max_demand, stages: stages)
    |> Flow.reduce(fn -> [] end, fn {_x, y}, acc ->
      temp =
        final_header
        |> Enum.reduce(%{}, fn {a, b}, acc ->
          %{
            b => y |> Map.get(a)
          }
          |> Map.merge(acc)
        end)
      [temp]
      |> Enum.concat(acc)
    end)
    |> Flow.partition(max_demand: max_demand, stages: stages)
    |> Flow.uniq_by(fn x ->
      x["email"]
    end)
    |> Flow.partition(max_demand: max_demand, stages: stages)
    |> Flow.filter(fn x ->
      x["email"]
      |> is_nil()
      |> Kernel.not()
    end)
    |> Flow.partition(max_demand: max_demand, stages: stages)
    |> Flow.reduce(fn -> [] end, fn x, acc ->
      check_email =
        x["email"]
        |> email_fix()
      temp =
        case is_nil(check_email) do
          true ->
            nil
          false ->
            data =
              x
              |> Map.pop("email")
              |> elem(1)
            domain =
              check_email
              |> domain_from_email()
            temp_data =
              data
              |> Map.merge(%{"domain" => domain})
              |> Sheet.attrs_validation(["country", "phone"])
            country_data =
              temp_data
              |> check_country(list)
            final_data =
              temp_data
              |> Map.merge(%{"country" => country_data})
            %{
              email: check_email,
              enabled: true,
              list_id: final_id,
              data: final_data,
              inserted_at: now,
              updated_at: now
            }
        end
      [temp]
      |> Enum.concat(acc)
    end)
    |> Enum.to_list()
    |> Enum.uniq_by(fn x ->
      x.email
    end)
  end