I have added a Task.yield in the callback function of Enum.map. Does Enum.map run serially or in parallel? If it runs serially, then this would be a bad idea.
Enum.map runs serially, as do all the Enum functions.
What would be a bad idea?
For parallel processing you can use this
list
|> Task.async_stream(fn x -> x + 1 end)
|> Enum.to_list()
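One detail worth knowing about the snippet above: Task.async_stream does not return the bare values, it wraps each one in a tagged tuple. A minimal sketch, assuming a small literal list (the names tagged and values are just for illustration):

```elixir
list = [1, 2, 3]

# Each element is processed in its own task; results come back
# in input order because ordering is on by default.
tagged =
  list
  |> Task.async_stream(fn x -> x + 1 end)
  |> Enum.to_list()

# Every successful result is wrapped as {:ok, value}, so unwrap it
# if you want the plain values.
values = Enum.map(tagged, fn {:ok, value} -> value end)
```

Matching only on {:ok, value} will raise if a task ever produces something else, so this is only reasonable when the function cannot fail or time out.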
Hey @a-c-sreedhar-reddy welcome to the forum. Any time you have a question involving code you’ve written, please always include that code. This helps us answer your question.
Enum.map runs for one element at a time. If you have a list of Tasks you’d like to wait for, consider Task.yield_many, which will wait for all of them at once.
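Since the original code was not posted, here is only a rough sketch of what waiting on a list of tasks with Task.yield_many could look like. The work done in each task (doubling a number) and the 5 second timeout are assumptions for the example:

```elixir
# Start all tasks first, so they run concurrently.
tasks = Enum.map(1..3, fn x -> Task.async(fn -> x * 2 end) end)

# Wait for all of them together, for at most 5 seconds in total,
# instead of yielding on each task inside Enum.map.
results =
  tasks
  |> Task.yield_many(5_000)
  |> Enum.map(fn {task, result} ->
    case result do
      {:ok, value} ->
        {:ok, value}

      {:exit, reason} ->
        {:error, reason}

      nil ->
        # The task did not finish in time; shut it down so it does not linger.
        Task.shutdown(task, :brutal_kill)
        {:error, :timeout}
    end
  end)
```

The difference from calling Task.yield once per task inside Enum.map is that the timeout here applies to the whole batch rather than to each task in turn.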
Enum functions are for running things serially. It might seem like you’d want to run everything concurrently in a language that allows for such cheap concurrency, but there are still many cases where running serially is the best choice: either because the steps logically need to be sequential, or because concurrency in Erlang/Elixir, while relatively cheap, is not free. So, in a contrived example, mapping over the range 1..100 and adding 1 to each element is about 38x slower when done in parallel. Because we’re doing such a small amount of work in each step, the overhead of concurrency heavily outweighs any benefit. In contrast, something like mapping over 1..10 and making a request to Google is about 8x slower sequentially than in parallel.
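If you want to see the overhead on your own machine, something like the following could be used to time both approaches. This is not the benchmark the numbers above came from; the module name TimingSketch is made up, and :timer.tc on a single run is a crude measurement, so treat the output as a rough indication only:

```elixir
defmodule TimingSketch do
  # Times adding 1 to each element, once sequentially and once
  # with one task per element. Returns the timings in microseconds.
  def compare(range) do
    {seq_us, seq} = :timer.tc(fn -> Enum.map(range, &(&1 + 1)) end)

    {par_us, par} =
      :timer.tc(fn ->
        range
        |> Enum.map(fn x -> Task.async(fn -> x + 1 end) end)
        |> Task.await_many()
      end)

    # Both approaches must produce the same list.
    true = seq == par

    %{sequential_us: seq_us, parallel_us: par_us}
  end
end

timings = TimingSketch.compare(1..100)
IO.inspect(timings)
```

For anything more serious than a quick check, a proper benchmarking tool that does warmup and repeated runs will give more trustworthy numbers.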
So maybe sequential is what you need, in which case Enum.map/2 will be the right tool. If you do want concurrency, something like the module below is a good starting point. PMap.map/2 will start a process for each item in the list, whereas PMap.map2/2-4 can be limited, so you could, for example, have a maximum of 5 processes working through 100 items. There’s obviously even more you can do, like supervising the tasks, or building more complex processing pipelines. Task is a good place to start. Broadway is a good place to start for the more complex processing.
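defmodule PMap do
  # Starts one task per element and waits for all of them.
  # Task.await_many/1 uses its default timeout of 5 seconds.
  def map(enum, fun) do
    Enum.map(enum, &Task.async(fn -> fun.(&1) end))
    |> Task.await_many()
  end

  # Runs at most `concurrency` tasks at a time.
  def map2(enum, fun, concurrency \\ 10, timeout \\ :infinity) do
    Task.async_stream(enum, &fun.(&1), max_concurrency: concurrency, timeout: timeout)
    # Unwrap the {:ok, value} tuples emitted by Task.async_stream.
    |> Stream.map(fn {:ok, val} -> val end)
    |> Enum.to_list()
  end
end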
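As for supervising the tasks, a minimal sketch could look like this. It assumes an unnamed Task.Supervisor started on the spot (in a real application it would normally live in your supervision tree), and the function that fails on 3 is contrived just to show what a crash looks like:

```elixir
# Assumption: a throwaway supervisor for the example.
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink(
    1..5,
    fn x -> if x == 3, do: raise("boom"), else: x * 10 end,
    max_concurrency: 2
  )
  |> Enum.map(fn
    # Successful tasks emit {:ok, value}.
    {:ok, value} -> {:ok, value}
    # Because the tasks are not linked to the caller, a crashed task
    # shows up as {:exit, reason} instead of taking the caller down.
    {:exit, _reason} -> :failed
  end)
```

With the plain Task.async_stream used in PMap.map2, a crashing task would also crash the calling process (unless it traps exits), so the nolink variant is one option when individual failures should be handled rather than propagated. The crashed task will still be logged as an error.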