GenStage: Switching producer subscriptions dynamically


I have a system that processes real-time events. These events are cached before being processed by a GenStage pipeline.

I have two producers.

Collector: Receives real-time events and dispatches them to consumers as is.
Historian: Fetches all cached events that have not been processed yet and feeds them to the consumers.

The idea is that when the pipeline starts, all the consumers initially subscribe to Historian to fetch all cached events that haven’t been processed yet. In the meantime, Collector will start as well and begin buffering new, incoming events while the consumers are processing the old, cached events.

Once Historian has emitted all the cached events and the consumers have processed them, I’d like the consumers to switch their subscription from Historian to Collector, so they can begin processing the events that arrived in the meantime.

Any ideas for how best to accomplish this? I was thinking of something along the lines of Historian dispatching a ‘done’ event once it has emitted all its events. Then, once a consumer receives this event, it would dynamically subscribe to Collector using GenStage.sync_subscribe, although I’m not sure whether the API supports this.
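
To make the idea concrete, here is a rough, untested sketch of what I have in mind for the consumer. The module name SwitchingConsumer, the process/1 helper and the :done marker are placeholders I made up for illustration. It assumes Historian and Collector are registered under those names, and that every consumer actually receives a :done marker (with the default demand dispatcher a single marker would only reach one consumer, so that part is still an open question). I used GenStage.async_subscribe here instead of sync_subscribe, on the assumption that a synchronous call from a stage to itself would block.

```elixir
defmodule SwitchingConsumer do
  use GenStage

  # Hypothetical consumer: Historian and Collector are assumed to be
  # registered producer names, and :done is an assumed end-of-history marker.
  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # cancel: :temporary keeps this consumer alive when the Historian
    # subscription is cancelled later on.
    {:consumer, :historian, subscribe_to: [{Historian, cancel: :temporary}]}
  end

  @impl true
  def handle_events(events, from, :historian) do
    # Process everything that arrived before the :done marker, if any.
    {old, rest} = Enum.split_while(events, &(&1 != :done))
    Enum.each(old, &process/1)

    case rest do
      [:done | _] ->
        # Drop the Historian subscription and ask to be subscribed to Collector.
        GenStage.cancel(from, :normal)
        GenStage.async_subscribe(self(), to: Collector)
        {:noreply, [], :collector}

      [] ->
        {:noreply, [], :historian}
    end
  end

  def handle_events(events, _from, :collector) do
    Enum.each(events, &process/1)
    {:noreply, [], :collector}
  end

  # Placeholder for the real event handling.
  defp process(event), do: IO.inspect(event, label: "processed")
end
```

The consumer state (:historian or :collector) only records which producer it is currently reading from, so the switch happens exactly once, when the marker shows up.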

Is this a good approach?