How to leverage concurrency in Enum.reduce()?

This may be a basic question, but it’s not always clear to me when or how one should leverage Elixir’s concurrency support.

Consider a simple example where we want to replace every vowel in a sentence with its upper-case equivalent. The replacements list contains tuples indicating the character to find and its replacement:

  original = "this is a sentence originally all lower-case"

  replacements = [
    {"a", "A"},
    {"e", "E"},
    {"i", "I"},
    {"o", "O"},
    {"u", "U"}
  ]

  output =
    Enum.reduce(replacements, original, fn {find, replacement}, acc ->
      String.replace(acc, find, replacement) # <-- works
    end)

This works, but it does not leverage concurrency. I tried to use spawn(fn -> String.replace(acc, find, replacement) end) inside the reduce function, but of course that doesn’t work because spawn() returns a PID.

This problem is probably a poor candidate for concurrency because each replacement relies on the output of the previous operation. Is there a way to structure this problem in a way that might take advantage of concurrent processes?

Thanks for any pointers!


As you mentioned, reduce is not a good candidate because each step depends on the result of the previous one. The same goes for recursive functions. But when the elements are independent of each other, concurrency can be valuable.

An example would be map and its parallel counterpart, pmap.
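Elixir has no built-in pmap, so here is a minimal sketch of one built on Task.async/1 and Task.await/1. The module name Parallel and the function pmap/2 are made up for illustration. It only makes sense when each element can be processed independently, which is exactly what reduce does not give you.

```elixir
defmodule Parallel do
  # Hypothetical helper: runs fun on every element in its own task,
  # then awaits the tasks in the original order.
  def pmap(collection, fun) do
    collection
    |> Enum.map(fn item -> Task.async(fn -> fun.(item) end) end)
    |> Enum.map(&Task.await/1)
  end
end

Parallel.pmap(["this", "is", "a", "sentence"], &String.upcase/1)
# => ["THIS", "IS", "A", "SENTENCE"]
```

Unlike spawn, Task.async returns a task struct that Task.await can turn back into the function's return value.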


A recursive function with binary pattern matching will be about twice as fast as your current approach (but not as generalised):

  defmodule MyModule do
    # Base case: nothing left to process
    def replace(<< "" >>), do: ""
    def replace(<< "a", rest :: binary >>), do: << "A", replace(rest) :: binary >>
    def replace(<< "e", rest :: binary >>), do: << "E", replace(rest) :: binary >>
    def replace(<< "i", rest :: binary >>), do: << "I", replace(rest) :: binary >>
    def replace(<< "o", rest :: binary >>), do: << "O", replace(rest) :: binary >>
    def replace(<< "u", rest :: binary >>), do: << "U", replace(rest) :: binary >>
    # Any other codepoint is copied through; it must be re-encoded as utf8,
    # otherwise codepoints above 255 would be truncated to a single byte
    def replace(<< c :: utf8, rest :: binary >>), do: << c :: utf8, replace(rest) :: binary >>
  end

To approach concurrency I expect you would need to:

  1. Chunk the string into substrings
  2. Use Task.async_stream/5 to run your replacement algorithm concurrently
  3. And then join the fragments back up:

  # chunks is the list of substrings produced in step 1
  max_concurrency = System.schedulers_online() * 2

  chunks
  |> Task.async_stream(MyModule, :replace, [], max_concurrency: max_concurrency, ordered: true)
  # each element of the stream is an {:ok, result} tuple, so unwrap before joining
  |> Enum.map_join(fn {:ok, fragment} -> fragment end)
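The three steps can be put together in one sketch. The module name VowelUpcase, the function replace_concurrently/2 and the chunk_size default are assumptions for illustration only. Chunking is done on graphemes so that a chunk boundary never falls inside a multi-byte character, and the replacement itself reuses the reduce from the original question.

```elixir
defmodule VowelUpcase do
  # Serial replacement, same approach as the reduce in the question.
  def replace(string) do
    Enum.reduce(~w(a e i o u), string, fn vowel, acc ->
      String.replace(acc, vowel, String.upcase(vowel))
    end)
  end

  # chunk_size is counted in graphemes; 10_000 is an arbitrary, untuned default.
  def replace_concurrently(string, chunk_size \\ 10_000) do
    string
    |> String.graphemes()                # step 1: split on grapheme boundaries
    |> Stream.chunk_every(chunk_size)
    |> Stream.map(&Enum.join/1)          # turn each chunk back into a binary
    |> Task.async_stream(__MODULE__, :replace, [], ordered: true)  # step 2
    |> Enum.map_join(fn {:ok, fragment} -> fragment end)           # step 3
  end
end

VowelUpcase.replace_concurrently("this is a sentence", 4)
# => "thIs Is A sEntEncE"
```

Because each chunk is processed independently, this only works for replacements that look at one character at a time. A multi-character pattern could straddle a chunk boundary and be missed.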

Unless the string is reasonably long, I suspect that chunking it (accounting for Unicode is more expensive than assuming ASCII), creating the stream and rejoining the chunks would add more overhead than the concurrency saves. That's extra string allocation, and counting and splitting strings can be relatively expensive too. I expect that for your sample sentence it wouldn't be worth it.

It would take some experimentation to find the optimal choice between serial and concurrent: how many concurrent streams, how long the string is, whether you support Unicode (you should), and so on.
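One rough way to run that experiment is Erlang's :timer.tc/1, which returns the elapsed time in microseconds together with the result. The input size and the chunk size below are arbitrary assumptions, and a single run like this is only a crude indication, not a proper benchmark.

```elixir
# Assumed test input; the repetition count is arbitrary
input = String.duplicate("this is a sentence originally all lower-case ", 50_000)

# Same serial approach as the reduce in the question
upcase_vowels = fn string ->
  Enum.reduce(~w(a e i o u), string, fn vowel, acc ->
    String.replace(acc, vowel, String.upcase(vowel))
  end)
end

{serial_us, serial} = :timer.tc(fn -> upcase_vowels.(input) end)

{concurrent_us, concurrent} =
  :timer.tc(fn ->
    input
    |> String.graphemes()
    |> Enum.chunk_every(100_000)   # assumed chunk size, worth varying
    |> Enum.map(&Enum.join/1)
    |> Task.async_stream(upcase_vowels, ordered: true)
    |> Enum.map_join(fn {:ok, fragment} -> fragment end)
  end)

IO.puts("serial: #{serial_us} us, concurrent: #{concurrent_us} us")
IO.puts("same output: #{serial == concurrent}")
```

Varying the input length, the chunk size and the max_concurrency option should show where, if anywhere, the concurrent version starts to pay off on a given machine.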
