I want to throttle the code below so that it only runs X requests at a time and then waits a while before starting the next batch, because our API endpoint (behind nginx) cannot handle more than X requests at a time.
def parse(config) do
  mapping = config["mapping"]
  ets = :ets.new(:"#{config["tenant"]}_order_counter", [:public])

  # Pool name must be the same term used in xml_data_to_DB/3, so use the atom here too
  :ok =
    :hackney_pool.start_pool(
      :"#{config["tenant"]}_import_pool",
      [timeout: 15_000, max_connections: 200]
    )

  config
  |> config_parser_and_return_csv_data
  |> String.splitter("\n")
  |> CSVParser.parse_stream()
  |> Stream.drop(config["params"]["rows"] |> skip_rows)
  |> Stream.filter(&data_row?(&1))
  |> Flow.from_enumerable()
  |> Flow.filter(&isValidRow?(&1, mapping))
  |> Flow.partition(key: &increment_id(&1, mapping))
  |> Flow.group_by(&increment_id(&1, mapping))
  |> Flow.map(fn group ->
    :ets.update_counter(ets, "order_count", {2, 1}, {"order_count", 0})
    group
  end)
  |> Enum.into([])
  |> Flow.from_enumerable()
  |> Flow.partition()
  |> Flow.map(fn group ->
    [{"order_count", count}] = :ets.lookup(ets, "order_count")

    # Rebinding `config` inside `if` does not leak out of the block in Elixir,
    # so bind the result of the whole `if` expression instead
    config =
      if !config["config"], do: Map.put(config, "order_count", count), else: config

    CSV.Services.Processor.process(group, config)
  end)
  |> Flow.map(&CSV.Services.DBApi.xml_data_to_DB(&1, config))
  |> Flow.run()
end
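One thing I've looked at is Flow's :stages option, which bounds how many stages run the mapping step at the same time, so if I understand it correctly something like the sketch below would keep at most X requests in flight. The module name Throttle is just for the sketch, 50 stands in for X, and xml_chunks is assumed to be the list of XML payloads produced by the pipeline above:

defmodule Throttle do
  # Minimal sketch, not my actual code: each Flow stage is a single process
  # that handles one event at a time, so `stages: x` means at most x
  # concurrent xml_data_to_DB/2 calls (and therefore at most x HTTP requests).
  def run(xml_chunks, config, x \\ 50) do
    xml_chunks
    |> Flow.from_enumerable(stages: x)
    |> Flow.map(&CSV.Services.DBApi.xml_data_to_DB(&1, config))
    |> Flow.run()
  end
end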
Is there any way I can pause the process until the X requests are completed, for example by adding a pool?
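On the pool idea: as far as I understand, hackney's :max_connections already caps how many connections a pool will open, and requests made through a full pool wait for a connection to be checked back in rather than all firing at once. So lowering the limit to X might be enough on its own. A minimal sketch, assuming X = 50 and the same config map passed to parse/1:

# Sketch only: cap the pool so hackney never opens more than X
# connections to the import endpoint at the same time (50 stands in for X).
:ok =
  :hackney_pool.start_pool(
    :"#{config["tenant"]}_import_pool",
    [timeout: 15_000, max_connections: 50]
  )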
def xml_data_to_DB(xml_data, config, retry_count \\ 0) do
  # IO.puts xml_data
  url = "#{config["host_url"]}#{config["import_api_route"]}"
  body = {:multipart, [{"xml", xml_data}]}
  headers = ["Authorization": "Bearer #{config["auth"]["access_token"]}"]

  # Retry only on :timeout errors, at most 10 times; any other result
  # (success or a non-timeout error) falls out of the `with` unchanged
  with {:error, %HTTPoison.Error{reason: reason}} <-
         HTTPoison.post(url, body, headers, hackney: [pool: :"#{config["tenant"]}_import_pool"]),
       :timeout <- reason,
       true <- retry_count != 10 do
    IO.puts("Waiting 5 seconds to retry #{retry_count}")
    # IO.inspect xml_data
    :timer.sleep(5000)
    xml_data_to_DB(xml_data, config, retry_count + 1)
  end
end
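Another thing I'm considering, in case the Flow/pool limits aren't enough: take the "run X, then wait" idea literally by chunking the data and processing one batch of X at a time with Task.async_stream, sleeping between batches. This is only a rough sketch; BatchedImport, the module attributes, and xml_chunks (the XML payloads produced by the pipeline above) are placeholders of mine:

defmodule BatchedImport do
  @x 50            # placeholder for X
  @pause_ms 5_000  # pause between batches, also a placeholder

  # Sketch: send each batch of X requests concurrently, wait for the whole
  # batch to finish, sleep, then move on to the next batch.
  def run(xml_chunks, config) do
    xml_chunks
    |> Stream.chunk_every(@x)
    |> Enum.each(fn batch ->
      batch
      |> Task.async_stream(&CSV.Services.DBApi.xml_data_to_DB(&1, config),
        max_concurrency: @x,
        timeout: :infinity
      )
      |> Stream.run()

      :timer.sleep(@pause_ms)
    end)
  end
end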