Library Idea for Data Pipeline Processing

Hello!
I have an idea for an Elixir library I’d like to work on, but wanted to get some thoughts from the community first. It would allow you to define data pipelines declaratively. Here’s an example of what I’m imagining the API would look like:

defmodule MyPipeline do
  # Library name tbd
  use Pipeline

  pipeline do

    # This strategy would generate a GenStage pipeline that runs steps in the same stage concurrently
    strategy Pipeline.GenStage

    # This strategy would simply run every step on after the other in the same process.
    # strategy Pipeline.SingleProcess

    stage do
      step Socks, color: :blue
      step Underwear, style: :boxers
      step Shirt
    end

    stage do
      step Pants
      step Tie
    end

    stage do
      step Shoes
      step Belt
    end

    step Jacket
  end

  # The pipeline above would generate a data flow that looks something like the following.
  # All the steps in the same stage will be executed in parallel.
  # A stage will start as soon as all the steps in the previous stage have been ran.
  # Stage 1
  # ┌─────┐┌─────────┐┌─────┐
  # │socks││underwear││shirt│
  # └─────┘└─────────┘└─────┘
  # Stage 2
  # ┌──────────┐┌───────────┐
  # │pants     ││tie        │
  # └──────────┘└───────────┘
  # Stage 3
  # ┌───────────┐┌──────────┐
  # │shoes      ││belt      │
  # └───────────┘└──────────┘
  # Stage 4
  # ┌───────────────────────┐
  # │jacket                 │
  # └───────────────────────┘
end

Each step would look like the following, and could probably also be just a simple function instead of a module:

defmodule Step.Pants do
  @behaviour Pipeline.Step

  @impl Pipeline.Step
  def run(env, _opts) do
    IO.puts("Putting on my pants!")

    updated = Pipeline.Step.put_info(env, pants: :done)

    {:ok, updated}
  end
end

Anyways, I just wanted to get some thoughts before I start working on this. I was unable to find anything that already exists that does this. Broadway and Flow are probably the most similar, and I can even see this using Flow under the hood for the strategy Pipeline.GenStage stuff.

I don’t want to work on it if it has any major flaws that anyone can foresee, or if there’s just a better way to do this already.

Thanks in advance!

4 Likes

Hi!

My eyes widened a bit as I read the post title, as I briefly went down a related rabbit hole myself the past weekend. My use case would be very similar, maybe a bit more specific, though. I intend to try to build something that in its first incarnation would be able to be made compatible with Airbyte source connectors. I’m very hesitant to call it an Elixir-based Airbyte CDK, as that would be quite a tall order for the time and experience I have :sweat_smile:, but shooting for the moon - that’s what it ideally would come close to, covering a good chunk of the source connector functionality. Feature wishlist for a v0.1 would be roughly:

  • DSL for defining a data source (ReST API ones at first) with multiple streams
    • streams can be aware of / depend on other streams’ outputs (one stream’s result might trigger n downstream ones)
    • declarative config for caching, paging, auth types, basic data validation
  • A way to turn the source declaration into a DAG
  • A way to execute the source DAG as a pipeline using Flow / GenStage, persisting the fetched data
  • Helper library + code generator to build a Docker image with commands to satisfy the Airbyte source connector spec
  • Telemetry events / collector to report pipeline progress/errors in real-time

“Stretch goals” would be some integrations with LiveBook - smart cells for running the pipelines, live progress updates and charts with statistics, that sort of goodies.

It’ll be a few weeks at least until I have some time to continue work on this and it’s questionable if it’ll be viable, but just to share two things from searching for libraries that would reduce the amount of work to get to a working prototype:

  1. Ash framework’s Spark might help a lot with the DSL definitions. I looked briefly into the macro code for Ecto and Absinthe schema notations and must admit I was quite overwhelmed. Spark is severely under-documented (will try to contribute some docs once I’ve gotten more familiar with it), but makes the task way easier and takes care of argument validation and docs generation. Was very happy to find it :+1:
  2. Piper looks to be just what I expect to need for turning a source/streams definition into a DAG of tasks to be performed.

Other than that, I don’t really have a lot to share at the moment in terms of feedback, but let me know if the above aligns at least somewhat with what you have in mind and you’d like to exchange ideas.

Also looking forward to reading other responses and ideas here. From what I could tell, all the separate parts for this type of project exist in the ecosystem, but I did not find anything that matches my (or your) intended use case entirely.

You might also look at Dagger which is being more actively worked on than Piper which is more of a reference example with the expectation others would vendor a forked version into their own repo.

You can also check out the notebook I made for my Empex MTN talk.

Oban Pro Workflows are probably the most production ready version of this idea in Elixir land - but you do have to pay :moneybag:

Dagger is doing some extra things like conditional expansion, rule composition, state machines, and joins (steps that depend on more than one step). You might not need this and the library needs work, so I’d recommend just modeling this sort of thing like Piper with some structs and a graph library like Libgraph.

I still have some more larger todo’s I’m trying to work on before Dagger is 0.1.0 and Hex ready.

If you’re modeling dataflow step dependencies in a DAG - it is important to separate the runtime execution to the functional model and to do that you want a form of lazy execution. Similar idea as the Mint client library of being “process-less” since the runtime has “it depends” scenarios to account for. So long as lazy evaluation is possible you can implement the runtime execution pieces with Broadway, Tasks, GenStage, and so on every which way you need.

1 Like

Thanks so much for the references and great talk!

I didn’t get much time to continue work on this and the ideas are still mostly just clunking around in my mind.

Also, think I got sidetracked by the DSL part (it’s way too much fun tinkering with what the user-facing API might look like :sweat_smile:). I need to let that sit and just start out with some manually defined structs, implement a simple step runner and go from there.

Not sure I exactly follow you on the lazy execution part, but yeah, the idea was to have process-less runners for the steps (step definition and options in → results/errors out) that could then be “wrapped” in whatever runtime structure is configured. The DAG itself is just an intermediate structure that is used to build up & configure the order / coordination of steps at runtime.