Emitting values from intermediate steps in a pipeline

Hey all, I was able to accomplish what I wanted with the help of two GitHub examples and a forum thread, but now I have a question about trapping exits that I could use some insight on. First, here are the references; my question follows.

The process_files function calls Supervisor.start_link() to start some GenStage processes. The first step, A, is marked :transient and terminates with reason :normal when there are no more files. All of the other steps are also :transient, so they terminate when A terminates. A is also marked significant: true, so the Supervisor shuts itself down automatically (because of auto_shutdown: :all_significant).
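
To make the auto-shutdown part concrete, here is a stripped-down sketch with no GenStage involved. The module name AutoShutdownSketch and the Task child are stand-ins I made up for this post, not my real code, and I'm assuming an Elixir/OTP version that supports significant children (OTP 24 or later, as far as I know).

```elixir
defmodule AutoShutdownSketch do
  # Returns the reason the supervisor exits with once its only
  # significant child has finished.
  def run do
    # Trap exits so the linked supervisor's exit arrives as a message
    # (same flag as in process_files); the old value is restored below.
    old_flag = Process.flag(:trap_exit, true)

    children = [
      # Hypothetical stand-in for A: a Task that returns, i.e. exits with :normal.
      Supervisor.child_spec({Task, fn -> Process.sleep(50) end},
        id: :step_a,
        restart: :transient,
        significant: true
      )
    ]

    {:ok, sup} =
      Supervisor.start_link(children,
        strategy: :one_for_one,
        auto_shutdown: :all_significant
      )

    reason =
      receive do
        {:EXIT, ^sup, reason} -> reason
      after
        5_000 -> :timeout
      end

    Process.flag(:trap_exit, old_flag)
    reason
  end
end
```

I'd expect AutoShutdownSketch.run() to return :shutdown, which matches the exit reason I see in my failing test.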

process_files is called from an ExUnit test. If I comment out Process.flag(:trap_exit, true), why does my ExUnit test fail with exit reason shutdown? With that line in place, the test passes. I’ve renamed some things for simplicity’s sake.
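
In case it helps anyone reproduce this, here is a minimal sketch that I believe shows the same difference without GenStage or ExUnit. TrapExitSketch, caller_exit_reason/1 and the Task child are hypothetical names for illustration only. It runs the start_link call in a throwaway process and reports how that process ended; unlike my real code it waits for the {:EXIT, ...} message directly instead of a monitor, to keep the outcome deterministic.

```elixir
defmodule TrapExitSketch do
  # Runs Supervisor.start_link/2 in a fresh process and returns the reason
  # that process exited with. `trap?` toggles Process.flag(:trap_exit, true).
  def caller_exit_reason(trap?) do
    {pid, ref} =
      spawn_monitor(fn ->
        if trap?, do: Process.flag(:trap_exit, true)

        children = [
          # Hypothetical stand-in for A: finishes after 50 ms with reason :normal.
          Supervisor.child_spec({Task, fn -> Process.sleep(50) end},
            id: :step_a,
            restart: :transient,
            significant: true
          )
        ]

        {:ok, sup} =
          Supervisor.start_link(children,
            strategy: :one_for_one,
            auto_shutdown: :all_significant
          )

        # Only delivered as a message when this process traps exits.
        # Otherwise the process just sits here until the exit signal
        # from the linked supervisor reaches it.
        receive do
          {:EXIT, ^sup, _reason} -> :ok
        end
      end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      5_000 -> :timeout
    end
  end
end
```

As far as I can tell, TrapExitSketch.caller_exit_reason(false) comes back as :shutdown and TrapExitSketch.caller_exit_reason(true) as :normal, which lines up with my test failing and passing.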

Am I using Supervisor correctly? Is there an entirely different way to do this?

process_files

  def process_files(files) do
    IO.puts("Processing #{length(files)} files...")

    children =
      [
        Supervisor.child_spec({A, files}, significant: true)
      ] ++
        for(
          i <- 1..@pf_workers,
          do:
            Supervisor.child_spec({B, i},
              id: B.process_name(i)
            )
        ) ++
        for(
          i <- 1..@km_workers,
          do:
            Supervisor.child_spec({C, {i, @pf_workers}},
              id: C.process_name(i)
            )
        ) ++
        for(
          i <- 1..@sub_workers,
          do:
            Supervisor.child_spec({D, {i, @km_workers}},
              id: D.process_name(i)
            )
        )

    # "Supervisors are not meant to die, they are intended to run indefinitely, monitoring children."
    # https://elixirforum.com/t/supervisor-dies-with-its-child/466/9?u=bgoosman
    # But we can enable auto_shutdown and declare the Producer significant to shut down the whole tree when it's done.
    # "Significant :transient child processes must exit normally for automatic shutdown to be considered."
    {:ok, pid} =
      Supervisor.start_link(children,
        strategy: :rest_for_one,
        name: ImportFileWorkflow.Supervisor,
        auto_shutdown: :all_significant
      )

    # If I comment this line out, my test fails
    Process.flag(:trap_exit, true)
    Process.monitor(pid)

    IO.puts("I am #{inspect(self())}")

    receive do
      {:DOWN, _, :process, ^pid, _} ->
        IO.puts("All files processed")
        :ok
    end
  end

ExUnit test

  @tag timeout: :infinity
  test "import 1000 files" do
    path = "/data"
    files = find_pdfs(path)
    ImportFileWorkflow.process_files(files)
  end