Bundling up my GenStage library for easy use in people's apps

Hi folks,

I’ve written an RSS spidering library that uses six GenStage stages to create a work pipeline. I’m enjoying seeing it work in the console, but it’s time to try it out in a Phoenix app.

I’d like to make it easy for people to use. Possibly people will want to customise demand settings, or only use a subset of my stages, but I think most commonly people will want to use all 6 stages with default settings, which will provide them with a nice stream of all new articles that they can subscribe their own consumer to. As such a user, I’d expect to just add one application to my supervision tree and have it all working.

So, I’m wondering how to do this. I can create an OTP module that fires up the whole pipeline, but the user will still need to attach their own consumer. I’m wondering if I should create a GenStage :producer that actually wraps my entire pipeline, so people can consider it as a single stage, even though internally it is actually six. Can I do that?

The code required to initialise my stages is as follows. This is essentially the boilerplate that I’m trying to eliminate for calling apps without custom needs.

{:ok, feed_resource_urls} = FeedStage.Stages.FeedResourceUrls.start_link(url_repository)
{:ok, fetch_resources} = FeedStage.Stages.FetchResources.start_link()
{:ok, parse_feeds} = FeedStage.Stages.ParseFeeds.start_link()
{:ok, all_articles} = FeedStage.Stages.AllArticles.start_link()
{:ok, new_articles} = FeedStage.Stages.NewArticles.start_link(article_repository)
{:ok, fetch_metadata} = FeedStage.Stages.FetchMetadata.start_link()

GenStage.sync_subscribe(fetch_resources, to: feed_resource_urls, min_demand: 1, max_demand: 10)
GenStage.sync_subscribe(parse_feeds, to: fetch_resources, min_demand: 1, max_demand: 10)
GenStage.sync_subscribe(all_articles, to: parse_feeds, min_demand: 10, max_demand: 50)
GenStage.sync_subscribe(new_articles, to: all_articles, min_demand: 10, max_demand: 50)
GenStage.sync_subscribe(fetch_metadata, to: new_articles, min_demand: 10, max_demand: 50)

# This is an example of a consumer, this inspector just dumps the results to STDOUT
{:ok, inspector} = InspectingConsumer.start_link()
GenStage.sync_subscribe(inspector, to: fetch_metadata, min_demand: 1, max_demand: 2)
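
For anyone who hasn’t written a consumer before, here is a minimal sketch of what an inspecting consumer like the one above could look like. This is an assumption for illustration, not the actual InspectingConsumer from the library: it holds no state and prints each event with IO.inspect.

```elixir
defmodule InspectingConsumer do
  # Hypothetical sketch; the real InspectingConsumer is not shown in this post.
  use GenStage

  # opts are passed straight to GenStage.start_link/3 (e.g. name: ...).
  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # A :consumer only receives events; it never emits any.
    {:consumer, :no_state}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Dump every event in the batch to STDOUT.
    Enum.each(events, &IO.inspect/1)
    # Consumers must always return an empty list of events.
    {:noreply, [], state}
  end
end
```

A calling app would replace the IO.inspect call with whatever it wants to do with each new article.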

Full code is here, but I haven’t written documentation yet; I want to make it more user friendly first.


I’m currently doing a similar thing in a payment gateway library I am working on which you can see here (make sure you’re looking at the design_refactor branch).

The root module (cashier) uses the application behavior so it gets automatically started, at which point it starts up the pipeline through a supervisor (pipeline_supervisor). From here all of the stages wire themselves up to the parent producer through the init callback (see gateway_router for an example).
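
As a rough sketch of that wiring: the module names below are made up for illustration and are not the ones in cashier or FeedStage. It assumes each stage is registered under its module name and uses the `subscribe_to` option returned from `init/1`, so nobody has to call GenStage.sync_subscribe by hand.

```elixir
defmodule Sketch.Producer do
  # Hypothetical upstream stage that emits an increasing counter.
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, 0, name: __MODULE__)

  @impl true
  def init(counter), do: {:producer, counter}

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    # Emit exactly as many events as were asked for.
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Sketch.Doubler do
  # Hypothetical middle stage; it subscribes itself to the producer in init/1.
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok) do
    {:producer_consumer, :ok,
     subscribe_to: [{Sketch.Producer, min_demand: 1, max_demand: 10}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end

defmodule Sketch.PipelineSupervisor do
  # Starts the stages in order, upstream first.
  use Supervisor

  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    children = [Sketch.Producer, Sketch.Doubler]
    # :rest_for_one restarts every downstream stage when an upstream one crashes,
    # so the subscriptions made in init/1 are re-established.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```

The app using the library would then only need its own consumer to return something like `{:consumer, state, subscribe_to: [Sketch.Doubler]}` from `init/1`, pointing at the last stage by name.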

This way the app using the library doesn’t have to start any stages manually, and the fact that GenStage is involved is transparent (unless you look into it, of course).
