Emitting values from intermediate steps in a pipeline

I am implementing a file-processing pipeline in Elixir. Generically speaking, it has a few steps: file → A → B → C, plus a branch A → D.

I’d like to emit data from each step as it is generated, because the steps vary in throughput and I want to consume the output immediately. For example, step A can emit a struct with file metadata and file chunks, and step D can emit vectors created by sending file chunks through Bumblebee.

What’s the Elixir way to give each step a kind of plugin architecture, where I, as the pipeline orchestrator, can install a different consumer for each step? For example, I could install a “StdioWriter” on step A to see what it emits.

It’s kind of like a “tap” into the pipe.

Pass a module or function as an argument and execute it in the step, or publish PubSub messages or telemetry events?
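A minimal sketch of the “pass a module as an argument” option, assuming each consumer is installed as a `{module, opts}` pair that implements a small behaviour. `Pipeline.Tap`, `handle_emit/3`, `Forwarder` and the two toy steps are hypothetical names for illustration; `StdioWriter` follows the example in the question.

```elixir
defmodule Pipeline.Tap do
  @moduledoc "Hypothetical behaviour for a consumer installed on one pipeline step."
  @callback handle_emit(step :: atom(), item :: term(), opts :: keyword()) :: :ok
end

defmodule StdioWriter do
  # Prints every item the step emits, prefixed with the step name.
  @behaviour Pipeline.Tap

  @impl true
  def handle_emit(step, item, _opts), do: IO.puts("[#{step}] #{inspect(item)}")
end

defmodule Forwarder do
  # Sends every emitted item to the pid given in opts (handy in tests).
  @behaviour Pipeline.Tap

  @impl true
  def handle_emit(step, item, opts) do
    send(Keyword.fetch!(opts, :pid), {:tap, step, item})
    :ok
  end
end

defmodule Pipeline do
  # `taps` maps a step name to a list of {module, opts} consumers chosen by the orchestrator.
  def run(lines, taps) do
    lines
    # Hypothetical step A: wrap each line with its size in bytes.
    |> Enum.map(fn line -> emit(%{line: line, bytes: byte_size(line)}, :a, taps) end)
    # Hypothetical step B: derive something from A's output.
    |> Enum.map(fn meta -> emit(String.upcase(meta.line), :b, taps) end)
  end

  # Hands the item to every tap installed on `step`, then returns it unchanged.
  defp emit(item, step, taps) do
    for {mod, opts} <- Map.get(taps, step, []), do: mod.handle_emit(step, item, opts)
    item
  end
end
```

Calling `Pipeline.run(["hello", "world"], %{a: [{StdioWriter, []}]})` prints one line per item from step A and returns `["HELLO", "WORLD"]`; the steps never know what their taps do. Because this version uses `Enum`, step B only starts after A has processed every item, so lazy streams or separate processes are needed if output should flow item by item.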


It sounds like you might be looking for Broadway


Passing a function (or list of functions) sounds like the simplest option. I guess the functions should all have the same interface.
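A sketch of the list-of-functions variant, assuming every tap is a 1-arity function whose return value is ignored. `TapStream.tap/2` is a hypothetical helper built on `Stream.each/2`. Because streams are lazy, each element passes through step A, A's taps, step B and B's taps before the next element is produced, which fits the “emit as it is generated” requirement within a single process.

```elixir
defmodule TapStream do
  # Lazily calls every 1-arity function in `funs` on each element as it passes through.
  # All taps share one interface (fn item -> any end); their return values are ignored.
  def tap(stream, funs) when is_list(funs) do
    Stream.each(stream, fn item -> Enum.each(funs, fn fun -> fun.(item) end) end)
  end
end

# Hypothetical two-step pipeline: step A multiplies by 10, step B adds 1.
# Nothing runs until Enum.to_list/1 pulls elements through, one at a time.
result =
  1..3
  |> Stream.map(&(&1 * 10))
  |> TapStream.tap([&IO.inspect(&1, label: "A")])
  |> Stream.map(&(&1 + 1))
  |> TapStream.tap([&IO.inspect(&1, label: "B")])
  |> Enum.to_list()

IO.inspect(result, label: "result")
```

Running it prints `A: 10`, `B: 11`, `A: 20`, `B: 21`, `A: 30`, `B: 31` in that order, and `result` is `[11, 21, 31]`. Swapping a tap is just a matter of passing a different function list to each `tap/2` call.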

How would you implement pubsub / telemetry?
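For the PubSub route, Elixir's standard library is enough on a single node: a `Registry` started with `keys: :duplicate` can act as a dispatcher, with the step name as the topic. `StepPubSub` and the `{:pipeline_event, step, item}` message shape are assumptions for this sketch; Phoenix.PubSub is the usual choice when Phoenix is already a dependency or events must cross nodes.

```elixir
defmodule StepPubSub do
  # A tiny PubSub built on Registry with duplicate keys; the topic is the step name.
  @registry __MODULE__.Registry

  # Start once, e.g. as a child in the application's supervision tree.
  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # The calling process subscribes to everything `step` emits.
  def subscribe(step), do: Registry.register(@registry, step, nil)

  # Called by a step: sends {:pipeline_event, step, item} to every current subscriber.
  def publish(step, item) do
    Registry.dispatch(@registry, step, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:pipeline_event, step, item})
    end)
  end
end
```

A consumer process (a StdioWriter, say) calls `StepPubSub.subscribe(:a)` and then receives `{:pipeline_event, :a, item}` messages, while step A calls `StepPubSub.publish(:a, item)`. Note that `send/2` is fire-and-forget: a slow subscriber simply accumulates messages in its mailbox, so this approach provides no backpressure.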

cmo, did you mean this library: telemetry v1.3.0?

It looks like I can use :telemetry to emit events to attached event handlers.
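A sketch of the `:telemetry` route, assuming the `telemetry` package (`~> 1.3`) can be installed. `:telemetry.execute/3` and `:telemetry.attach/4` are the library's entry points; the event names under `[:import_file_workflow, ...]`, the `PipelineTelemetry` module and the idea of passing a 1-arity “sink” function as the handler config are hypothetical choices for this example.

```elixir
# Assumes network access for Mix.install; in a Mix project add {:telemetry, "~> 1.3"} to deps.
Mix.install([{:telemetry, "~> 1.3"}])

defmodule PipelineTelemetry do
  # Hypothetical event prefix; events look like [:import_file_workflow, :a, :emit].
  @prefix [:import_file_workflow]

  # Called inside a step whenever it produces something.
  def emit(step, measurements, metadata) do
    :telemetry.execute(@prefix ++ [step, :emit], measurements, metadata)
  end

  # Installs a "tap" on one step; `sink` is any 1-arity function, passed as handler config.
  def attach_tap(handler_id, step, sink) when is_function(sink, 1) do
    :telemetry.attach(handler_id, @prefix ++ [step, :emit], &__MODULE__.handle_event/4, sink)
  end

  # The 4-arity handler shape :telemetry expects; it runs in the process that called execute/3.
  def handle_event(event, measurements, metadata, sink) do
    sink.({event, measurements, metadata})
  end
end

# A "StdioWriter" tap on step A, followed by one emission from (hypothetical) step A.
:ok = PipelineTelemetry.attach_tap("stdio-writer-a", :a, &IO.inspect(&1, label: "step A"))
:ok = PipelineTelemetry.emit(:a, %{bytes: 4096}, %{file: "report.pdf", chunk: 0})
```

Telemetry handlers run synchronously in the emitting process, so a slow handler slows the step itself down. That is fine for a debugging tap, but it is another reason heavier consumers usually need something with backpressure.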

Thanks, I do need something with backpressure, so I thought GenStage or Broadway might help.

Hey all, I was able to accomplish what I wanted with the help of two GitHub examples and a forum thread, but now I have a question about trapping exits that I could use some insight on. First come the references, then my question.

The process_files function calls Supervisor.start_link/2 to start several GenStage processes. The first step, A, is :transient and terminates with reason :normal when there are no more files. All of the other steps are also :transient, so they terminate when A terminates. A is also marked :significant, so the supervisor shuts itself down automatically (because of auto_shutdown: :all_significant).

process_files is called from an ExUnit test. Why does the test fail with exit reason :shutdown when I comment out Process.flag(:trap_exit, true)? With that line in place, the test passes. I’ve renamed some things for simplicity’s sake.

Am I using Supervisor correctly? Is there an entirely different way to do this?
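One likely explanation, sketched with a one-child stand-in for the real tree: `Supervisor.start_link/2` links the supervisor to the calling process (here, the test process). When the significant child exits normally, `auto_shutdown: :all_significant` makes the supervisor stop with reason `:shutdown`, and a linked process that is not trapping exits is killed by any exit reason other than `:normal`. With `trap_exit` enabled, the signal arrives as an `{:EXIT, pid, :shutdown}` message instead, so the caller survives to receive `:DOWN`. `AutoShutdownDemo` is hypothetical, uses a `Task` in place of the GenStage producer, and assumes OTP 24 or later (needed for significant children).

```elixir
defmodule AutoShutdownDemo do
  # A supervisor whose only child is significant and :transient, like step A in the post.
  # The Task returns immediately, i.e. it exits with reason :normal.
  def start_sup do
    children = [
      Supervisor.child_spec({Task, fn -> :ok end}, restart: :transient, significant: true)
    ]

    Supervisor.start_link(children, strategy: :one_for_one, auto_shutdown: :all_significant)
  end

  # Starts the supervisor from a throwaway "caller" process (standing in for the test
  # process) and reports what happened to that caller.
  def run(trap_exit?) do
    parent = self()

    {caller, ref} =
      spawn_monitor(fn ->
        # Set before start_link, so the supervisor's exit signal cannot arrive first.
        Process.flag(:trap_exit, trap_exit?)
        {:ok, sup} = start_sup()

        receive do
          {:EXIT, ^sup, reason} -> send(parent, {:caller_saw_exit, reason})
        end
      end)

    receive do
      {:caller_saw_exit, reason} -> {:trapped, reason}
      {:DOWN, ^ref, :process, ^caller, reason} -> {:caller_died, reason}
    end
  end
end

IO.inspect(AutoShutdownDemo.run(false))
IO.inspect(AutoShutdownDemo.run(true))
```

This should print `{:caller_died, :shutdown}` followed by `{:trapped, :shutdown}`. So `trap_exit` is not masking a bug: the supervisor shuts down exactly as configured, and the open question is only whether the linked caller should survive that. Note that with trapping on, an `{:EXIT, pid, :shutdown}` message is left in the caller's mailbox.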

process_files

```elixir
  def process_files(files) do
    IO.puts("Processing #{length(files)} files...")

    children =
      [
        Supervisor.child_spec({A, files}, significant: true)
      ] ++
        for(
          i <- 1..@pf_workers,
          do:
            Supervisor.child_spec({B, i},
              id: B.process_name(i)
            )
        ) ++
        for(
          i <- 1..@km_workers,
          do:
            Supervisor.child_spec({C, {i, @pf_workers}},
              id: C.process_name(i)
            )
        ) ++
        for(
          i <- 1..@sub_workers,
          do:
            Supervisor.child_spec({D, {i, @km_workers}},
              id: D.process_name(i)
            )
        )

    # If I comment this line out, my test fails.
    # The supervisor below is linked to this process, so trapping is enabled before
    # start_link: that way its :shutdown exit signal can never arrive first.
    Process.flag(:trap_exit, true)

    # "Supervisors are not meant to die, they are intended to run indefinitely, monitoring children."
    # https://elixirforum.com/t/supervisor-dies-with-its-child/466/9?u=bgoosman
    # But we can enable auto_shutdown and declare the Producer significant to shut down the whole tree when it's done.
    # "Significant :transient child processes must exit normally for automatic shutdown to be considered."
    {:ok, pid} =
      Supervisor.start_link(children,
        strategy: :rest_for_one,
        name: ImportFileWorkflow.Supervisor,
        auto_shutdown: :all_significant
      )

    ref = Process.monitor(pid)

    IO.puts("I am #{inspect(self())}")

    # Block until the whole tree has shut down.
    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} ->
        IO.puts("All files processed")
        :ok
    end
  end
```

ExUnit test

```elixir
  @tag timeout: :infinity
  test "import 1000 files" do
    path = "/data"
    files = find_pdfs(path)
    ImportFileWorkflow.process_files(files)
  end
```
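If the goal is to avoid trapping exits in the test process altogether, one option is not to link the workflow supervisor to the caller at all: start it as a `:temporary` child of a `DynamicSupervisor` and only monitor it. This is a sketch, not a drop-in replacement. `UnlinkedWorkflow.run/3` is a hypothetical helper, the sleeping `Task` stands in for step A, and in an application the `DynamicSupervisor` would normally live in the application's supervision tree rather than being started ad hoc as below.

```elixir
defmodule UnlinkedWorkflow do
  # Starts `children` under a fresh Supervisor that is linked to `dyn` (a DynamicSupervisor),
  # not to the caller, then blocks until that supervisor goes down and returns the reason.
  def run(dyn, children, sup_opts) do
    spec = %{
      id: :workflow,
      start: {Supervisor, :start_link, [children, sup_opts]},
      # :temporary: the DynamicSupervisor never restarts the workflow once it shuts down.
      restart: :temporary,
      type: :supervisor
    }

    {:ok, pid} = DynamicSupervisor.start_child(dyn, spec)
    ref = Process.monitor(pid)

    # If the workflow already finished before the monitor was set up, reason is :noproc.
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    end
  end
end

# Example: a hypothetical significant child that works for 50 ms, then exits normally.
{:ok, dyn} = DynamicSupervisor.start_link(strategy: :one_for_one)

children = [
  Supervisor.child_spec({Task, fn -> Process.sleep(50) end},
    restart: :transient,
    significant: true
  )
]

reason =
  UnlinkedWorkflow.run(dyn, children, strategy: :one_for_one, auto_shutdown: :all_significant)

IO.inspect(reason, label: "workflow finished")
```

Here the caller never traps exits: the `:shutdown` exit signal goes to the `DynamicSupervisor`, which handles it and does not restart the temporary child, while the caller just sees `:DOWN` with reason `:shutdown`. Real code should treat `:noproc` as “already finished” too.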

I found Oban and rewrote my solution quite quickly. Oban Web is great too.