defmodule AsyncProcessor do
  @moduledoc """
  Runs a lambda over an enumerable with bounded concurrency via
  `Task.async_stream/3` and reshapes the resulting `{:ok, result}` tuples.
  """

  # Computed at runtime so the bound reflects the schedulers of the node
  # actually running the code, not the machine that compiled it.
  defp max_concurrency, do: System.schedulers_online() * 30

  @doc """
  Reshapes a sequence of `{:ok, result}` tuples according to `output_config`:

    * `:flat_map_stream` - lazily flattens each `result` (each must be enumerable)
    * `:stream`          - lazily unwraps each `result`
    * `:flat_map`        - eagerly concatenates list results, preserving order
    * `:enum_map`        - eagerly unwraps each `result`

  Crashes (by design) when a tuple is not `{:ok, _}`, e.g. a task timed out.
  """
  def format(sequence, :flat_map_stream),
    do: Stream.flat_map(sequence, fn {:ok, result} -> result end)

  def format(sequence, :stream), do: Stream.map(sequence, fn {:ok, result} -> result end)

  # Enum.flat_map/2 preserves input order and tolerates empty lists; the
  # previous reduce with `result ++ acc` reversed chunk order and crashed
  # on `{:ok, []}`.
  def format(sequence, :flat_map),
    do: Enum.flat_map(sequence, fn {:ok, result} when is_list(result) -> result end)

  def format(sequence, :enum_map), do: Enum.map(sequence, fn {:ok, result} -> result end)

  @doc """
  Maps `lambda` over `enumerable` concurrently, then formats the results.

  `timeout` is the per-task timeout in milliseconds (default 20s).
  """
  def execute(enumerable, lambda, output_config \\ :enum_map, timeout \\ 20_000) do
    enumerable
    |> Task.async_stream(lambda, max_concurrency: max_concurrency(), timeout: timeout)
    |> format(output_config)
  end
end
defmodule Print do
  @moduledoc """
  Console-logging helpers that prefix every message with a UTC timestamp
  and wrap it in ANSI colors.
  """

  @reset IO.ANSI.reset()
  @blue IO.ANSI.color(33)
  @fuscia IO.ANSI.color(161)
  @light_green IO.ANSI.color(44)
  @mid_green IO.ANSI.color(76)
  @yellow IO.ANSI.color(226)

  @doc "Current UTC timestamp used in every log prefix."
  def utc_time, do: DateTime.utc_now()

  # Blue "<time> UTC-" prefix; the one-arg form appends a color code that
  # the rest of the line will be rendered in.
  defp prefix, do: @blue <> "#{utc_time()} UTC-"
  defp prefix(body_color), do: prefix() <> body_color

  @doc "Prints `message` in mid-green after the timestamp prefix."
  def text(message), do: IO.puts("#{prefix(@mid_green)} #{message} \n")

  @doc "Prints `details` in mid-green, tagged with `module` and `line_number`."
  def text(details, module, line_number),
    do: IO.puts("#{prefix(@mid_green)} #{module} - line #{line_number}: #{details} \n")

  @doc "Prints `details` in fuscia (color reset afterwards), tagged with module/line."
  def error(details, module, line_number) do
    body = @fuscia <> "#{details}" <> @reset
    IO.puts("#{prefix()} #{module} - line #{line_number}: #{body} \n")
  end

  @doc "Prints `details` in yellow (color reset afterwards), tagged with module/line."
  def warning(details, module, line_number) do
    body = @yellow <> "#{details}" <> @reset
    IO.puts("#{prefix()} #{module} - line #{line_number}: #{body} \n")
  end

  @doc "Prints `details` in light green, tagged with `module` and `line_number`."
  def highlight(details, module, line_number),
    do: IO.puts("#{prefix(@light_green)} #{module} - line #{line_number}: #{details} \n")

  @doc "Prints `message` in light green after the timestamp prefix."
  def highlight(message),
    do: IO.puts("#{prefix()} #{@light_green <> "#{message}"} \n")
end
defmodule LogBook do
  @moduledoc """
  Writes a value both to the `Logger` backend and to the console (via
  `Print`), with console styling selected by a `message_type` atom.
  `main/5` is the entry point; lists can be fanned out concurrently via
  `AsyncProcessor`.
  """

  require Logger

  # Writes `data` to the Logger backend, fully inspected (no truncation),
  # tagged with the caller's module and line. Always returns {:ok, string}.
  def write_to_log(data, module, line_number) do
    data_eval = inspect(data, limit: :infinity)
    Logger.info("#{module} - line #{line_number}: #{data_eval}")
    {:ok, data_eval}
  end

  # :quiet suppresses console output entirely (the Logger entry has
  # already been written by the caller).
  def write_to_console(:quiet),
    do: {:ok, :message_logged}

  def write_to_console(_, _, _, :quiet),
    do: write_to_console(:quiet)

  def write_to_console(data_eval, module, line_number, :error),
    do: Print.error(data_eval, module, line_number)

  def write_to_console(data_eval, module, line_number, :warning),
    do: Print.warning(data_eval, module, line_number)

  # :trace (the default) and an explicit nil both render as a highlight.
  # NOTE(review): any other message_type has no matching clause in the
  # 4-arity group and raises FunctionClauseError.
  def write_to_console(data_eval, module, line_number, message_type)
      when message_type == :trace or message_type == nil,
      do: Print.highlight(data_eval, module, line_number)

  def write_to_console(_, :quiet),
    do: write_to_console(:quiet)

  # Message-only variants (no module/line prefix); used by main/5 for the
  # :print_to_screen and :highlight message types.
  def write_to_console(message, :print_to_screen),
    do: Print.text(message)

  def write_to_console(message, :highlight),
    do: Print.highlight(message)

  # Bodiless head declaring the defaults shared by all main/5 clauses.
  def main(data, module, line_number, message_type \\ :trace, list? \\ false)

  # Screen-only styles: log the raw message, then print it without the
  # module/line prefix.
  def main(message, module, line_number, message_type, false)
      when message_type == :print_to_screen or message_type == :highlight do
    with {:ok, _} <- write_to_log(message, module, line_number) do
      write_to_console(message, message_type)
    else
      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  # Default path: log the value, then echo its inspected form to the
  # console with the style selected by message_type.
  def main(data, module, line_number, message_type, false) do
    with {:ok, data_eval} <- write_to_log(data, module, line_number) do
      write_to_console(data_eval, module, line_number, message_type)
    else
      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  # list? = true: log the whole (non-empty) list once, then print each
  # element concurrently. NOTE(review): concurrent fan-out means console
  # ordering is non-deterministic.
  def main([_ | _] = data, module, line_number, message_type, true) do
    with {:ok, _data_eval} <- write_to_log(data, module, line_number) do
      AsyncProcessor.execute(
        data,
        &(inspect(&1, limit: :infinity)
          |> write_to_console(module, line_number, message_type))
      )
    else
      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  # Convenience wrappers: positional defaults, or options supplied as a
  # keyword list (:list, :message_type).
  def testing(data, module, line_number),
    do: main(data, module, line_number)

  def testing(data, module, line_number, opts) do
    list? = opts[:list] || false
    message_type = opts[:message_type] || :trace
    main(data, module, line_number, message_type, list?)
  end
end
defmodule Ventris do
  @moduledoc """
  Fetches JSON from an API with retry/backoff, extracts a target payload,
  and decodes it with atom keys — creating any missing atoms on first
  contact with a new payload shape, then re-fetching.
  """

  @doc """
  Prepends `keys_list` onto accumulator `acc`; a non-list key is wrapped
  so the result is always a flat list.
  """
  def update_keys_list(keys_list, acc) when is_list(acc) do
    case is_list(keys_list) do
      true -> keys_list ++ acc
      false -> [keys_list] ++ acc
    end
  end

  @doc """
  Recursively walks a decoded JSON map, returning the accumulated key
  paths. Leaf values (non-maps) terminate recursion and yield the path
  built so far.
  """
  def extract_nested_keys(nested_api_data, root_key) when is_map(nested_api_data) do
    Map.keys(nested_api_data)
    |> AsyncProcessor.execute(
      fn branch_key ->
        lower_level_api_data = Map.get(nested_api_data, branch_key)
        updated_keys_list = update_keys_list(root_key, [branch_key])

        extract_nested_keys(lower_level_api_data, updated_keys_list)
        |> List.flatten()
      end,
      :flat_map
    )
  end

  def extract_nested_keys(nested_api_data, key) when not is_map(nested_api_data), do: key

  @doc """
  Collects every distinct key in the target payload and interns each one
  as an atom so a later `Jason.decode(..., keys: :atoms!)` can succeed.

  NOTE(review): `String.to_atom/1` on API-supplied strings is deliberate
  here (the whole point is to create the missing atoms), but atoms are
  never garbage-collected — only use this against trusted APIs.
  """
  def extract_keys(api_response_json, target_data_key) when is_atom(target_data_key) do
    {:ok, target_data} = extract_target_data(api_response_json, target_data_key)
    {:ok, decoded_api_response_json} = Jason.decode(target_data)

    Map.keys(decoded_api_response_json)
    |> AsyncProcessor.execute(
      fn key ->
        nested_keys_list =
          Map.get(decoded_api_response_json, key)
          |> extract_nested_keys(key)

        update_keys_list(nested_keys_list, [])
      end,
      :flat_map
    )
    |> Enum.uniq()
    |> AsyncProcessor.execute(&String.to_atom(&1))
  end

  @doc """
  Decodes `target_data` with `keys: :atoms!`. If that raises (a key's
  atom does not exist yet), interns the keys via `extract_keys/2` and
  re-runs the whole pipeline with fresh data.
  """
  def decypher(raw_json, target_data, api_url, target_data_key) do
    try do
      Jason.decode(target_data, keys: :atoms!)
    rescue
      # NOTE(review): broad rescue also hides non-atom decode failures;
      # narrowing to ArgumentError would surface those as crashes.
      _ ->
        LogBook.main(
          "Lack of atomized versions of API's keys for #{api_url} prevents the safe conversion of this JSON. Creating missing key versions now . . . ",
          __MODULE__,
          49,
          :warning
        )

        extract_keys(raw_json, target_data_key)

        LogBook.main(
          "JSON key atomization complete. Getting fresh data from #{api_url} . . .",
          __MODULE__,
          55,
          :warning
        )

        main(api_url, target_data_key)
    end
  end

  @doc """
  Returns `{:ok, payload}`: the whole response when `target_data_key` is
  `:none`, otherwise the value under that key.
  """
  def extract_target_data(api_response_json, target_data_key) when is_atom(target_data_key) do
    target_data =
      case target_data_key do
        :none -> api_response_json
        _ -> Map.get(api_response_json, target_data_key)
      end

    {:ok, target_data}
  end

  # Logs a retry warning; {sleep_time, interval} is only display text.
  def update_log(api_url, error_message, {sleep_time, interval}, line_no) do
    LogBook.main(
      "Error: #{inspect(error_message)} - Retrying #{api_url} in #{sleep_time} #{interval} . . . ",
      __MODULE__,
      line_no,
      :warning
    )
  end

  @doc """
  Backoff ladder for a failed request: 200ms, 500ms, then 1.5s per
  attempt, raising after 29 attempts.
  """
  def retry_api_url(api_url, error_message, attempt_count) do
    cond do
      attempt_count == 1 ->
        update_log(api_url, error_message, {200, "ms"}, 14)
        Process.sleep(200)
        # BUGFIX: was `main(api_url, attempt_count)`, which re-entered the
        # whole pipeline with the attempt count as the JSON key and reset
        # the retry counter to 0 — the 29-attempt cap could never fire.
        connect_to_api(api_url, attempt_count)

      attempt_count == 2 ->
        update_log(api_url, error_message, {500, "ms"}, 20)
        Process.sleep(500)
        connect_to_api(api_url, attempt_count)

      attempt_count <= 29 ->
        update_log(api_url, error_message, {1.5, "seconds"}, 26)
        Process.sleep(1_500)
        connect_to_api(api_url, attempt_count)

      attempt_count > 29 ->
        raise "Error after #{attempt_count} failed attempts: #{inspect(error_message)}"
    end
  end

  # Transport errors are retried with an incremented attempt count; any
  # {:ok, response} passes through. NOTE(review): HTTPoison returns
  # {:ok, %Response{}} even for 4xx/5xx statuses — those are treated as
  # success here.
  def process_api_response(
        {:error, %HTTPoison.Error{id: nil, reason: _} = error_message},
        api_url,
        attempt_count
      ),
      do: retry_api_url(api_url, error_message, attempt_count + 1)

  def process_api_response({:ok, response}, _, _), do: {:ok, response}

  @doc "Issues the GET and routes the result through the retry machinery."
  def connect_to_api(api_url, attempt_count \\ 0) do
    response = HTTPoison.get(api_url)
    process_api_response(response, api_url, attempt_count)
  end

  @doc "Fetches `api_url`, raising on any value that is not `{:ok, _}`."
  def fetch_data(api_url) do
    case connect_to_api(api_url) do
      {:ok, _} = good_response ->
        good_response

      glitch ->
        raise "#{__MODULE__}: Mishandled value: #{inspect(glitch)}"
    end
  end

  @doc """
  Entry point: fetch, extract the payload under `target_data_key`, and
  decode it (atomizing keys on first contact with a new shape).
  """
  def main(api_url, target_data_key) do
    with {:ok, api_response_json} <- fetch_data(api_url),
         {:ok, target_data} <- extract_target_data(api_response_json, target_data_key) do
      decypher(api_response_json, target_data, api_url, target_data_key)
    end
  end
end
Any specific advice on how to improve readability, efficiency, execution speed, or conformance to best practices would be greatly appreciated.