Broadway’s straightforward backpressure mechanism, producing new messages only when there is enough demand to consume them, works very well for external sources like RabbitMQ, Kafka, or databases. But I’ve run into an interesting limitation while trying to write a producer module for a push-only data source.
To make my particular case concrete: this is a server-sent events endpoint (ironically wrapping a private Kafka stream that I can’t access). The source is a fire hose of events which continues until the client disconnects. The interface includes a crude cursor which allows reconnecting from a given event ID, and so supports at-least-once consumption even after a disconnect or other failure, although without any particularly nice guarantees.
In theory it should be possible to write a Broadway producer for this stream which adapts the push interface to a demand-driven one. If backpressure is too high, messages can be queued, and as a last resort the module will intentionally disconnect. The connection can then be re-established once demand recovers, or after any other failure. My rough code, without reconnect logic, isn’t much to write home about so far. The previous incarnation is a GenStage pipeline, but it lacks backpressure.
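To make the queueing part of that idea concrete, here is a minimal sketch of the bookkeeping such an adapter would need, written as a plain functional module so it runs without GenStage or Broadway. The module name `PushAdapter` and its functions are hypothetical, invented for illustration. In a real producer I assume `push/2` would be called from `handle_info/2` when the source delivers an event and `ask/2` from `handle_demand/2`, with the returned events wrapped in Broadway messages before being emitted.

```elixir
defmodule PushAdapter do
  # Hypothetical, dependency-free sketch: holds pushed events until
  # downstream demand arrives. Not part of GenStage or Broadway.
  defstruct queue: :queue.new(), size: 0, demand: 0

  def new, do: %__MODULE__{}

  # The push source delivered an event: enqueue it, then release
  # whatever outstanding demand allows.
  def push(%__MODULE__{} = state, event) do
    dispatch(%{state | queue: :queue.in(event, state.queue), size: state.size + 1})
  end

  # Consumers asked for `demand` more events: record it, then release
  # whatever is already queued.
  def ask(%__MODULE__{} = state, demand) when is_integer(demand) and demand > 0 do
    dispatch(%{state | demand: state.demand + demand})
  end

  # Returns {events_to_emit, new_state}, emitting in arrival order.
  defp dispatch(%__MODULE__{queue: queue, size: size, demand: demand} = state) do
    count = min(size, demand)
    {ready, rest} = :queue.split(count, queue)
    {:queue.to_list(ready), %{state | queue: rest, size: size - count, demand: demand - count}}
  end
end
```

The `size` field is the number a disconnect decision would look at: it only grows while events arrive faster than demand does.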
A simpler approach would be to read continuously from the event source into a buffer and then connect a Broadway producer to the buffer, but it would be a pity to lose the backpressure. Disconnecting from the event source is nice because it lets a push source convincingly emulate some of the important characteristics of a modern source, such as allowing the integrator to choose the upper bound on storage and processor use.
What I’ve found is that Broadway and GenStage seem very much not built for this use case of throttling a push source. Or, more likely, am I making some basic mistake in attempting to wire a bad old push source to fancy new machinery? So far, what I’m imagining is that disconnection should be triggered by a combination of demand underrun and the internal buffer reaching a desired maximum. Maybe this logic could plug in like a rate limiter, or GenStage’s internal buffer could include a warning threshold before hitting buffer_size and send a signal rather than just logging errors, or the built-in ProducerStage could signal the custom producer module on demand underrun…
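For what it’s worth, the trigger I have in mind could be sketched as a small pure function. Everything here is an assumption for illustration: the module name `Watermark`, the `:high` and `:low` thresholds, and the choice to reconnect only once the buffer has drained to `:low` (so the connection doesn’t flap) are mine, not GenStage or Broadway options.

```elixir
defmodule Watermark do
  # Hypothetical sketch: decide when to drop and resume a push connection.
  # :high and :low are illustrative thresholds, not library options.
  defstruct high: 1_000, low: 100, connected?: true

  def new(opts \\ []), do: struct!(__MODULE__, opts)

  # buffered = events held locally, demand = unmet downstream demand.
  # Returns {action, state} where action is :disconnect, :reconnect, or :ok.

  # Demand underrun plus a full buffer: give up the connection.
  def check(%__MODULE__{connected?: true, high: high} = state, buffered, 0)
      when buffered >= high do
    {:disconnect, %{state | connected?: false}}
  end

  # Consumers have drained the buffer down to the low mark: resume.
  def check(%__MODULE__{connected?: false, low: low} = state, buffered, _demand)
      when buffered <= low do
    {:reconnect, %{state | connected?: true}}
  end

  def check(%__MODULE__{} = state, _buffered, _demand), do: {:ok, state}
end
```

A producer would call `Watermark.check/3` after every push and every demand callback, and on `:reconnect` resume from the last event ID it handed downstream.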