Hi Ash community,
I’m working on refactoring our data pipeline job processing system to better leverage Ash’s capabilities, and I’d love your input on the best approach. We’re aiming for a highly composable system where different jobs can flow nicely into one another. Here’s an overview of our current setup:
Current Architecture
- Our system is primarily a data pipeline, with different types of jobs that can be chained together. A few of the main job types are:
  - TwitterListScrapeJob – data scraping/insert job
  - TwitterSummarizationJob – data augmentation/cleaning
  - TwitterAnalysisJob – combines both scraping and summarization
  We support other “sources” than Twitter, but I think the example gets the idea across.
- These jobs are implemented as Oban workers. We have a “pipeline run” that schedules these Oban workers. Each Oban worker then calls an Ash reactor to perform the actual work. Here’s a simplified example:
defmodule YourApp.Jobs.TwitterListScrapeJob do
  use Oban.Worker

  alias YourApp.Twitter.ScrapeReactor

  @impl Oban.Worker
  def perform(%Oban.Job{id: job_id, args: %{"list_id" => list_id}}) do
    # Call the reactor to perform the scraping
    case Reactor.run(ScrapeReactor, %{list_id: list_id}) do
      {:ok, tweets} ->
        broadcast_progress(job_id, :completed, 100, "Scraping completed")
        {:ok, tweets}

      {:error, reason} ->
        broadcast_progress(job_id, :failed, 0, "Scraping failed: #{inspect(reason)}")
        {:error, reason}
    end
  end
end
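For context, the “pipeline run” side is roughly this shape (illustrative sketch only; PipelineRun and enqueue_scrape/1 are made-up names, the real thing just builds worker args and inserts Oban jobs):

# Sketch of how a pipeline run kicks off a stage: build the worker args
# and insert the Oban job. Chaining happens by enqueueing the next stage's
# worker once the previous one finishes.
defmodule YourApp.Pipelines.PipelineRun do
  alias YourApp.Jobs.TwitterListScrapeJob

  def enqueue_scrape(list_id) do
    %{"list_id" => list_id}
    |> TwitterListScrapeJob.new()
    |> Oban.insert()
  end
end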
- The Ash reactor typically has multiple steps and results in Ash actions being performed, such as inserting new records into the database or updating existing records:
step :fetch_list_details, App.ScrapeSteps.FetchListDetails do
  argument :list_id, input(:list_id)
end

step :fetch_list_members, App.ScrapeSteps.FetchListMembers do
  argument :list_id, input(:list_id)
end

step :fetch_user_details, App.ScrapeSteps.FetchUsersDetails do
  argument :user_ids, result(:fetch_list_members)
end

# ... and so on
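Each of those step modules is a regular Reactor step that ends up calling plain Ash actions. Roughly like this (simplified sketch, not our real code; the TwitterClient call is illustrative and I’m assuming Ash 3’s top-level Ash.create!/1):

# Illustrative step module: fetch list members from the API, insert each one
# via a plain Ash create action, and return the ids for the next step.
defmodule App.ScrapeSteps.FetchListMembers do
  use Reactor.Step

  @impl Reactor.Step
  def run(%{list_id: list_id}, _context, _options) do
    with {:ok, members} <- YourApp.TwitterClient.list_members(list_id) do
      Enum.each(members, fn member ->
        YourApp.Twitter.User
        |> Ash.Changeset.for_create(:create, member)
        |> Ash.create!()
      end)

      {:ok, Enum.map(members, & &1["id"])}
    end
  end
end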
- We currently use Phoenix PubSub to broadcast job progress updates, so there are calls like this all over the codebase:
defp broadcast_progress(job_id, status, progress, message) do
  Phoenix.PubSub.broadcast(
    YourApp.PubSub,
    "job_progress:#{job_id}",
    {:job_progress,
     %{
       job_id: job_id,
       status: status,
       progress: progress,
       message: message
     }}
  )
end
- A Phoenix LiveView component subscribes to these updates and displays job progress.
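The LiveView side is just the standard subscribe-in-mount pattern (simplified; markup and most assigns trimmed):

# Inside the LiveView: subscribe to the job's topic on mount, then update
# assigns whenever a progress message arrives.
def mount(%{"job_id" => job_id}, _session, socket) do
  if connected?(socket) do
    Phoenix.PubSub.subscribe(YourApp.PubSub, "job_progress:#{job_id}")
  end

  {:ok, assign(socket, job_id: job_id, status: :pending, progress: 0, message: nil)}
end

def handle_info({:job_progress, %{status: status, progress: progress, message: message}}, socket) do
  {:noreply, assign(socket, status: status, progress: progress, message: message)}
end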
What We’re Looking to Improve
- Make the system more declarative using Ash resources.
- Enhance observability and progress tracking throughout the pipeline.
- Better integrate our Oban workers with Ash.
Questions
- How would you recommend restructuring this data pipeline system using Ash resources to improve composability? I feel like currently most of the “business” logic lives in the Oban jobs and just stitches together basic CRUD-style Ash actions.
- What’s the best way to handle job orchestration and chaining in our data pipeline using Ash? Should we consider using Ash.Reactor or is there a better approach for pipelines?
- How can we integrate Oban workers with Ash resources and reactors for background processing in a way that maintains the pipeline’s composability?
- What’s the recommended approach for real-time progress updates of pipeline stages using Ash? Should we stick with Phoenix PubSub, or is there a more Ash-native way? I feel like I end up with dozens of broadcast_progress calls any time the code path branches.
- Are there any Ash-specific patterns or features we should be aware of for handling long-running processes in a data pipeline?
- How can we improve the interaction between our Oban workers and Ash reactors to better suit a pipeline architecture? Is there a more idiomatic way to structure this in Ash?
Any insights, examples, or best practices you can share would be greatly appreciated!
And sorry for the long post; any advice is welcome, no matter how small.
TLDR;
Business logic lives in “jobs”: a pipeline run enqueues Oban workers, each worker calls a reactor, and the reactor calls a bunch of basic Ash CRUD actions. How can we integrate these tools more idiomatically?