How do I gracefully trap an exit with Task.async_stream?

otp
task
processes
how-to-question

#1

I’m trying to convert a directory of Markdown files into HTML, and I’m trying to do it concurrently with Task.async_stream. Here is the code that I have:

defmodule SomeModule do
  defp write_articles_html do
    # Trap exits so that a crashing (linked) task does not take this process down
    Process.flag(:trap_exit, true)

    html_conversion_results =
      Article.files()
      |> Task.async_stream(
        fn markdown_file -> write_article_html(markdown_file) end,
        max_concurrency: System.schedulers_online() * 2
      )
      |> Enum.to_list()

    IO.inspect(html_conversion_results)
  end

The function called inside Task.async_stream, write_article_html, looks like this:

  defp write_article_html(markdown_file_path) do
    html_dir = article_html_dir(markdown_file_path)
    full_html_dir = Path.join(@build_directory, html_dir)

    File.mkdir!(full_html_dir)
    # ...
  end
end # defmodule SomeModule

When the File.mkdir! fails, I would like the parent process (that made the Task.async_stream) not to crash.

I understand I can do this by trapping exits, and I’ve added the code to do it: Process.flag(:trap_exit, true).

The main process still crashes, though (crash output shown below). What am I missing? Do I need to add a receive somewhere for the :exit message? (Both of these functions are in the same module, for reference.)

How could I make sure the caller process can handle the mkdir failing? (If this is explained in any documentation, please let me know).


ERROR

Here is the output when the process crashes:

[ok: :ok, ok: :ok, ok: :ok, ok: :ok, ok: :ok, ok: :ok,
 exit: {%File.Error{action: "make directory", path: "_build/my-new-article",
   reason: :eexist},
  [{File, :mkdir!, 1, [file: 'lib/file.ex', line: 183]},
   {Task.Supervised, :do_apply, 2, [file: 'lib/task/supervised.ex', line: 85]},
   {Task.Supervised, :reply, 5, [file: 'lib/task/supervised.ex', line: 36]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]},
 exit: {%File.Error{action: "make directory", path: "_build/my-new-article",
   reason: :eexist},
  [{File, :mkdir!, 1, [file: 'lib/file.ex', line: 183]},
   {Task.Supervised, :do_apply, 2, [file: 'lib/task/supervised.ex', line: 85]},
   {Task.Supervised, :reply, 5, [file: 'lib/task/supervised.ex', line: 36]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}]

20:58:50.968 [error] Task #PID<0.127.0> started from #PID<0.120.0> terminating
** (File.Error) could not make directory "_build/my-new-article": file already exists
    (elixir) lib/file.ex:183: File.mkdir!/1
    (elixir) lib/task/supervised.ex:85: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:36: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: &:erlang.apply/2
    Args: [#Function<0.35336826/1 in Lydian.Builder.Impl.write_articles_html/1>, ["source/2017-08-23-my-new-article.md"]]

20:58:50.976 [error] Task #PID<0.128.0> started from #PID<0.120.0> terminating
** (File.Error) could not make directory "_build/my-new-article": file already exists
    (elixir) lib/file.ex:183: File.mkdir!/1
    (elixir) lib/task/supervised.ex:85: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:36: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: &:erlang.apply/2
    Args: [#Function<0.35336826/1 in Lydian.Builder.Impl.write_articles_html/1>, ["source/2017-11-12-my-new-article.md"]]

#2

https://hexdocs.pm/elixir/Task.html#async_stream/5

Finally, consider using Task.Supervisor.async_stream/6 to start tasks under a supervisor. If you find yourself trapping exits to handle exits inside the async stream, consider using Task.Supervisor.async_stream_nolink/6 to start tasks that are not linked to the current process.

As for your original problem, it would help if you could paste the message you see when the parent process crashes.

Of course, the simplest fix would still be to replace mkdir! with mkdir.
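
To make the docs' suggestion concrete, here is a minimal sketch of Task.Supervisor.async_stream_nolink. The supervisor is started inline only to keep the example self-contained, and the convert function and file names are hypothetical stand-ins for write_article_html and the real article list. Because the tasks are not linked to the caller, no Process.flag(:trap_exit, true) should be needed.

```elixir
# Throwaway supervisor for this sketch; in a real application it would
# normally be started as part of the supervision tree.
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical stand-in for write_article_html/1: raises for one input.
convert = fn
  "bad.md" -> raise "could not make directory"
  _file -> :ok
end

results =
  sup
  |> Task.Supervisor.async_stream_nolink(["a.md", "bad.md", "b.md"], convert)
  |> Enum.to_list()

# The crashed task is reported as {:exit, {exception, stacktrace}};
# the other items are {:ok, :ok}, and the caller keeps running.
IO.inspect(results)
```

The crashed task still logs its own error report, as in the original output; that log line comes from the task, not from the caller.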


#3

Using mkdir instead of mkdir! does not seem to help in this case. I’d like the status reported by Task.async_stream to not be {:ok, ...} for this call, as it is an error case (I want to use the non-ok cases to print the failed reasons together at the end).

(I’ve pasted the error reported in my original question now)


#4

The main process did not crash in your example. The first line in the log is the output of IO.inspect html_conversion_results which wouldn’t have happened if the process had crashed.
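
A small self-contained sketch of that behaviour, with a hypothetical function that raises for one input: when the caller traps exits, the failed task appears in the stream as an {:exit, reason} element and the caller carries on.

```elixir
# With exits trapped, a crashing task does not take the caller down.
Process.flag(:trap_exit, true)

results =
  [1, 2, 3]
  |> Task.async_stream(fn
    2 -> raise "boom"
    n -> n * 10
  end)
  |> Enum.to_list()

# The second element is {:exit, {exception, stacktrace}}.
IO.inspect(results)

# This line only runs because the caller survived the task crash.
IO.puts("caller still alive: #{Process.alive?(self())}")
```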

What’s wrong with something like {:ok, {:error, :directory_exists}}? The first :ok just means that the task did not crash. You can still use the return value to report success or error.
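
As a rough sketch of that idea, the task function can return a tagged tuple and the caller can split the results afterwards. ResultSketch, make_dir and the temporary paths are made up for illustration; make_dir stands in for write_article_html and uses the non-raising File.mkdir.

```elixir
defmodule ResultSketch do
  # Hypothetical stand-in for write_article_html/1: returns a tagged
  # tuple instead of raising when the directory cannot be created.
  def make_dir(dir) do
    case File.mkdir(dir) do
      :ok -> {:ok, dir}
      {:error, reason} -> {:error, {dir, reason}}
    end
  end

  # Returns {succeeded, failed}. The outer :ok only means the task did
  # not crash; the inner tuple carries the actual outcome.
  def run(dirs) do
    dirs
    |> Task.async_stream(&make_dir/1)
    |> Enum.split_with(&match?({:ok, {:ok, _}}, &1))
  end
end

# Scratch directory for the sketch; removed first in case of a stale run.
base = Path.join(System.tmp_dir!(), "result_sketch_demo")
File.rm_rf!(base)
File.mkdir_p!(base)
fresh = Path.join(base, "my-new-article")

# `fresh` does not exist yet, `base` already does (so mkdir gives :eexist).
{succeeded, failed} = ResultSketch.run([fresh, base])

IO.inspect(succeeded, label: "succeeded")
IO.inspect(failed, label: "failed")

File.rm_rf!(base)
```

The failed list can then be printed together at the end, which is what the original question was after.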


#5

I would personally go with @dom’s solution, but for the sake of argument: you can catch the exception and log it without re-throwing:

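# Assumes `require Logger` in the enclosing module.
Article.files()
|> Task.async_stream(
  fn markdown_file ->
    try do
      write_article_html(markdown_file)
    rescue
      # Logger.error/1 returns :ok, so this item is reported as {:ok, :ok}
      e -> Logger.error(Exception.message(e))
    end
  end,
  max_concurrency: System.schedulers_online() * 2
)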

#6

Thanks for all the inputs.

@dom You’re absolutely right. The parent process does not crash. I was misled by the tasks themselves crashing and displaying a message on the screen to that effect. Sorry about that. (I’m still new to processes in Elixir/OTP)

I’ll work on the proposed solutions and will update this thread accordingly.