Code review request

Hi all,

Please take a look at this code snippet

  def handle_call({:GET, bUri, bUser, bPassword, mHttpHeaders}, _from, state) do
    {:reply, request(:GET, state <> bUri, bUser, bPassword, mHttpHeaders), state}
  end

  def handle_call({:GET, bUri, bUser, bPassword}, _from, state) do
    {:reply, request(:GET, state <> bUri, bUser, bPassword), state}
  end

  # Public functions
  def request(:GET, bUri, bUser, bPassword, mHttpHeaders \\ %{})
      when is_binary(bUri) and is_binary(bUser) and is_binary(bPassword) do
    lAuth = [basic_auth: {bUser, bPassword}]
    mHeaders = Map.merge(mHttpHeaders, %{"Accept" => "application/json"})
    HTTPoison.get(bUri, mHeaders, [hackney: lAuth])
  end

and please let me know if there is anything I can improve.

Thanks

Is state your base URL? It seems strange that you would use state from a GenServer for this purpose. I’m not sure what value GenServer adds here.
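
If all the GenServer state is holding is a base URL, a plain module function that takes the base URL from config does the same job without a process in the middle. A minimal sketch, assuming a module name and config key that aren't in your code:

defmodule MyApp.Client do
  # Hypothetical module and config key, shown only to illustrate the no-GenServer version
  def get(bPath, bUser, bPassword, mHttpHeaders \\ %{}) when is_binary(bPath) do
    bBaseUrl = Application.get_env(:my_app, :base_url)
    lAuth = [basic_auth: {bUser, bPassword}]
    mHeaders = Map.merge(mHttpHeaders, %{"Accept" => "application/json"})
    HTTPoison.get(bBaseUrl <> bPath, mHeaders, hackney: lAuth)
  end
end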

I use a GenServer in conjunction with HTTPoison calls, but I use the GenServer to manage the status of the Tasks that make the HTTPoison requests. That way, if an HTTPoison call fails, it doesn't take down the GenServer, and I can keep track of the failures. You can make it generic by passing in an MFA, or specific to HTTPoison by just passing in arguments.

# Start a named Task Supervisor (the name is a placeholder; in a real app this
# belongs in your supervision tree)
{:ok, _sup_pid} = Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

use GenServer

# Client API
def start_link, do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

def submit_request(uri, user, password), do: GenServer.cast(__MODULE__, {uri, user, password})

# Callbacks
def init(:ok), do: {:ok, []}

def handle_cast({uri, user, password}, state) do
  task = start_task({uri, user, password})
  {:noreply, [task | state]}
end

# The task finished; its result arrives as {ref, result}
def handle_info({ref, _reply}, state) do
  Process.demonitor(ref, [:flush])
  # Handle task status
  new_state = remove_task_from_state(ref, state)
  {:noreply, new_state}
end

# The task crashed; the monitor is already down, so no demonitor is needed
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
  # Handle task failure
  new_state = remove_task_from_state(ref, state)
  {:noreply, new_state}
end

defp start_task({uri, user, password}, mHttpHeaders \\ %{}) do
  lAuth = [basic_auth: {user, password}]
  mHeaders = Map.merge(mHttpHeaders, %{"Accept" => "application/json"})
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, HTTPoison, :get, [uri, mHeaders, [hackney: lAuth]])
end

# Drop the finished task (matched by its monitor ref) from the tracked list
defp remove_task_from_state(ref, state), do: Enum.reject(state, &(&1.ref == ref))

There’s a great example of this technique in Programming Phoenix, but it was written before Task.Supervisor.async_nolink/4 existed, so it monitors the processes manually. (I haven’t tested this code, by the way; it’s just a schematic.)
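
For reference, the manual-monitoring variant is roughly the following (a sketch reusing the variables and supervisor name from the snippet above; not the book’s exact code):

{:ok, pid} =
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    HTTPoison.get(uri, mHeaders, hackney: lAuth)
  end)

# Monitor the task process yourself and handle its exit in handle_info/2
ref = Process.monitor(pid)
# ...later: handle {:DOWN, ^ref, :process, ^pid, reason}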

Hello all. I was hoping you could code review this snippet. I need to parse 150 GB of CSV, and I’m wondering whether the approach below is appropriate. I need to spread the work across my CPUs to speed up the processing. A bit of context on what exactly is needed: this is not the final solution, just the important part. Eventually I also need to write the results out to unique files and then read all of the generated files back in to create a final master file with the results.

def deep_parse(file_path, chunk_size) do
  file_path
  |> File.stream!(read_ahead: chunk_size)
  |> Stream.drop(1)
  |> Flow.from_enumerable()
  |> Flow.map(&(&1 |> String.split(",") |> List.first()))
  |> Flow.partition()
  |> Flow.uniq()
  |> Enum.to_list()
end

Is this properly done? Am I making some fundamental theory or practice mistake?

Do you need to maintain the order of the entries in the CSV? If so, you can’t use Flow, because it shuffles your entries around.
Unless you are running on very old hardware, I think the real bottleneck is the read rate of your source rather than the processing of the stream. Have you benchmarked beforehand to confirm that the bottleneck really is the processing?
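
A crude way to check is to time the raw read against the full pipeline, for example with :timer.tc (a sketch; deep_parse is your function from above, and the chunk size here is arbitrary):

# Both timings are in microseconds
{read_us, _} = :timer.tc(fn -> file_path |> File.stream!() |> Stream.run() end)
{full_us, _} = :timer.tc(fn -> deep_parse(file_path, 64_000) end)
IO.puts("read only: #{read_us / 1_000_000}s, full pipeline: #{full_us / 1_000_000}s")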

The order does not matter. I am connecting to an Apple AirPort, and I can’t tell whether it has a spinning disk or an SSD. How would I benchmark that?

Could I split the file into chunks of 10 million rows and then loop over a set of streams to speed things up, or would that still run into the same read/write bottleneck? When would Flow be useful, then, if no machine can get past read/write bottlenecks?

I did a test with 1 million elements. A regular Stream took 8 seconds, while the Flow version took 1.5 seconds.
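
For comparison, a Stream-only equivalent of the deep_parse pipeline above would look roughly like this (a sketch; the function name is made up):

def stream_parse(file_path, chunk_size) do
  file_path
  |> File.stream!(read_ahead: chunk_size)
  |> Stream.drop(1)
  |> Stream.map(&(&1 |> String.split(",") |> List.first()))
  |> Stream.uniq()
  |> Enum.to_list()
end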

Well, from my point of view an airport is a place where planes arrive and depart, so I can’t help you with that. I don’t care for Apple’s special terminology, which so often just boils down to they-think-it-is-better-than-plain-Bluetooth-or-Wi-Fi-but-it-is-rather-unusable-because-no-one-except-Apple-understands-it-and-they-can’t-fall-back-to-plain-technology-when-talking-to-non-Apple-devices…

Aside from that, if the wall-clock time is in fact shorter when processing in parallel, then it seems you have already benchmarked it :wink:

The Apple AirPort is pretty much a Wi-Fi-accessible external hard drive. That test was run on my local machine with a small file, though. I am going to have to hook up to the AirPort with a USB cable.