For the purposes of this conversation, let’s say I’m building a bot that will delete all messages in a given Slack channel.
The high-level procedure to do so is as follows:
- List all conversations in a given channel.
- For each conversation, list all replies to that conversation (if any).
- For each message (either a conversation or a reply), issue a delete request.
I can model my pipeline fairly well as a 2-stage pipeline:
ConversationProducer ----> Processor
- ConversationProducer polls /conversations and produces a message for each conversation. It stores the cursor in its state.
- The Processor first fetches replies to that conversation, if any. Then it issues a delete request for the conversation, then one for each reply.
- When handle_demand/2 is invoked, ConversationProducer polls the next page of /conversations, using the cursor in its state.
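The cursor handling above has the same shape as Stream.resource/3: each pull fetches the page at the stored cursor, emits its items, and keeps the next cursor as state. Below is a minimal, dependency-free sketch of that loop, not the GenStage producer itself. fetch_page is a hypothetical stand-in for the Slack call, assumed to return {conversation_ids, next_cursor}, with next_cursor set to nil after the last page.

```elixir
defmodule ConversationPager do
  @moduledoc """
  Sketch of cursor-based pagination over a `/conversations`-style endpoint.

  `fetch_page` is hypothetical: it takes a cursor (`nil` for the first page)
  and returns `{conversation_ids, next_cursor}`, where `next_cursor` is `nil`
  once the last page has been returned.
  """

  @spec stream((String.t() | nil -> {[term()], String.t() | nil})) :: Enumerable.t()
  def stream(fetch_page) do
    Stream.resource(
      # Start with no cursor; :done marks that the last page was consumed.
      fn -> {:cursor, nil} end,
      fn
        :done ->
          {:halt, :done}

        {:cursor, cursor} ->
          case fetch_page.(cursor) do
            # No further cursor: emit this page, then halt on the next pull.
            {items, nil} -> {items, :done}
            # Otherwise emit this page and remember where to resume.
            {items, next} -> {items, {:cursor, next}}
          end
      end,
      # Nothing to clean up in this sketch.
      fn _acc -> :ok end
    )
  end
end
```

In the actual producer, the same fetch-and-advance step would live in handle_demand/2, with the cursor kept in the producer's state and each ID wrapped in a Broadway message.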
This works, but Broadway’s rate limiting is no longer very useful: it limits how many messages leave the producer, and here each message is a whole conversation. If a conversation has 100 replies, the Processor will delete all of them without any limit.
I think this is ideally modeled as a 3-stage pipeline, with a ProducerConsumer in the middle:
ConversationProducer ----> SlackMessageProducer ----> DeleterProcessor
Here, the output of stage 1 is a message containing a conversation’s ID. SlackMessageProducer fans out by fetching each conversation’s replies, producing a stream of IDs for every message (the conversation itself plus its replies). DeleterProcessor handles one Slack message at a time and deletes it.
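The fan-out step in SlackMessageProducer amounts to a lazy flat_map from conversation IDs to message IDs. This is a minimal, dependency-free sketch of that transformation only; fetch_replies is a hypothetical stand-in for the Slack replies call, assumed to take a conversation ID and return a (possibly empty) list of reply IDs.

```elixir
defmodule MessageFanOut do
  @moduledoc """
  Sketch of the SlackMessageProducer step: turn a stream of conversation IDs
  into a stream of individual message IDs (the conversation itself, then each
  of its replies), so the deleter only ever sees one Slack message at a time.

  `fetch_replies` is hypothetical: it takes a conversation ID and returns the
  list of reply IDs for it (possibly empty).
  """

  @spec expand(Enumerable.t(), (term() -> [term()])) :: Enumerable.t()
  def expand(conversation_ids, fetch_replies) do
    # Stream.flat_map is lazy: a conversation's replies are fetched only when
    # the consumer pulls past the previous conversation's messages.
    Stream.flat_map(conversation_ids, fn conversation_id ->
      [conversation_id | fetch_replies.(conversation_id)]
    end)
  end
end
```

Because each emitted element is a single Slack message, any limit applied downstream of this step counts delete requests rather than conversations.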
With this, I can easily hard-code rate-limiting values into ConversationProducer and SlackMessageProducer and get very close to optimal throughput.
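For reference, Broadway configures producer-level rate limiting through the :rate_limiting option passed to Broadway.start_link/2. The fragment below is a hedged sketch of those options, assuming SlackMessageProducer acts as the pipeline's producer and emits one message per Slack message, so each forwarded message corresponds to one delete request. DeleterPipeline and all numbers are placeholders, not Slack’s actual limits.

```elixir
# Hypothetical options for Broadway.start_link/2 in the deleter pipeline.
# Assumes SlackMessageProducer emits one message per Slack message;
# DeleterPipeline and the numeric values are placeholders.
[
  name: DeleterPipeline,
  producer: [
    module: {SlackMessageProducer, []},
    concurrency: 1,
    # Caps how many messages the producer forwards per interval (in ms);
    # with one message per Slack message, this bounds delete requests.
    rate_limiting: [allowed_messages: 50, interval: 60_000]
  ],
  processors: [default: [concurrency: 5]]
]
```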
I know how to model this with GenStage, but I’m having difficulty finding an example of something like this for Broadway. I have two ideas:
- Fuse together two Broadway pipelines
- Plug a Producer/ProducerConsumer pair into a Broadway pipeline, then manually wire up the ceremony of propagating e.g. acks back up the chain
Any guidance appreciated! Thanks in advance.