Repeat previous pipe while condition?

Hello

For each item in a list, I request an API and, based on the response, I decide to call the same API again or proceed.

I have something like this:

items
|> Enum.map(fn item -> Task.async(fn -> HTTPoison.get(item) end) end)
|> Enum.map(&Task.await/1)
|> Enum.map(fn response ->
  case response do
    {:ok, response} -> # continue..
    _ -> # How to step back to pipe where I call the rest call from here?
  end
end)
|> # .. etc

How can I repeat a step while a condition holds inside the pipe above? Or what's the recommended approach here?

I think what is happening here is that you have broken down your tasks too far.
Your first task ends with:

  HTTPoison.get(url)

but your code suggests that the “task” isn’t really done yet.

Instead of dealing with the response later in the pipe, deal with it right in the first task, e.g.:

  case HTTPoison.get(url) do
    {:ok, response} ->
      #continue..
    _ ->
      # How to step back to pipe where I call the rest call from here?
  end

i.e. inside the task function, simply let the blocking calls block, and in either case return a (unified) result that the rest of the pipe can deal with. Task.await/2 takes an optional timeout parameter in case the default 5000 ms isn't long enough.
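A minimal sketch of that idea (the `Retry` module and `with_retry/2` names are hypothetical, assuming `HTTPoison`-style `{:ok, _} | {:error, _}` return values): the retry loop lives inside the task itself, so the pipe only ever sees a unified result.

```elixir
defmodule Retry do
  # Call `fun` up to `max` times; return the first {:ok, _}
  # result, or the last {:error, _} once retries are exhausted.
  def with_retry(fun, max \\ 3) when max > 0 do
    case fun.() do
      {:ok, _} = ok -> ok
      {:error, _} = err when max == 1 -> err
      {:error, _} -> with_retry(fun, max - 1)
    end
  end
end
```

In the pipe this would then read `Task.async(fn -> Retry.with_retry(fn -> HTTPoison.get(url) end) end)`, and the `Enum.map` after `Task.await` only has to match on `{:ok, _}` / `{:error, _}`.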

In addition to what @peerreynders was suggesting you might want to take a look at Task.async_stream/3 which offers some useful options to deal with concurrent tasks that are started from an enumerable.

Thanks. I think using a GenServer to handle the blocking call and return the final response would give more flexibility to handle possible errors inside the GenServer.

When does the stream returned by Task.async_stream/3 actually run? Is it when it is iterated over as an enumerable, like in the example from the documentation:

Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)

?

Yes. Streams are lazily evaluated: they only run when you actually try to consume them, e.g. with Enum.each or Enum.map.
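A small sketch of that laziness (nothing runs until the stream is consumed):

```elixir
# Building the stream starts no tasks yet:
stream = Task.async_stream([1, 2, 3], fn n -> n * 10 end)

# Only here are the tasks spawned and awaited, as the stream is consumed.
# Results come back wrapped as {:ok, value}, in input order by default.
results = Enum.map(stream, fn {:ok, n} -> n end)
# results == [10, 20, 30]
```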

Stream.transform/4 has a way to halt when your conditions are met.
https://hexdocs.pm/elixir/Stream.html#transform/4
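A minimal sketch of that halting behavior, using the simpler Stream.transform/3 variant (transform/4 adds start/after functions around the same reducer): returning `{:halt, acc}` stops the stream as soon as the condition is met.

```elixir
# Emit values until the first error result, then halt the (lazy) stream.
taken =
  [{:ok, 1}, {:ok, 2}, {:error, :boom}, {:ok, 3}]
  |> Stream.transform(nil, fn
    {:ok, n}, acc -> {[n], acc}
    {:error, _}, acc -> {:halt, acc}
  end)
  |> Enum.to_list()
# taken == [1, 2] — the {:ok, 3} after the error is never touched
```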

Not necessarily; using Task.async_stream gives you the same thing, you just have to handle error results in the function that receives the result of each task:

items
|> Task.async_stream(&HTTPoison.get/1)
|> Enum.map(fn
  # each task result arrives wrapped: {:ok, result_of_HTTPoison.get}
  {:ok, {:ok, response}} -> # work with the downloaded file
  {:ok, {:error, reason}} -> # handle the error
end)

Using a GenServer is additional and unnecessary complexity. Task.async_stream also transparently parallelizes work across all CPU cores, while using Task.async and then looping over the tasks with Task.await waits for them sequentially, one by one, which is inefficient and is not guaranteed to use all CPU cores.
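For completeness, a sketch of the options Task.async_stream accepts (values here are illustrative): `:max_concurrency` caps the number of parallel tasks, and `:timeout` with `on_timeout: :kill_task` turns a slow task into an `{:exit, :timeout}` result instead of crashing the caller.

```elixir
# Two tasks run at a time; each task may take at most 200 ms.
results =
  [50, 10, 5_000]
  |> Task.async_stream(fn ms -> Process.sleep(ms); ms end,
       max_concurrency: 2, timeout: 200, on_timeout: :kill_task)
  |> Enum.to_list()
# results == [ok: 50, ok: 10, exit: :timeout]
```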

As far as I understand it, Task.async_stream/3 simply offers concurrency control, and the :max_concurrency setting simply defaults to System.schedulers_online/0. That is, there are no guarantees beyond Task.async that the tasks will be better distributed; in either case the BEAM will do the best it can under the circumstances.

Furthermore, to me there is nothing in the opening code sample that indicates any desire for concurrency control: whether there are 10 or 1000 items, all of them are to initiate their HTTP requests at once in order to achieve the maximal amount of parallelism. And running a Task.async on each item will do just that.

while using Task.async and then looping on the task IDs with Task.await introduces a sequential waiting of the tasks.

You are actually only waiting for the one task that blocks the longest - wherever that may occur in the list. By the time that one un-blocks all the remaining task results are already waiting to be pulled out of the process mailbox - i.e. none of the subsequent Task.await/2 will actually block.
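That can be sketched with a quick timing check (sleep durations are illustrative): the total wall time of the await loop is roughly the slowest task, not the sum.

```elixir
# Three concurrent tasks sleep 300/200/100 ms; the whole await loop takes
# ~300 ms, not 600 ms, because the later awaits find their replies
# already sitting in the process mailbox.
{micros, results} =
  :timer.tc(fn ->
    [300, 200, 100]
    |> Enum.map(fn ms -> Task.async(fn -> Process.sleep(ms); ms end) end)
    |> Enum.map(&Task.await/1)
  end)

# results == [300, 200, 100]; micros is roughly 300_000
```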

I am using Enum.map with GenServer in parallel as follows:

vectors =
  Enum.take(destinations, rows)
  |> Enum.with_index()
  |> Enum.map(fn {element, index} ->
    body =
      Map.put(vector, :origins, [Enum.at(vector.destinations, index)])
      |> Poison.encode!()

    DistanceMatrix.start(body, index)
  end)
  |> Enum.map(&DistanceMatrix.load/1)
  |> Enum.map(&DistanceMatrix.await/1)
  |> Enum.map(fn {{pid, vector}, index} ->
    # ... formatting
  end)

That would be amazing with even less code to write, but how would I handle errors or process crashes? How would I handle a graceful exit with Task.async_stream, like :trap_exit in a GenServer? I was even thinking of using a supervisor for each GenServer to handle possible crashes.

So, how would I substitute the GenServer features using Task.async_stream or Stream.transform/4?

Look into Task.Supervisor. For a contrived code example look here.

Not in practice. I have been building report generators and gather/scatter workflows where multiple DB / file streams get combined into a single data stream, and Task.async_stream improved the code's speed by 30-80%. Theoretically, with 4 cores the improvement should be 400%, but oh well, it's still better.

So I can partially agree with you that the benefits seem to be overblown, but there’s still a beneficial difference in practice.

It’s good that this does not introduce a sequential block and I stand corrected, thank you. But I argue that’s also a reason to use Task.async_stream or Task.yield_many because it’s shorter and communicates the intent of the code better, would you not agree? (Also error processing is kept intact – except for process crashes.) We can probably concede it’s a matter of coding style but I stand on the side that insists that we should strive for minimal code lines with maximum clarity of intent.

Agreed in full. This also depends a lot on the machine running the code. There are a number of kernel parameters (like FD limits) that can make a code that downloads 1000 URLs fast or very slow.

I saw the example; it's using a GenServer worker. I thought you meant a Task with a function.
Also, in the example you sent, I didn't see Task.Supervisor…?

Since @peerreynders already gave you an excellent suggestion, I am only gonna chime in here:

I suggest you replace Enum with Stream on all but the last call. That way only one loop will be executed instead of the six in your code snippet:

vectors =
  Stream.take(destinations, rows)
  |> Stream.with_index
  |> Stream.map(fn({vector, index}) ->
    body =
      Map.put(vector, :origins, [Enum.at(vector.destinations, index)])
      |> Poison.encode!
    DistanceMatrix.start(body, index)
  end)
  |> Stream.map(&DistanceMatrix.load/1)
  |> Stream.map(&DistanceMatrix.await/1)
  |> Enum.map(fn({{pid, vector}, index})-> 
    #... formating
  end)

…however, now that I read it, it very much depends on what these functions do: DistanceMatrix.start, DistanceMatrix.load and DistanceMatrix.await… are they starting processes, or are they each doing a single synchronous task?

So Stream might not be applicable. Depends on what your inner code does.

Task.Supervisor is started as part of the application in MyApp.Application.start/2. It is then used by Task.Supervisor.async_stream_nolink/6 within Worker.stream_tasks/2.

Worker is simply a GenServer that needs results from multiple tasks while staying “safe” from them should they misbehave. async links to the spawning process, so a crashing task will take down everything else.
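The fault-isolation part can be sketched in isolation (the supervisor is started inline here for brevity; in an application it would live in the supervision tree, e.g. as `{Task.Supervisor, name: MyApp.TaskSupervisor}`, where `MyApp.TaskSupervisor` is a hypothetical name):

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# With async_stream_nolink, a crashing task surfaces as an
# {:exit, reason} result instead of taking the caller down.
results =
  Task.Supervisor.async_stream_nolink(sup, [1000, -500], fn
    n when n > 0 -> n
    _ -> raise "crash"
  end)
  |> Enum.to_list()
# results == [{:ok, 1000}, {:exit, _reason}] — the caller survives
```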

Worker.stream_fun is the function being run as a task. The input list [1000, -500, 2000] results in three separate tasks:

  1. Sleeps 1000 ms then returns an integer value of 1000
  2. Sleeps 500 ms and then crashes
  3. Sleeps 2000 ms and is terminated by the 1500 ms timeout

The Worker GenServer receives the completed results from all the tasks in

  def handle_info({task_ref, result}, {app, name, task_ref}) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    Process.demonitor(task_ref, [:flush])
    IO.puts("Result: #{inspect result}")
    {:stop, :normal, {app, name, nil}}
  end

which happens to be:

[ok: 1000, exit: :crash, exit: :timeout]

i.e. only the first task completed successfully, while the others exited: one due to a crash, the other terminated via a timeout.

Joe Armstrong’s stated goal for the BEAM is:

applications run 0.75 x N times faster on an N-core computer

How we program multicores - Joe Armstrong

Also it’s not too hard to imagine that the async_stream way of doing things is likely highly optimized while there are lots of ways to foul up a bunch of hand-coded asyncs.

There are a number of kernel parameters (like FD limits) that can make a code that downloads 1000 URLs fast or very slow.

My intent wasn’t to advocate boundless process spawning. I can merely see the attraction of simply firing up “a few” tasks via async and then using Enum.map/2 in combination with Task.await/2 as a kind of await_all - especially if you aren’t particularly concerned about what happens when one of the tasks crashes.

Task.Supervisor.async_stream_nolink/6 seems like a much more adept solution just for the built-in fault isolation and capture - it’s easy enough to set :max_concurrency to a high enough value to adjust for a reasonable amount of concurrency given the job at hand (e.g. waiting for a bunch of blocked requests).

Pretty interesting, had no idea. Thanks a lot!

That’s what I theorize without having any proof, yep.