Multi-stage pipeline in Broadway

For the purposes of this conversation, let’s say I’m building a bot that will delete all messages in a given Slack channel.

The high-level procedure to do so is as follows:

  1. List all conversations in the given channel.
  2. For each conversation, list all replies to that conversation (if any).
  3. For each message (either a conversation or a reply), issue a delete request.
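
To make the three steps concrete, here is a rough sequential sketch with no pipeline at all. `FakeSlack` is a made-up in-memory stand-in for the real HTTP calls, and its function names and return shapes are my own assumptions, not a real client library.

```elixir
defmodule FakeSlack do
  # Hypothetical stand-in for a Slack client; returns canned data.
  def list_conversations(_channel), do: [%{id: "c1"}, %{id: "c2"}]

  def list_replies("c1"), do: [%{id: "c1-r1"}, %{id: "c1-r2"}]
  def list_replies(_conversation_id), do: []

  # A real client would issue the delete request here.
  def delete(id), do: {:deleted, id}
end

defmodule NaiveDeleter do
  def run(channel) do
    channel
    # Step 1: list the conversations in the channel.
    |> FakeSlack.list_conversations()
    # Step 2: expand each conversation into itself plus its replies.
    |> Enum.flat_map(fn conversation ->
      [conversation | FakeSlack.list_replies(conversation.id)]
    end)
    # Step 3: delete every message, conversation or reply alike.
    |> Enum.map(fn message -> FakeSlack.delete(message.id) end)
  end
end
```

Calling `NaiveDeleter.run("general")` walks the canned data in order: each conversation first, then its replies.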

I can model my pipeline fairly well as a 2-stage pipeline:

ConversationProducer ----> Processor
  1. ConversationProducer polls /conversations and produces one message per conversation, storing the pagination cursor in its state.
  2. The Processor first fetches the replies to that conversation, if any. It then issues a delete request for the conversation, followed by one for each reply.
  3. When handle_demand/2 is invoked again, ConversationProducer polls the next page of /conversations, using the cursor in its state.

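This works fine, but Broadway’s rate limiting is not much use here: it limits how many messages the producer emits, and each message is a whole conversation. If a conversation has 100 replies, the Processor will issue all 101 delete requests without any throttling.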
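
For reference, the two-stage version looks roughly like the sketch below. `FakeSlack` is a made-up stand-in for the real HTTP calls, the page contents and rate-limit numbers are arbitrary, and I am assuming `Broadway.NoopAcknowledger` is good enough since there is nothing to ack upstream. The `rate_limiting` option counts conversations leaving the producer, which is why the fan-out inside `handle_message/3` escapes it.

```elixir
# Script form; Mix.install fetches Broadway (and GenStage with it).
Mix.install([{:broadway, "~> 1.0"}])

defmodule FakeSlack do
  # Hypothetical stand-in for a Slack client. A nil cursor means "first page";
  # a nil next cursor means "no more pages".
  def list_conversations(nil), do: {[%{id: "c1"}, %{id: "c2"}], "page2"}
  def list_conversations("page2"), do: {[%{id: "c3"}], nil}

  def list_replies("c1"), do: [%{id: "c1-r1"}, %{id: "c1-r2"}]
  def list_replies(_conversation_id), do: []

  def delete(id), do: IO.puts("deleted #{id}")
end

defmodule ConversationProducer do
  use GenStage

  @impl true
  def init(_opts), do: {:producer, %{cursor: nil, done?: false}}

  @impl true
  def handle_demand(_demand, %{done?: true} = state), do: {:noreply, [], state}

  def handle_demand(_demand, state) do
    # Simplification: emit a whole page regardless of the demand size and let
    # GenStage buffer whatever exceeds it.
    {conversations, next_cursor} = FakeSlack.list_conversations(state.cursor)

    messages =
      Enum.map(conversations, fn conversation ->
        %Broadway.Message{
          data: conversation,
          acknowledger: Broadway.NoopAcknowledger.init()
        }
      end)

    {:noreply, messages, %{state | cursor: next_cursor, done?: is_nil(next_cursor)}}
  end
end

defmodule Deleter do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {ConversationProducer, []},
        concurrency: 1,
        # Limits conversations per second, not delete requests per second.
        rate_limiting: [allowed_messages: 5, interval: 1_000]
      ],
      processors: [default: [concurrency: 2]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    conversation = message.data
    replies = FakeSlack.list_replies(conversation.id)

    # One Broadway message turns into 1 + length(replies) delete requests.
    Enum.each([conversation | replies], fn slack_message ->
      FakeSlack.delete(slack_message.id)
    end)

    message
  end
end
```

To try it out, append `{:ok, _pid} = Deleter.start_link([])` followed by a short `Process.sleep/1` so the script stays alive long enough for the deletes to print.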

I think this is ideally modeled as a 3-stage pipeline, with a producer-consumer stage in the middle:

ConversationProducer ----> SlackMessageProducer ----> DeleterProcessor

Here the output of Stage 1 is a message carrying a conversation’s ID. SlackMessageProducer turns that into a stream of message IDs by fanning out to fetch each conversation’s replies. DeleterProcessor then works on single Slack messages and deletes them one at a time.
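
In plain GenStage terms, the middle stage would be something like the sketch below. This is only the GenStage shape, not something Broadway accepts as-is. `FakeSlack` is again a made-up stand-in, and passing the upstream stage in as `upstream` is my own choice for the example.

```elixir
# Script form; Mix.install fetches GenStage.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule FakeSlack do
  # Hypothetical stand-in for the Slack replies call.
  def list_replies("c1"), do: [%{id: "c1-r1"}, %{id: "c1-r2"}]
  def list_replies(_conversation_id), do: []
end

defmodule SlackMessageProducer do
  use GenStage

  # `upstream` is the pid or registered name of the ConversationProducer stage.
  def start_link(upstream), do: GenStage.start_link(__MODULE__, upstream)

  @impl true
  def init(upstream) do
    # Ask for one conversation at a time so a long thread does not pile up
    # behind a large batch of other conversations.
    {:producer_consumer, %{}, subscribe_to: [{upstream, max_demand: 1}]}
  end

  @impl true
  def handle_events(conversations, _from, state) do
    # Fan out: each conversation becomes its own ID followed by its reply IDs.
    message_ids =
      Enum.flat_map(conversations, fn conversation ->
        reply_ids = Enum.map(FakeSlack.list_replies(conversation.id), & &1.id)
        [conversation.id | reply_ids]
      end)

    {:noreply, message_ids, state}
  end
end
```

The point of this shape is that the stage downstream sees one event per Slack message, so any limit applied there is a limit on delete requests rather than on conversations.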

With this, I can easily hard-code values for rate limiting into ConversationProducer and SlackMessageProducer and get very close to optimal.

I know how to model this with GenStage, but I’m having difficulty finding an example of something like this for Broadway. I have two ideas:

  1. Fuse together two Broadway pipelines.
  2. Plug a producer / producer-consumer pair into a Broadway pipeline, then manually wire up the ceremony of propagating e.g. acks up the chain.

Any guidance appreciated! Thanks in advance.