Data Pipeline Job Processing System with Ash: Seeking Advice

Hi Ash community,

I’m working on refactoring our data pipeline job processing system to better leverage Ash’s capabilities, and I’d love your input on the best approach. We’re aiming for a highly composable system where different jobs can flow nicely into one another. Here’s an overview of our current setup:

Current Architecture

  1. Our system is primarily a data pipeline, with different types of jobs that can be chained together. A few main job types are:
    • TwitterListScrapeJob – data scraping/insert job
    • TwitterSummarizationJob – data augmentation/cleaning
    • TwitterAnalysisJob (which combines both scraping and summarization)

We support sources other than Twitter, but I think you get the idea from this example.

  2. These jobs are implemented as Oban workers. We have a “pipeline run” that schedules these Oban workers. Each Oban worker then calls an Ash reactor to perform the actual work. Here’s a simplified example:

    defmodule YourApp.Jobs.TwitterListScrapeJob do
      use Oban.Worker
      alias YourApp.Twitter.ScrapeReactor

      @impl Oban.Worker
      def perform(%Oban.Job{id: job_id, args: %{"list_id" => list_id}}) do
        # Call the reactor to perform the scraping
        case Reactor.run(ScrapeReactor, %{list_id: list_id}) do
          {:ok, tweets} ->
            # broadcast_progress/4 is the PubSub helper shown further down
            broadcast_progress(job_id, :completed, 100, "Scraping completed")
            {:ok, tweets}

          {:error, reason} ->
            # inspect/1 because the reason is not necessarily a string
            broadcast_progress(job_id, :failed, 0, "Scraping failed: #{inspect(reason)}")
            {:error, reason}
        end
      end
    end
    
  3. The Ash reactor typically has multiple steps and results in Ash actions being performed, such as inserting new records into the database or updating existing records:

  step :fetch_list_details, App.ScrapeSteps.FetchListDetails do
    argument :list_id, input(:list_id)
  end

  step :fetch_list_members, App.ScrapeSteps.FetchListMembers do
    argument :list_id, input(:list_id)
  end

  step :fetch_user_details, App.ScrapeSteps.FetchUsersDetails do
    argument :user_ids, result(:fetch_list_members)
  end
  # ... etc.

  4. We currently use Phoenix PubSub to broadcast job progress updates, so there are calls like this all over the codebase:

    defp broadcast_progress(job_id, status, progress, message) do
      Phoenix.PubSub.broadcast(YourApp.PubSub, "job_progress:#{job_id}", {:job_progress, %{
        job_id: job_id,
        status: status,
        progress: progress,
        message: message
      }})
    end
    
  5. A Phoenix LiveView component subscribes to these updates and displays job progress.

What We’re Looking to Improve

  1. Make the system more declarative using Ash resources.
  2. Enhance observability and progress tracking throughout the pipeline.
  3. Better integrate our Oban workers with Ash.

Questions

  1. How would you recommend restructuring this data pipeline system using Ash resources to improve composability? I feel like most of the “business” logic currently lives in the Oban jobs, which stitch together basic CRUD-style Ash actions.
  2. What’s the best way to handle job orchestration and chaining in our data pipeline using Ash? Should we consider using Ash.Reactor or is there a better approach for pipelines?
  3. How can we integrate Oban workers with Ash resources and reactors for background processing in a way that maintains the pipeline’s composability?
  4. What’s the recommended approach for real-time progress updates of pipeline stages using Ash? Should we stick with Phoenix PubSub or is there a more Ash-native way? I feel like I have dozens of broadcast_progress calls, one every time the code path branches.
  5. Are there any Ash-specific patterns or features we should be aware of for handling long-running processes in a data pipeline?
  6. How can we improve the interaction between our Oban workers and Ash reactors to better suit a pipeline architecture? Is there a more idiomatic way to structure this in Ash?

Any insights, examples, or best practices you can share would be greatly appreciated!

And sorry for the long post, any advice is greatly appreciated, no matter how small :slight_smile:

TL;DR

Business logic lives in “jobs”. Each job is an Oban worker. The worker calls a reactor, and the reactor calls a bunch of Ash CRUD actions. How can we integrate these tools better?


A great line of questioning!

Before diving into the specific questions, I want to run something by you.

My first thought is to check whether using Ash’s bulk actions directly to process “batches” of work could do the trick for you. It will require some exploration, and I don’t know enough about the custom things happening inside your reactors to say for sure. Unfortunately, this may be too simple a strategy, depending on the failure characteristics and the granularity of durability you need.

We should be able to add a hook that fires after each batch (this doesn’t exist yet), which would let you report progress in an idiomatic way.

Let’s say you have a really simple import action like this:

# with an action like this
create :import do
  upsert? true
  upsert_identity :some_unique_identifier
  ...
end

# SomeResource is a placeholder for the resource that defines :import
Stream.resource(...) # read the paginated data source
|> Stream.map(fn data_from_the_web ->
  # turn it into resource input
  %{name: data_from_the_web["name"]}
end)
|> Ash.bulk_create(SomeResource, :import, return_stream?: true, return_errors?: true, return_records?: true)
|> continue_stream_processing() # will have errors & results together in the stream
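
Until a per-batch hook like that exists, one way to report progress is to fold over the result stream yourself and emit an update every N results. This is only a sketch in plain Elixir, assuming the stream yields `{:ok, record}` and `{:error, error}` tuples as it does with `return_stream?: true`. `ProgressTracker`, `track/3`, `report`, and `report_every` are hypothetical names, not part of Ash:

```elixir
defmodule ProgressTracker do
  @moduledoc "Hypothetical helper: counts bulk results and reports progress periodically."

  # `results` is any enumerable of {:ok, record} | {:error, error} tuples.
  # `report` is a 1-arity function that receives the running counts.
  def track(results, report, report_every \\ 100) do
    final =
      Enum.reduce(results, %{ok: 0, error: 0}, fn
        {:ok, _record}, counts -> bump(counts, :ok, report, report_every)
        {:error, _error}, counts -> bump(counts, :error, report, report_every)
        # Anything else (e.g. notifications) is passed over without counting.
        _other, counts -> counts
      end)

    # Always emit a final report so subscribers see the end state.
    # It may repeat the last periodic report.
    report.(final)
    final
  end

  defp bump(counts, key, report, report_every) do
    counts = Map.update!(counts, key, &(&1 + 1))
    if rem(counts.ok + counts.error, report_every) == 0, do: report.(counts)
    counts
  end
end
```

Inside an Oban worker, `report` could be a closure that does a single PubSub broadcast, which would keep the broadcasting in one place instead of at every branch.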

Let’s say you had a change that would call out to the external service to expand on some information. The standard way of defining that might be something like:

defmodule FetchListMembers do
  use Ash.Resource.Change

  def change(changeset, _, _) do
    Ash.Changeset.before_transaction(changeset, fn changeset -> 
      data = ExternalApi.get_data(changeset)
      Ash.Changeset.set_context(changeset, %{list_members: data})
    end)
  end
end

But this can actually be expanded to support operating on batches of records. For example:

defmodule FetchListMembers do
  use Ash.Resource.Change

  # signify that you can operate on batches
  def batch_change(batch, _, _) do
    batch
  end

  def before_batch(batch, _, _) do
    data_by_id = ExternalApi.get_many_results(batch)

    Enum.map(batch, fn changeset ->
      id = Ash.Changeset.get_attribute(changeset, :id)
      data = data_by_id[id]
      Ash.Changeset.set_context(changeset, %{list_members: data})
    end)
  end
end

And you can then drop this into your action like any other change:

create :import do
  change FetchListMembers
end

Now you have an action that can import a single record directly with Ash.create, or many records efficiently via Ash.bulk_create. Technically any action can be run in bulk like this; it’s just a matter of whether it is optimized for it.
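
To make the two call styles concrete, here is a rough sketch. It assumes a resource module, here called `YourApp.SomeResource` (a placeholder), that defines the `:import` create action and accepts a `:name` attribute; both are illustrative only:

```elixir
# Single record: build a changeset for the :import action and run it.
{:ok, _record} =
  YourApp.SomeResource
  |> Ash.Changeset.for_create(:import, %{name: "example"})
  |> Ash.create()

# Many records: same action and same changes, processed in batches.
inputs = [%{name: "first"}, %{name: "second"}]

%Ash.BulkResult{status: status, errors: errors} =
  Ash.bulk_create(inputs, YourApp.SomeResource, :import, return_errors?: true)

IO.inspect({status, errors}, label: "bulk import")
```

Without `return_stream?: true` the bulk call returns an `%Ash.BulkResult{}` struct rather than a stream, which may be enough when you only need the overall status.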

So the main question here is if you’ve looked into bulk actions and/or evaluated them for the work that you’re doing.


So I do use bulk actions, but I only ever send them to the default :create action. For example, the Scrape reactor is an ETL pipeline: I make approximately 200-1000 API requests, take their JSON data, and clean and format it so that it looks the way my resources expect. Once I have a batch of those, at the end of the reactor I call a big insert_data function that looks like this:


  def insert_data(parsed_data) do
    with {:ok, authors_result} <- bulk_create(App.Twitter.Author, parsed_data.authors),
         {:ok, urls_result} <- bulk_create(App.Twitter.URL, parsed_data.urls),
         {:ok, mentions_result} <- bulk_create(App.Twitter.Mention, parsed_data.mentions),
         {:ok, tweets_result} <- bulk_create(App.Twitter.Tweet, parsed_data.tweets),
         {:ok, list} <- create_list_with_authors(parsed_data.list) do
      {
        :ok,
        %{
          list: list,
          authors: authors_result,
          urls: urls_result,
          mentions: mentions_result,
          tweets: tweets_result
        }
      }
    else
      {:error, reason} ->
        {:error, reason}
    end
  end

  defp bulk_create(resource, items) do
    stream = Ash.bulk_create(items, resource, :create, bulk_create_opts(resource))

    result =
      Enum.reduce(stream, %{success_count: 0, error_count: 0}, fn
        {:ok, _record}, acc ->
          Map.update!(acc, :success_count, &(&1 + 1))

        {:error, _error}, acc ->
          Map.update!(acc, :error_count, &(&1 + 1))

        _, acc ->
          acc
      end)

    {:ok, Map.put(result, :status, get_status(result))}
  end

where bulk_create_opts is really just a config object that has some defaults, and then the relevant upsert_fields for each resource.
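
For illustration, such a helper might look roughly like the following. The module name `BulkOpts` and the field lists are made up for this sketch; the option keys are ones `Ash.bulk_create` accepts. Note that the reduce above only sees `{:ok, record}` and `{:error, error}` items because `return_stream?`, `return_records?`, and `return_errors?` are all set:

```elixir
defmodule BulkOpts do
  # Defaults shared by every bulk insert (illustrative values).
  @defaults [
    return_stream?: true,
    return_records?: true,
    return_errors?: true,
    upsert?: true
  ]

  # Hypothetical per-resource upsert fields; the field names are invented.
  @upsert_fields %{
    App.Twitter.Author => [:name, :followers_count],
    App.Twitter.Tweet => [:text, :like_count]
  }

  # Returns the keyword options to pass as the last argument of Ash.bulk_create.
  def bulk_create_opts(resource) do
    Keyword.put(@defaults, :upsert_fields, Map.fetch!(@upsert_fields, resource))
  end
end
```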

I like the look of something like the FetchListMembers module you mocked up there. It just shows me that I haven’t fully internalized how I can be using Ash, as I haven’t touched any of the `before_transaction`/`after_transaction`/`before_action`/`after_action` hooks, or anything like set_context.

And I don’t really have many sophisticated failure modes or the need to recover them. The reactors basically have no use of the rollback/redo implementation. It’s been more of a way to structure the code so that it runs async where possible and I can easily add/remove dependencies, and see at what step a particular error occurred.
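
As a stripped-down illustration of that structure (not my real code), here is a reactor where two steps depend only on the input, so they are free to run concurrently, and a third waits on one of them. The module name `ScrapeSketch` and the stubbed return values are placeholders, and it assumes the `reactor` package is available:

```elixir
defmodule ScrapeSketch do
  use Reactor

  input :list_id

  # These two steps only depend on the input, so Reactor may run them concurrently.
  step :fetch_list_details do
    argument :list_id, input(:list_id)
    run fn %{list_id: list_id}, _context -> {:ok, %{id: list_id}} end
  end

  step :fetch_list_members do
    argument :list_id, input(:list_id)
    run fn %{list_id: _list_id}, _context -> {:ok, [1, 2, 3]} end
  end

  # This step waits for :fetch_list_members because it consumes its result.
  step :fetch_user_details do
    argument :user_ids, result(:fetch_list_members)
    run fn %{user_ids: ids}, _context -> {:ok, Enum.map(ids, &%{id: &1})} end
  end

  return :fetch_user_details
end

Reactor.run(ScrapeSketch, %{list_id: "123"})
```

Adding or removing a dependency is then just a matter of adding or removing an `argument` line, and a failure comes back tied to the step that produced it.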


Yeah, the async nature of Reactor is something you can’t really get out of Ash hooks at the moment, although you can process batches concurrently using max_concurrency. So you could have a set of preprocessors that run sequentially within each batch (because they use changes with batch callbacks), with the batches themselves processed concurrently.
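
To picture that execution model, here is a plain-Elixir analogy. It is not how Ash is implemented internally, and `BatchModel` is a made-up name: each batch passes through the preprocessors in order, while separate batches run in parallel tasks.

```elixir
defmodule BatchModel do
  # Illustration only: sequential preprocessors within a batch,
  # concurrent processing across batches.
  def run(inputs, preprocessors, batch_size, max_concurrency) do
    inputs
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(
      fn batch ->
        # Each preprocessor receives the whole batch and returns the next batch.
        Enum.reduce(preprocessors, batch, fn preprocess, acc -> preprocess.(acc) end)
      end,
      max_concurrency: max_concurrency,
      ordered: true
    )
    |> Enum.flat_map(fn {:ok, batch} -> batch end)
  end
end

double = fn batch -> Enum.map(batch, &(&1 * 2)) end
increment = fn batch -> Enum.map(batch, &(&1 + 1)) end

BatchModel.run(1..5, [double, increment], 2, 2)
```

With Ash itself, the corresponding knobs are the `batch_size` and `max_concurrency` options passed to `Ash.bulk_create`.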


So it sounds like Reactor is probably a good choice given my workflow.

Does anything I’ve said sound particularly out of the ordinary? My main reason for making this post was that when I think about bolting a new piece of functionality onto the current system, I’m probably going to create another “job”.

That job will most likely do all of its work in an Oban worker, which will make whatever external API calls are needed, transform and augment the data, report on progress, etc.

Then at the very end there will be some CRUD operation handed to an Ash resource, which had me suspecting that I may not be using Ash as idiomatically as I otherwise could.

I don’t think there’s anything inherently wrong with my approach, but after watching some of your talks (thanks btw, those are great!) and reading more of the docs, I thought I’d pop in here for others’ opinions!

I wouldn’t say there is anything terribly wrong. Ultimately you have two systems: an importer, and the data-first implementation of your domain that it gets to use.

You could potentially encapsulate the running of a job in a generic action, e.g.:

action :import, :return_type do
  argument :argument, :type, ...
  run YourActionImplementation
end

which would let you house the import logic in the resource. It’s a marginal gain: you get some type safety, the ability to trigger an import over APIs, and the option to write policies about who can trigger imports. I’m not fully convinced you really need any of that though.
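
Filled in a bit more, a sketch of that idea might look like this. The module names (`YourApp.Twitter.Actions.Import`, `YourApp.Twitter.List`, `YourApp.Twitter.ScrapeReactor`), the `:list_id` argument, and the `:map` return type are assumptions for the example, not something taken from your codebase:

```elixir
defmodule YourApp.Twitter.Actions.Import do
  use Ash.Resource.Actions.Implementation

  @impl true
  def run(input, _opts, _context) do
    # Arguments declared on the action are available on the input struct.
    list_id = input.arguments.list_id

    case Reactor.run(YourApp.Twitter.ScrapeReactor, %{list_id: list_id}) do
      {:ok, result} -> {:ok, result}
      {:error, reason} -> {:error, reason}
      # e.g. a halted reactor; surfaced as an error in this sketch
      other -> {:error, other}
    end
  end
end

# In the (hypothetical) resource:
#
#   action :import, :map do
#     argument :list_id, :string, allow_nil?: false
#     run YourApp.Twitter.Actions.Import
#   end

# Triggering the import, e.g. from an Oban worker:
YourApp.Twitter.List
|> Ash.ActionInput.for_action(:import, %{list_id: "123"})
|> Ash.run_action()
```

The worker then shrinks to a thin wrapper that calls the action, and the import logic stays reachable from anywhere else that can run an action.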
