A GenStage that replays all events sent so far

Hi,

I am trying to build a GenStage that buffers incoming data and sends all of it to any newly subscribed consumer.

Background: The GenStage buffers a file that is created on the fly, and the consumers serve this file to clients in a Phoenix application.

In other words: whenever a new consumer subscribes, all events this GenStage has sent so far are “resent” to that consumer (but, of course, not to any existing consumer).

What I got so far:

  • Of course, I can simply buffer all events sent by the GenStage.
  • I subscribe new consumers through an explicit function, which takes care of sending the buffered events directly to the new consumer.

But here are my questions/problems:

  • The GenStage documentation says that events are buffered when no consumers are subscribed, but not when consumers are subscribed. This means I cannot tell whether a given event in my self-created buffer has also been sent the normal way or not. Can I somehow completely disable GenStage's buffering behavior?
  • Sending the events “explicitly” means the consumer has to handle the buffered events through a separate path. Is it possible to send the buffered events the normal GenStage way, but only to this one new consumer?

Thank you!

I don’t think GenStage is a great tool for the setup with multiple distinct consumers you’re describing. GenStage doesn’t care which consumer handles an event, and it doesn’t provide any routing for that. Also, handle_demand only provides an integer for how much demand exists from consumers; it doesn’t tell the producer which consumer sent the demand.

You could, however, use a separate GenStage pipeline per distinct consumer, where each producer pulls its events from a shared resource holding the file contents.
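A minimal sketch of such a shared resource, written in Python as a language-neutral stand-in (threads play the role of Elixir processes). SharedFileBuffer, pull, and read_all are hypothetical names. Each reader keeps its own offset and asks for up to `demand` chunks, loosely mirroring the integer demand a producer receives in handle_demand. As an assumption not taken from the thread, pull blocks on a condition variable until new data arrives or the file is finished, so readers wait instead of polling.

```python
import threading


class SharedFileBuffer:
    """Hypothetical shared resource: an append-only list of file chunks.

    Writers append chunks as the file is generated; every reader keeps
    its own offset, so one buffer can serve any number of readers.
    """

    def __init__(self):
        self._chunks = []
        self._done = False
        self._cond = threading.Condition()

    def append(self, chunk):
        with self._cond:
            self._chunks.append(chunk)
            self._cond.notify_all()  # wake readers blocked in pull()

    def finish(self):
        with self._cond:
            self._done = True
            self._cond.notify_all()

    def pull(self, offset, demand, timeout=None):
        """Return (up to `demand` chunks starting at `offset`, finished flag).

        Blocks until a chunk past `offset` exists or the file is finished
        (or `timeout` elapses, in which case the chunk list may be empty).
        """
        with self._cond:
            self._cond.wait_for(
                lambda: offset < len(self._chunks) or self._done, timeout=timeout
            )
            chunks = self._chunks[offset:offset + demand]
            finished = self._done and offset + len(chunks) >= len(self._chunks)
            return chunks, finished


def read_all(buffer, demand=2):
    """One reader per client: drain the buffer from offset 0 until finished."""
    offset, received = 0, []
    while True:
        chunks, finished = buffer.pull(offset, demand)
        received.extend(chunks)
        offset += len(chunks)
        if finished:
            return received
```

A reader that starts while the file is still being written receives the chunks already present first and then the rest as they are appended. In Elixir the buffer itself would more likely be a GenServer or an ETS table, and the waiting would be expressed with messages rather than a condition variable.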

I think you can try to accumulate the events somewhere else (an ETS table, a GenServer, an external DB, whatever) and read from there in each new Phoenix connection :slight_smile:

Hey,
Thanks for your suggestion. Your setup makes sense, but “read that thing” is the part I have trouble with, because I have two situations to take care of:

A client connects and …

  1. The file has already been fully generated. That is easy: I just read it and then serve it.
  2. The file has not been fully generated yet. Then I have to serve the data generated so far (easy) and somehow stream the rest as it is generated.

Point 2 is the one I have trouble with. Maybe you have a suggestion for this?

Thank you!

Hey,

Thanks for your reply! The part I have trouble implementing is the “pull”.
How would you pull from the shared resource if the data is not there yet and has to be waited for?

The best idea I have is calling some “pull” function periodically, which sounds horribly inefficient.

You could “register” with the shared system, so that it sends you its initial state (what it knows about now) and then sends any later additions as messages.
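The “register” idea can be sketched the same way, again in Python as a stand-in for an Elixir GenServer that sends messages to subscriber processes. FileBroadcaster, register, publish, finish, serve_client, and the DONE marker are all hypothetical names; each subscriber's mailbox is modeled as a queue.Queue. register takes the snapshot and adds the mailbox under one lock, so no chunk can fall between the snapshot and the subscription. This covers both situations from earlier in the thread: a finished file yields the full snapshot plus DONE immediately, while an unfinished one yields the snapshot plus later pushes.

```python
import queue
import threading

DONE = object()  # hypothetical end-of-file marker pushed after the last chunk


class FileBroadcaster:
    """Hypothetical shared system: stores every chunk and pushes new ones."""

    def __init__(self):
        self._lock = threading.Lock()
        self._chunks = []
        self._done = False
        self._subscribers = []

    def register(self):
        """Return (snapshot, mailbox); later chunks and DONE arrive in mailbox."""
        mailbox = queue.Queue()
        with self._lock:
            # Snapshot and subscription happen atomically, so every chunk is
            # delivered exactly once: either in the snapshot or as a message.
            snapshot = list(self._chunks)
            if self._done:
                mailbox.put(DONE)  # file already complete: nothing more to come
            else:
                self._subscribers.append(mailbox)  # still generating
        return snapshot, mailbox

    def publish(self, chunk):
        with self._lock:
            self._chunks.append(chunk)
            for mailbox in self._subscribers:
                mailbox.put(chunk)

    def finish(self):
        with self._lock:
            self._done = True
            for mailbox in self._subscribers:
                mailbox.put(DONE)
            self._subscribers.clear()


def serve_client(broadcaster):
    """Everything one client receives: the snapshot, then pushed chunks."""
    snapshot, mailbox = broadcaster.register()
    received = list(snapshot)
    while True:
        msg = mailbox.get()  # blocks until a message arrives; no polling
        if msg is DONE:
            return received
        received.append(msg)
```

Whenever a client registers (before, during, or after generation), it ends up with the complete file in order. An Elixir version would presumably also monitor subscriber processes and drop them when a Phoenix connection closes; the sketch omits that.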

This sounds like a solution to me :slight_smile:

OK, I will try it out :). Thank you guys for your help and time!