I’m migrating some functions from parallel_stream to flow.
This function loads an xlsx file and generates the data to import.
The idea is to increase overall performance. So far so good, especially with files over 50k lines.
In some tests, went from 106 seconds (parallel_stream) to 62 seconds (flow).
Any tips to increase performance further, or to do this in fewer flows?
flow pipeline
# Entry point for the import pipeline: loads the sheet at `path`, then runs the
# three flow stages — remap columns (flow_1), dedupe (flow_2), build import
# attrs (flow_3) — and returns the final list.
#
# `header`/`accepted` feed header_filter/2 to pick the columns to keep;
# `id` is the target list id as a string and is parsed to an integer here.
def flow_it(path, header, accepted, id) do
  final_header = header_filter(header, accepted)
  final_id = String.to_integer(id)

  # Status and header from the loader are deliberately ignored; only the
  # row data is carried forward.
  {_status, _header, lines} = Sheet.load_all(path)

  lines
  |> flow_1(final_header)
  |> flow_2()
  |> flow_3(final_id)
end
Step 1 — remap each row's keys to the accepted header names:
# Remaps each `{index, row}` event into a map keyed by the output header names.
#
# `final_header` is an enumerable of `{source_key, target_key}` pairs; each row
# becomes `%{target_key => row[source_key], ...}` (missing source keys map to nil).
#
# Performance fixes vs. the original:
# * This step is stateless per-row work, so `Flow.partition/1` + `Flow.reduce/3`
#   (which forces an extra exchange between stages and accumulates state) is
#   replaced with a plain `Flow.map/2` — events stream straight through.
# * The per-row map is built in one pass with `Map.new/2` instead of
#   `Enum.reduce` + a `Map.merge/2` per key.
def flow_1(enum, final_header) do
  enum
  |> Flow.from_enumerable()
  |> Flow.map(fn {_index, row} ->
    Map.new(final_header, fn {source, target} -> {target, Map.get(row, source)} end)
  end)
  |> Enum.to_list()
end
Step 2 — keep only one row per email:
# Keeps only the first row seen for each "email" value.
#
# BUG FIX: `Flow.uniq_by/2` deduplicates only *within* a partition, and the
# default `Flow.partition/1` routes events by hashing the whole event. Two rows
# with the same email but different values in other columns could therefore
# land on different partitions and both survive. Partitioning on the email
# itself guarantees all rows sharing an email reach the same partition, making
# the dedup global.
def flow_2(enum) do
  enum
  |> Flow.from_enumerable()
  |> Flow.partition(key: fn row -> row["email"] end)
  |> Flow.uniq_by(fn row -> row["email"] end)
  |> Enum.to_list()
end
Step 3 — clean each row and build the import attrs:
# Builds the import attrs map for each deduplicated row.
#
# For each row: normalizes the email via email_fix/1, extracts its domain,
# strips "email" from the row data, adds "domain", validates selected attrs,
# and wraps everything with `final_id`.
#
# NOTE(review): rows whose email does not survive email_fix/1 become `nil`
# entries in the returned list — preserved from the original behavior. If the
# caller does not filter nils, consider adding `Flow.reject(&is_nil/1)` here.
#
# As in flow_1, this is stateless per-row work, so the original
# partition + reduce (an extra exchange plus state accumulation) is replaced
# with a plain `Flow.map/2`. `Map.pop |> elem(1)` is replaced with the direct
# `Map.delete/2`.
def flow_3(enum, final_id) do
  enum
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    case email_fix(row["email"]) do
      nil ->
        nil

      email ->
        data =
          row
          |> Map.delete("email")
          |> Map.put("domain", domain_from_email(email))
          |> Sheet.attrs_validation(["country", "phone"])

        %{
          email: email,
          enabled: true,
          list_id: final_id,
          data: data
        }
    end
  end)
  |> Enum.to_list()
end