For the purposes of this conversation, let’s say I’m building a bot that will delete all messages in a given Slack channel.
The high-level procedure to do so is as follows:
- List all `conversations` in a given room.
- For each conversation, list all `replies` to that conversation (if any).
- For each message (either a conversation or a reply), issue a delete request.
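To make the procedure concrete, here is a minimal sketch of those three steps in plain Elixir. The `client` map and its three functions (`list_conversations`, `list_replies`, `delete`) are hypothetical stand-ins for the Slack calls, not a real library API, so the sketch runs without any network access.

```elixir
defmodule ChannelWiper do
  # `client` is a hypothetical map of functions standing in for Slack calls:
  #   list_conversations.(channel)      -> [conversation_id]
  #   list_replies.(channel, conv_id)   -> [reply_id]
  #   delete.(channel, message_id)      -> :ok
  # Returns the IDs of every message it deleted, in order.
  def wipe(channel, client) do
    for conversation <- client.list_conversations.(channel),
        # Replies are fetched first; then the conversation is deleted,
        # followed by each of its replies.
        message <- [conversation | client.list_replies.(channel, conversation)] do
      :ok = client.delete.(channel, message)
      message
    end
  end
end
```

Passing the client in as functions is only a convenience for the sketch; a real bot would call its HTTP client here.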
I can model my pipeline fairly well as a 2-stage pipeline:
ConversationProducer ----> Processor
- The `ConversationProducer` polls the next page of `/conversations`, using the `cursor` stored in its state, and produces a message for each conversation.
- The Processor will first fetch replies to that conversation, if any. Then, it issues a delete request for the conversation, then for each reply.
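The cursor handling in the producer can be sketched roughly like this. It is plain Elixir rather than a real GenStage producer: `fetch_page` is a hypothetical function that takes a cursor (`nil` for the first page) and returns `{conversation_ids, next_cursor}`, with `nil` assumed to mean there are no more pages. In an actual producer the same step would live in `handle_demand/2`, with the cursor kept in the stage's state.

```elixir
defmodule ConversationPager do
  # Lazily walks every page of conversations.
  # `fetch_page` is a hypothetical function:
  #   fetch_page.(cursor) -> {conversation_ids, next_cursor | nil}
  def stream(fetch_page) do
    Stream.resource(
      # The state is just the cursor; nil means "start at the first page".
      fn -> nil end,
      fn
        # No cursor left: stop polling.
        :done ->
          {:halt, :done}

        cursor ->
          {ids, next_cursor} = fetch_page.(cursor)
          # Emit this page's IDs and remember where to resume.
          {ids, next_cursor || :done}
      end,
      fn _state -> :ok end
    )
  end
end
```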
This works fine, but now Broadway’s rate limiting construct is not too useful: it limits the messages leaving the producer, not the requests a processor makes. If a conversation has 100 replies, the Processor will work through all of them with no limit.
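A rough illustration of the mismatch, as a sketch only: `producer_options/0` returns options in the shape Broadway's `producer:` key accepts (`rate_limiting` with `allowed_messages` and `interval` in milliseconds), and `slack_requests_for/1` counts the Slack calls hidden behind one such message. The specific numbers, the module name, and the assumption that replies come back in a single request are mine.

```elixir
defmodule RateLimitSketch do
  # Producer options in the shape Broadway expects under `producer:`.
  # The limit below counts messages (conversations), not Slack requests.
  def producer_options do
    [
      module: {ConversationProducer, []},
      concurrency: 1,
      # Illustrative values: at most 1 conversation per second.
      rate_limiting: [allowed_messages: 1, interval: 1_000]
    ]
  end

  # Slack requests made while processing ONE conversation message,
  # assuming the replies are fetched in a single request.
  def slack_requests_for(reply_count) do
    fetch_replies = 1
    delete_conversation = 1
    fetch_replies + delete_conversation + reply_count
  end
end
```

So with these values the pipeline admits one conversation per second, while `RateLimitSketch.slack_requests_for(100)` shows that single message turning into 102 requests.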
I think this is ideally modeled as a 3-stage pipeline, with a producer-consumer stage in the middle:
ConversationProducer ----> SlackMessageProducer ----> DeleterProcessor
Here, the output of Stage 1 is a message with a conversation’s ID. SlackMessageProducer fans out to fetch each conversation’s replies and produces a stream of the IDs of all messages. DeleterProcessor works on single Slack messages and deletes them.
With this, I can easily hard-code values for rate limiting into ConversationProducer and SlackMessageProducer and get very close to optimal.
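The shape I have in mind, sketched with lazy streams instead of real GenStage or Broadway stages: stage 1 is the list of conversation IDs, stage 2 fans each one out into message IDs, and stage 3 deletes one message at a time. `list_replies`, `delete`, and the hard-coded `interval_ms` pause are hypothetical placeholders, and `Process.sleep/1` is only a crude stand-in for a proper rate limiter.

```elixir
defmodule ThreeStageSketch do
  # conversation_ids: output of stage 1 (one ID per conversation)
  # list_replies.(conversation_id) -> [reply_id]   (hypothetical)
  # delete.(message_id)            -> :ok          (hypothetical)
  # interval_ms: pause before each delete, a stand-in for rate limiting
  def run(conversation_ids, list_replies, delete, interval_ms) do
    conversation_ids
    # Stage 2: fan out each conversation into itself plus its replies.
    |> Stream.flat_map(fn id -> [id | list_replies.(id)] end)
    # Every event is now exactly one Slack message, so a per-event
    # limit maps directly onto delete requests.
    |> Stream.each(fn _id -> Process.sleep(interval_ms) end)
    # Stage 3: delete a single message per event.
    |> Enum.map(fn message_id ->
      :ok = delete.(message_id)
      message_id
    end)
  end
end
```

Because the stream is lazy, replies for the next conversation are only fetched once the previous conversation's messages have gone through the delete step.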
I know how to model this with GenStage, but I’m having difficulty finding an example of something like this for Broadway. I have two ideas:
- Fuse together two Broadway pipelines
- Plug a Producer/ProducerConsumer pair into a Broadway pipeline, then manually wire up the ceremony of propagating e.g. acks up the chain
Any guidance appreciated! Thanks in advance.