What is a good way to supervise multi-core data manipulation?

Hey, I have a question related to Supervisor, Stream and Flow.
Let’s take the simplest example:

defmodule Example do
  def process(list_element), do: list_element * list_element
end
list = [1, 2, 3, 4, 5]

In the normal case we could call Enum.map(list, &Example.process/1) or Stream.map(list, &Example.process/1).
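For reference, here is a minimal sketch of those two sequential baselines, reusing the Example module from above. Enum.map/2 is eager and builds the full result list immediately, while Stream.map/2 is lazy and only does work when an Enum function consumes it. Both run every call one after another in the calling process, which is what the rest of this question wants to change.

```elixir
defmodule Example do
  def process(list_element), do: list_element * list_element
end

list = [1, 2, 3, 4, 5]

# Eager: processes every element right away and returns a new list.
eager = Enum.map(list, &Example.process/1)
# eager == [1, 4, 9, 16, 25]

# Lazy: Stream.map/2 only describes the work; Enum.take/2 pulls just two results.
lazy = Stream.map(list, &Example.process/1)
first_two = Enum.take(lazy, 2)
# first_two == [1, 4]
```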
But this is only a simple example. For more complicated processing I need to supervise each Example.process/1 call and run multiple calls concurrently (on multiple cores) while keeping the results in the original order.
So what I’m looking for is an API like this:

# restart each element that fails
stream = supervise(list, &Example.process/1)
# get first 3 entries, supervise and process each of them
first_result = Enum.take(stream, 3)
second_result =
  stream
  |> Enum.map(&SomeModule.save_element/1)
  |> Enum.uniq
IO.puts(Enum.count(second_result) == 1 and List.first(second_result) == :ok)

You likely want https://hexdocs.pm/elixir/Task.Supervisor.html#async_stream/4
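A minimal sketch of how that maps onto the API sketched in the question, assuming a Task.Supervisor registered under the hypothetical name Example.TaskSupervisor. Each element runs in its own supervised task, at most System.schedulers_online() at a time by default, and results come back in input order wrapped as {:ok, value}. Like any stream it is lazy, so Enum.take/2 stops after the first three results.

```elixir
defmodule Example do
  # Stand-in for an expensive computation.
  def process(list_element), do: list_element * list_element
end

# async_stream needs a running Task.Supervisor.
# Example.TaskSupervisor is a hypothetical name chosen for this sketch.
{:ok, _pid} = Task.Supervisor.start_link(name: Example.TaskSupervisor)

list = [1, 2, 3, 4, 5]

# One supervised task per element; results are emitted in input order.
stream = Task.Supervisor.async_stream(Example.TaskSupervisor, list, Example, :process, [])

# Lazy: only the first three results are consumed here.
first_result = Enum.take(stream, 3)
# first_result == [{:ok, 1}, {:ok, 4}, {:ok, 9}]

# Enumerating the stream again starts a fresh set of tasks.
values = Enum.map(stream, fn {:ok, value} -> value end)
# values == [1, 4, 9, 16, 25]
```

Note that with async_stream the tasks are linked to the caller, so a crashing task brings the caller down unless it traps exits, and a failed element is not restarted automatically. Options such as max_concurrency: and ordered: can be passed as the last argument.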

@benwilson512: this looks awesome!

I have some questions:

1. Do I need to start the supervisor, or is it started automatically when I call the async_stream function?

defmodule Example.Supervisor do
  use GenServer

  # supervisor implementation here ...
end

Example.Supervisor.start_link(...) # Do I need it?
stream = Task.Supervisor.async_stream(Example.Supervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
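A sketch relevant to question 1, assuming the standard Task.Supervisor module is used directly: it is not started automatically, but it also does not need a custom GenServer module. It can be started as a child of another supervisor, which is how it would typically appear in an application's start/2 callback. The name Example.TaskSupervisor is hypothetical, and Mod.expensive_fun/1 is a stand-in for the question's function.

```elixir
defmodule Mod do
  # Stand-in for the question's expensive function.
  def expensive_fun(x), do: x * x
end

# Task.Supervisor ships with its own child spec, so no custom module is needed.
children = [
  {Task.Supervisor, name: Example.TaskSupervisor}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

# The running supervisor is referenced by its registered name.
results =
  Example.TaskSupervisor
  |> Task.Supervisor.async_stream([1, 2, 3], Mod, :expensive_fun, [])
  |> Enum.to_list()

# results == [{:ok, 1}, {:ok, 4}, {:ok, 9}]
```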

2. Can I run multiple processing functions on the same supervisor?

defmodule Example.Supervisor do
  use GenServer

  # supervisor implementation here ...
end

stream = Task.Supervisor.async_stream(Example.Supervisor, collection, Mod, :expensive_fun, [])
another_stream = Task.Supervisor.async_stream(Example.Supervisor, another_collection, AnotherMod, :another_expensive_fun, [])
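A sketch relevant to question 2, under the assumption that one Task.Supervisor (hypothetically named Example.TaskSupervisor) is shared: it can host tasks for any number of streams and functions. The second stream uses async_stream_nolink/6, which does not link the tasks to the caller, so a crashing element shows up as {:exit, reason} in its position instead of crashing the caller. AnotherMod.another_expensive_fun/1 is a hypothetical function that deliberately fails on one input.

```elixir
defmodule Mod do
  def expensive_fun(x), do: x * x
end

defmodule AnotherMod do
  # Hypothetical function that raises for one input, to show failure handling.
  def another_expensive_fun(3), do: raise("bad input: 3")
  def another_expensive_fun(x), do: x + 100
end

{:ok, _pid} = Task.Supervisor.start_link(name: Example.TaskSupervisor)

# Both streams use the same supervisor.
stream = Task.Supervisor.async_stream(Example.TaskSupervisor, [1, 2, 3], Mod, :expensive_fun, [])

# Unlinked tasks: a crash is reported as {:exit, reason} rather than propagated.
another_stream =
  Task.Supervisor.async_stream_nolink(
    Example.TaskSupervisor,
    [1, 2, 3],
    AnotherMod,
    :another_expensive_fun,
    []
  )

# Consumed one after another here; each stream's own tasks still run concurrently.
squares = Enum.to_list(stream)
# squares == [{:ok, 1}, {:ok, 4}, {:ok, 9}]

others = Enum.to_list(another_stream)
# others is [{:ok, 101}, {:ok, 102}, {:exit, reason}]
```

Neither function restarts or retries a failed element. To get the "restart each element that fails" behaviour from the question, the caller would have to re-submit the elements whose result was {:exit, _} itself.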