Learning how to use concurrency - Task.async_stream

defmodule Foo do

def remove_single_item_lists(enumerable) do
  Enum.flat_map(enumerable, fn
    {_, [_, _ | _] = map} -> map

    _ -> []
  end)
end

end
cars =
%{
  "Ferrari" => [
    %{color: "Blue", make: "Ferrari", mileage: 120012.481},
    %{color: "Red", make: "Ferrari", mileage: 29831.021},
    %{color: "Black", make: "Ferrari", mileage: 24030.674},
    %{color: "Cobalt", make: "Ferrari", mileage: 412.811}
  ],
  "Koenigsegg" => [
    %{color: "Blue", make: "Koenigsegg", mileage: 250.762},
    %{color: "Cobalt", make: "Koenigsegg", mileage: 1297.76}, 
    %{color: "Titanium", make: "Koenigsegg", mileage: 5360.336}
  ],
  "Maserati" => [%{color: "Blue", make: "Maserati", mileage: 255.78}],
  "Mclaren" => [%{color: "Red", make: "Mclaren", mileage: 15641.469}]
}
iex(5)> Foo.remove_single_item_lists(cars)

[
  %{color: "Blue", make: "Ferrari", mileage: 120012.481},
  %{color: "Red", make: "Ferrari", mileage: 29831.021},
  %{color: "Black", make: "Ferrari", mileage: 24030.674},
  %{color: "Cobalt", make: "Ferrari", mileage: 412.811},
  %{color: "Blue", make: "Koenigsegg", mileage: 250.762},
  %{color: "Cobalt", make: "Koenigsegg", mileage: 1297.76},
  %{color: "Titanium", make: "Koenigsegg", mileage: 5360.336}
]
iex(6)> cars |> Task.async_stream(Foo, :remove_single_item_lists, []) |> Enum.map(fn {:ok, val} -> val end) 

16:08:35.086 [error] Task #PID<0.357.0> started from #PID<0.394.0> terminating
** (Protocol.UndefinedError) protocol Enumerable not implemented for {"Ferrari", [%{color: "Blue", make: "Ferrari", mileage: 120012.481}, %{color: "Red", make: "Ferrari", mileage: 29831.021}, %{color: "Black", make: "Ferrari", mileage: 24030.674}, %{color: "Cobalt", make: "Ferrari", mileage: 412.811}]} of type Tuple 
    (elixir 1.10.3) lib/enum.ex:1: Enumerable.impl_for!/1
    (elixir 1.10.3) lib/enum.ex:141: Enumerable.reduce/3
    (elixir 1.10.3) lib/enum.ex:3383: Enum.flat_map/2
    (elixir 1.10.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir 1.10.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib 3.12.1) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Foo.remove_single_item_lists/1
    Args: [{"Ferrari", [%{color: "Blue", make: "Ferrari", mileage: 120012.481}, %{color: "Red", make: "Ferrari", mileage: 29831.021}, %{color: "Black", make: "Ferrari", mileage: 24030.674}, %{color: "Cobalt", make: "Ferrari", mileage: 412.811}]}]
** (EXIT from #PID<0.357.0>) shell process exited with reason: an exception was raised:
    ** (Protocol.UndefinedError) protocol Enumerable not implemented for {"Ferrari", [%{color: "Blue", make: "Ferrari", mileage: 120012.481}, %{color: "Red", make: "Ferrari", mileage: 29831.021}, %{color: "Black", make: "Ferrari", mileage: 24030.674}, %{color: "Cobalt", make: "Ferrari", mileage: 412.811}]} of type Tuple
        (elixir 1.10.3) lib/enum.ex:1: Enumerable.impl_for!/1
        (elixir 1.10.3) lib/enum.ex:141: Enumerable.reduce/3
        (elixir 1.10.3) lib/enum.ex:3383: Enum.flat_map/2
        (elixir 1.10.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
        (elixir 1.10.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
        (stdlib 3.12.1) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
iex(7)> cars |> Task.async_stream(Foo, :remove_single_item_lists, []) |> Tuple.to_list() |> Enum.map(fn {:ok, val} -> val end) 

** (ArgumentError) argument error
    :erlang.tuple_to_list(#Function<1.67315895/2 in Task.build_stream/3>)

What am I missing? Thanks for all your help! :slight_smile:

You are missing the error message, which tells You where…

… and why

It’s because iterating over a map return key, value tuples, which are not Enumerable.

How is it that passing the stream’s results to Tuples.to_list doesn’t resolve the issue?

It’s because the error happens before… like here, as the error message says.

Since Foo.remove_single_item_lists(cars) works correctly how do I get Foo.remove_single_item_lists to work with Task.async then?

Task.async takes an Enum, in your case a map… and returns list of tuples. You feed your function with tuples instead of enumerable.

…which is clear if You read the full error message.

What You are doing in reality is…

Enum.map(cars, & Foo.remove_single_item_lists(&1))

…which would fail as well, with the same error message.

Task.async won’t help you, you are giving invalid arguments to your function.

The way Task.async_stream calls the function you give it is equivalent to Enum.map. So:

Task.async_stream(cars, Foo, :remove_single_item_lists, [])
# calls `Foo.remove_single_item_lists` just like:
Enum.map(cars, &Foo.remove_single_item_lists/1)

You could refactor your functions like this:

defmodule Foo do
  def remove_single_item_lists(enumerable) do
    Enum.flat_map(enumerable, &remove_single_item_list/1)
  end

  def remove_single_item_list({_key, [_, _ | _] = map}), do: map
  def remove_single_item_list(_), do: []
end

cars = ...

# note the function being passed here
cars |> Task.async_stream(Foo, :remove_single_item_list, []) |> Enum.flat_map(fn {:ok, val} -> val end)
1 Like

Finally! A response that’s actually helpful! :slight_smile: :slight_smile: :slight_smile:

Thank you so much for patiently helping me understand what’s actually going on under the hood.

You’re a wonderful teacher!!!

Thanks again! :slight_smile:

It’s great to hear that you found Matt’s post helpful @Maxximiliann, however the wording of your reply isn’t really fair on everyone else who took the time to try and help you.

If you find a reply doesn’t quite answer your query, it’s usually best to thank the poster for their attempt to help you and to simply say that you still don’t understand, and whether they would kindly elaborate for you. You are more likely to get help this way, not just in that thread but future threads too :+1:

7 Likes

That is a wonderful suggestion! I’ll certainly keep that in mind for the future.

Thank you! :slight_smile:

3 Likes