GenStage ProducerConsumerSupervisor?

We have ConsumerSupervisor which starts 1 child per event. Is it possible to use this as a :producer_consumer ?

[A] -> [B(1, 2, 3, 4, 5, …)] -> [C]

Effectively what I’m after is being able to get B to subscribe to A and C to subscribe to B. Then manage concurrency of B just with :max_demand. As opposed to starting an arbitrary number of regular :producer_consumer stages for B and manually handling subscriptions of C to each B.

The example of using a ConsumerSupervisor in the gen_stage repo returns a Supervisor style term in its init/1 but the docs show a regular GenStage style return value where you specify :producer, :producer_consumer or :consumer: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html#init/1

So I’m thinking this might be possible but I’m not understanding how to set it up.

Based on my GenStage experiments and reading the ConsumerSupervisor documentation here are my thoughts:

  • ConsumerSupervisor is a Supervisor process which creates a child process for each event that it receives. In general Supervisors are designed to be as simple as possible so that they can focus on one thing: “supervising child processes” (while not taking on any additional responsibility which may cause them to crash) - this one just happens to dynamically create children for any events received. By this nature the ConsumerSupervisor can only really be a Consumer in a GenStage scenario as a supervisor typically doesn’t collect and manage results from its children.

  • Your particular [A] -> [B(1, 2, 3, 4, 5, ...)] -> [C] scenario could be realized with C as a Producer (if there is a (producer-) consumer D). Essentially the children of the ConsumerSupervisor would simply deliver their result to C via GenStage.cast/2 or GenStage.call/3 - i.e. flow to C would be entirely governed by the :min_demand, :max_demand settings on B, the ConsumerSupervisor.

  • The documentation of init is strange to say the least. There is a ConsumerSupervisor.init/1 callback that is analogous to the Supervisor.init/1 callback - so I would expect the ConsumerSupervisor.init/2 function to be analogous to Supervisor.init/2. The text of the ConsumerSupervisor.init/2 function seems very similar to the text of the GenStage.init/1 callback - so I suspect a copy/paste error. The ConsumerSupervisor.init/2 function code simply prepares a tuple containing the supervisor flags and child specifications.
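For reference, a minimal sketch of a ConsumerSupervisor whose init/1 callback returns through the ConsumerSupervisor.init/2 function, in the style of the gen_stage example. The module names B and Worker, the :max_demand of 5, and the Mix.install line (for running it as a standalone script) are assumptions for illustration and do not come from the thread.

```elixir
# Assumption: run as a standalone script; gen_stage is fetched via Mix.install.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Worker do
  # Hypothetical worker: the ConsumerSupervisor appends the event to the
  # start arguments, so one process is started per event.
  def start_link(event) do
    Task.start_link(fn -> IO.inspect(event, label: "processing") end)
  end
end

defmodule B do
  use ConsumerSupervisor

  def start_link(producer) do
    ConsumerSupervisor.start_link(__MODULE__, producer)
  end

  @impl true
  def init(producer) do
    # Children must be :temporary or :transient under a ConsumerSupervisor.
    children = [
      %{id: Worker, start: {Worker, :start_link, []}, restart: :transient}
    ]

    # :max_demand caps how many events (and so children) are in flight.
    opts = [strategy: :one_for_one, subscribe_to: [{producer, max_demand: 5}]]

    # Supervisor-style return value, not {:consumer, state, opts}.
    ConsumerSupervisor.init(children, opts)
  end
end
```

Note that nothing here declares B as a :producer or :producer_consumer. The stage type is fixed by the behaviour itself.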


Many thanks for the reply @peerreynders. Your first point makes perfect sense, and matches how the example works and the description of ConsumerSupervisor in the docs. I think there’s a mistake in init/1 in the docs then.

The second point I’m not sure I understand though. C can’t possibly be a :producer because it’s supposed to receive events from B (B being a single :producer_consumer stage or an array of :producer_consumer stages). Unless I’m totally misunderstanding you!

  • C would only be a Producer if there are other stages that follow - in your simple A,B,C scenario a GenServer at C would suffice.
  • A Producer has to get its events from somewhere. Some are read from a file, others arrive via GenStage.handle_cast/2, GenStage.handle_call/3, GenStage.handle_info/2, etc. The difference is that the arrival of these events isn’t regulated.
  • So as such “flow-control” can’t really propagate from C to B. C has to take what B is dishing out based on B’s :min_demand/:max_demand configuration.

Ok so I think what you’re suggesting is to make B a ConsumerSupervisor that sends events to C via cast/2 / call/3. Where C is just a regular GenServer outside of the GenStage structure:

[A] -> [B] ..cast/call.. [C]

In this way C wouldn’t be able to provide back pressure.

That would almost work for me, but (you guessed it) C needs to provide some back pressure, which is part of the reason why I’m using gen_stage in the first place :slight_smile:

I’m starting to look into Flow which seems to be able to do this kind of thing out of the box, but at the cost of the flexibility of having full fledged GenStages. It seems like if Flow can do it, then it should be possible to implement the same thing with what Flow is built on top of - GenStage. Maybe I need to dig into the Flow source a bit…

In a sense ConsumerSupervisor + Producer “is a Producer Consumer” where :min_demand/:max_demand provides the back pressure regulation for the Producer in front of it and the (B) Producer is regulated by the Consumer behind it. So you could fashion something this way:

A(Producer) -> (Bc(ConsumerSupervisor) ~cast/call~> Bp(Producer)) -> C(Consumer)

  • Bc Consumer portion of B
  • Bp Producer portion of B
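A rough sketch of that arrangement, under stated assumptions: A serves a fixed list, each worker doubles its event as a stand-in for real work, and C reports to whichever process started it. The module names, the demand values, the Demo helper and the Mix.install line are all made up for illustration. In this simplest form C's demand regulates only Bp, which relies on GenStage's internal producer buffer for results C has not asked for yet. Nothing is forwarded to A.

```elixir
# Assumption: run as a standalone script; gen_stage is fetched via Mix.install.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule A do
  use GenStage
  def start_link(events), do: GenStage.start_link(__MODULE__, events, name: __MODULE__)
  @impl true
  def init(events), do: {:producer, events}
  # Hands out at most `demand` events from a fixed list (enough for a demo).
  @impl true
  def handle_demand(demand, events) do
    {now, rest} = Enum.split(events, demand)
    {:noreply, now, rest}
  end
end

defmodule Bp do
  use GenStage
  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  @impl true
  def init(:ok), do: {:producer, :ok}
  # Nothing to release when demand arrives; results show up via cast.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
  # A result from one of Bc's children is emitted as an event for C.
  @impl true
  def handle_cast({:result, result}, state), do: {:noreply, [result], state}
end

defmodule Worker do
  # One short-lived process per event; the result is cast to Bp.
  def start_link(event) do
    Task.start_link(fn -> GenStage.cast(Bp, {:result, event * 2}) end)
  end
end

defmodule Bc do
  use ConsumerSupervisor
  def start_link(_), do: ConsumerSupervisor.start_link(__MODULE__, :ok)
  @impl true
  def init(:ok) do
    children = [%{id: Worker, start: {Worker, :start_link, []}, restart: :temporary}]
    opts = [strategy: :one_for_one, subscribe_to: [{A, max_demand: 5}]]
    ConsumerSupervisor.init(children, opts)
  end
end

defmodule C do
  use GenStage
  def start_link(notify), do: GenStage.start_link(__MODULE__, notify)
  @impl true
  def init(notify), do: {:consumer, notify, subscribe_to: [{Bp, max_demand: 2}]}
  @impl true
  def handle_events(events, _from, notify) do
    Enum.each(events, &send(notify, {:done, &1}))
    {:noreply, [], notify}
  end
end

defmodule Demo do
  # Starts A -> (Bc ~cast~> Bp) -> C and collects the ten results.
  def run do
    {:ok, _} = A.start_link(Enum.to_list(1..10))
    {:ok, _} = Bp.start_link(:ok)
    {:ok, _} = Bc.start_link(:ok)
    {:ok, _} = C.start_link(self())

    results =
      for _ <- 1..10 do
        receive do
          {:done, result} -> result
        after
          2_000 -> :timeout
        end
      end

    Enum.sort(results)
  end
end
```

Calling Demo.run() should return the doubled events in sorted order. Arrival order at C is not guaranteed because the workers run concurrently.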

Hmm, nice! Thank you @peerreynders

Hi,

Are you saying that the consumer supervisor Bc would trigger a task that does some work then sends a message to a producer (Bp) to queue up additional work that consumer C would then do more work on?

I’m not quite sure what exactly you are asking. (Bp) was only introduced because of the statement

In this way C wouldn’t be able to provide back pressure.

So I assume that C was going to do some further work on the results that were generated by the tasks spawned by (Bc). Now whether results have to be queued at (Bp) is entirely dependent on how demand is handled within (Bc/Bp). That part of the solution was never discussed - for a regular producer-consumer:

This means it does not explicitly handle the demand because the demand is always forwarded to its producer.

i.e. there is code in the GenStage behaviour that automatically forwards demand from the final consumer of the pipeline to the producer at the beginning of the pipeline - that forwarding mechanism would need to be manually added to (Bc/Bp). Bp could receive demand via handle_demand/2 (responding with an empty list of events), while “informing” Bc of the demand.

I suppose Bc could handle subscription manually and use GenStage.ask/3 to get the demand to A once Bc actually “knows” what the demand is. At this point it’s A’s responsibility to provide no more than is demanded (and store demand if it can’t provide it all).

If back pressure is propagated in this manner there shouldn’t be a need to queue any results at Bp - it can simply release any result to C immediately via a {:noreply, [result], state} tuple (see for example handle_cast/2) as there is no actual obligation to “batch” the events. Bp would then primarily exist to receive demand from C and to forward it to Bc.
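A sketch of that manual forwarding, with one substitution that should be stated plainly: Bc here is an ordinary GenStage consumer in :manual mode that spawns unsupervised tasks, not a ConsumerSupervisor, because handle_subscribe/4 and GenStage.ask/3 are the documented hooks for manual demand. The Fwd namespace, the `event * 10` stand-in for real work, the demand values and the Mix.install line are assumptions for illustration.

```elixir
# Assumption: run as a standalone script; gen_stage is fetched via Mix.install.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Fwd.A do
  use GenStage
  def start_link(events), do: GenStage.start_link(__MODULE__, events, name: __MODULE__)
  @impl true
  def init(events), do: {:producer, events}
  # Provides no more than what was demanded, from a fixed list.
  @impl true
  def handle_demand(demand, events) do
    {now, rest} = Enum.split(events, demand)
    {:noreply, now, rest}
  end
end

defmodule Fwd.Bp do
  use GenStage
  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  @impl true
  def init(:ok), do: {:producer, :ok}
  # Demand from C: release nothing now, just inform Bc of the demand.
  @impl true
  def handle_demand(demand, state) do
    GenStage.cast(Fwd.Bc, {:demand, demand})
    {:noreply, [], state}
  end
  # A finished result is released to C immediately, no batching.
  @impl true
  def handle_cast({:result, result}, state), do: {:noreply, [result], state}
end

defmodule Fwd.Bc do
  use GenStage
  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  @impl true
  def init(:ok), do: {:consumer, nil, subscribe_to: [Fwd.A]}
  # Manual mode: nothing is asked of A until Bp reports demand from C.
  @impl true
  def handle_subscribe(:producer, _opts, from, _state), do: {:manual, from}
  @impl true
  def handle_cast({:demand, demand}, from) do
    GenStage.ask(from, demand)
    {:noreply, [], from}
  end
  # One task per event; each task casts its result to Bp.
  @impl true
  def handle_events(events, _from, state) do
    for event <- events do
      Task.start(fn -> GenStage.cast(Fwd.Bp, {:result, event * 10}) end)
    end
    {:noreply, [], state}
  end
end

defmodule Fwd.C do
  use GenStage
  def start_link(notify), do: GenStage.start_link(__MODULE__, notify)
  @impl true
  def init(notify) do
    {:consumer, notify, subscribe_to: [{Fwd.Bp, max_demand: 2, min_demand: 1}]}
  end
  @impl true
  def handle_events(events, _from, notify) do
    Enum.each(events, &send(notify, {:done, &1}))
    {:noreply, [], notify}
  end
end

defmodule Fwd.Demo do
  # Bc must be registered before C subscribes, or the first demand is lost.
  def run do
    {:ok, _} = Fwd.A.start_link(Enum.to_list(1..10))
    {:ok, _} = Fwd.Bp.start_link(:ok)
    {:ok, _} = Fwd.Bc.start_link(:ok)
    {:ok, _} = Fwd.C.start_link(self())

    results =
      for _ <- 1..10 do
        receive do
          {:done, result} -> result
        after
          2_000 -> :timeout
        end
      end

    Enum.sort(results)
  end
end
```

With this wiring A is only ever asked for as much as C has demanded from Bp, so at most two events are in flight at a time in this example.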

Oh, I’m sorry, I think I misunderstood your earlier diagram.

Here’s what I’m looking for, which might be misguided (I’m still very much a beginner to the GenStage world):

Producer A has events
ConsumerSupervisor B requests the events from A and spawns Task workers up to max_demand.
The work from B’s workers can be picked up by another ConsumerSupervisor, C

I’m not quite sure how to connect B and C, and I thought that your diagram would allow me to do so. Is this what you were talking about? Now I’m getting the impression that you aren’t, and that I’m just not understanding what you’re saying (the ocean of GenStage is vast and I am just getting my sea legs).

I’m by no means a GenStage expert - I have just happened to play around with it for a few days 4 months ago - and looking at the documentation right now I still find it a bit hard to digest.

For example it’s only when I started to write the code that I noticed that any of the callbacks that can return a tuple that includes [events] can release events to the next stage - skimming through the examples in the documentation you could easily be left with the impression that handle_demand/2 is the place where events are released - when in fact it is only one place where events are released.

If you have events during the handle_demand/2 callback then by all means release what you have that doesn’t exceed the demand. But as a producer any unfulfilled demand has to be “stored” - because the consumer is only going to issue another “demand” if it wants more than it already asked for.

So whenever a producer “acquires events” it can release them immediately provided it has “stored demand”.

So there are situations where handle_demand/2 will simply return an empty event list (because there are simply none available at the time) - and the events are only released later when the producer somehow gets ahold of them - and then they are released as a result of the callback that “delivers” the events to the producer.
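The pattern described in the last few paragraphs, as a minimal sketch: a producer that keeps a queue of events plus a counter of stored demand, and releases events from whichever callback happens to make a match possible. The BufferingProducer name, the push/1 helper and the Mix.install line are assumptions for illustration.

```elixir
# Assumption: run as a standalone script; gen_stage is fetched via Mix.install.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule BufferingProducer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  # Hypothetical entry point through which events reach the producer.
  def push(event), do: GenStage.cast(__MODULE__, {:push, event})

  @impl true
  def init(:ok), do: {:producer, {:queue.new(), 0}}

  # New demand is added to the stored demand; if nothing is queued this
  # returns an empty event list and the demand is simply remembered.
  @impl true
  def handle_demand(incoming, {queue, pending}) do
    dispatch(queue, pending + incoming, [])
  end

  # An arriving event is released right away if there is stored demand,
  # otherwise it waits in the queue.
  @impl true
  def handle_cast({:push, event}, {queue, pending}) do
    dispatch(:queue.in(event, queue), pending, [])
  end

  # Release queued events until either demand or the queue runs out.
  defp dispatch(queue, 0, events), do: {:noreply, Enum.reverse(events), {queue, 0}}

  defp dispatch(queue, pending, events) do
    case :queue.out(queue) do
      {{:value, event}, queue} -> dispatch(queue, pending - 1, [event | events])
      {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, pending}}
    end
  end
end
```

Both callbacks funnel into the same dispatch helper, which is what makes handle_demand/2 just one of several places where events can leave the producer.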

Now a consumer is a GenStage that is at the very end of the pipeline and is ultimately considered to be the “constraining operation” - that is why it gets to set the demand that propagates up all the way to the producer at the beginning of the pipeline (who has to honour that constraint). So ConsumerSupervisor is inherently designed to be at the end of that pipeline.

My guess is that it doesn’t need to be a producer because conceptually as it is the “constraining operation” back pressure isn’t an issue with any processing stages that follow - they can simply be implemented as GenServers because they can deal with any volume that the ConsumerSupervisor is capable of throwing at them.

Which brings me to the pertinent point - why would you think that you need ConsumerSupervisor feeding another ConsumerSupervisor? It’s a necessary question to ensure that we aren’t running into the XY problem - i.e. there may be a solution to your actual problem that has nothing to do with ConsumerSupervisors.


Okay, so my actual problem is that I am receiving webhooks. Each of those webhooks needs to go through a series of steps: a couple of HTTP requests, etc. Right now I just have those webhooks dumped to a directory; a producer produces them and a ConsumerSupervisor spawns a task for each webhook it receives.

Unfortunately some of it is a little dirty because I get more files I have to manage, and it seemed like one new “workflow step” per new file might make sense and keep the code as readable as possible.

And this is where I am.

Hopefully you’ll bear with me because I’m not entirely sure I’m grasping all the essential details from your description - so I’m just going to describe what I think you are telling me - with the full expectation that you are going to have to correct me.

High level goal

Some kind of “domain event” occurs and zero to many URIs need to be notified (my understanding of webhooks).

Complication

The domain event doesn’t actually include all the information necessary for the notifications to be posted. The supplementary information needs to be fetched from multiple sources - preferably concurrently. (I’m assuming that the supplementary information depends on information from the domain event rather than the URI the notification is posted to).

Observations

  • I suspect that currently the producer essentially just combines the “domain event” with the collection of URIs that need to be notified - i.e. the “event” emitted by the producer is “domain event + URIs that need to be notified”.
  • From the description it doesn’t seem like GenStage’s back pressure regulation is an essential element in solving the problem - the primary motivation for using GenStage is 1) to cleanly separate/segregate all the steps necessary to turn “Domain Event + notification URIs” into “notifications being sent to all the URIs”, 2) have these steps in the pipeline potentially working progressively but concurrently on distinct instances of domain events (each with their own set of URIs).

Now at this point I suspect that you are trying to insert a stage that is responsible for “concurrently fetching supplementary information for one single domain event (on behalf of all specified notification URIs)” - and this is where the disconnect happens for me for attempting to employ a ConsumerSupervisor for this responsibility:

  • The ConsumerSupervisor is designed to “start a (i.e. one) new child process per event” and it seems to me that for concurrent fetches you need to “start multiple new child processes per event”.
  • By design the ConsumerSupervisor never had to combine the results from the various child processes - however the results of the concurrent fetches need to be aggregated back into the original domain event data.
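One way the two points could be reconciled, sketched under assumptions: keep one child per event, and let that single child fan out the fetches with Task.async_stream/3 and fold the results back into the event itself. The Enricher module, the fetcher functions (stand-ins for HTTP requests) and the map-shaped event are all hypothetical.

```elixir
defmodule Enricher do
  # `fetchers` is a keyword list of {key, fun}; each fun takes the event and
  # returns the supplementary value to store under `key`.
  def enrich(event, fetchers) do
    fetchers
    # Run the fetches concurrently, one task per fetcher.
    |> Task.async_stream(fn {key, fetch} -> {key, fetch.(event)} end, timeout: 5_000)
    # Aggregate every result back into the original event data.
    |> Enum.reduce(event, fn {:ok, {key, value}}, acc -> Map.put(acc, key, value) end)
  end
end

# Usage with two made-up fetchers:
fetchers = [
  user: fn event -> "user-#{event.id}" end,
  account: fn event -> event.id * 2 end
]

enriched = Enricher.enrich(%{id: 7}, fetchers)
IO.inspect(enriched)
```

A function like this could be called from inside the task that the ConsumerSupervisor starts for each event, so the supervisor itself never has to collect anything.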

So where are my misunderstandings?

Hi there, thanks for taking the time!

My high-level goal is: “I receive notifications all the time and I need to do work on each of those notifications before I can ingest them. This work requires a number of HTTP calls to be made”.

Currently as you have stated I do not need backpressure. In fact, most of the time, I do not want to process these events as batches, I want to process them as they arrive, hence the appeal of the consumer supervisor: it automatically creates one worker per event.

Frankly, as I chat with you, I realize that I’m trying to make the problem more complex than it needs to be right now. I just need to make a small change to what I currently have to be much happier with the way it works.

We’ll probably want some backpressure for some things as the influx grows, but there’s a number of things that we’ll just want to be able to send through very quickly anyway.

Well, thank you for helping me think about things and convincing me to not try and bend GenStage out of shape :slight_smile:

Rubber Ducking
A very simple but particularly useful technique for finding the cause of a problem is simply to explain it to someone else. The other person should look over your shoulder at the screen, and nod his or her head constantly (like a rubber duck bobbing up and down in a bathtub). They do not need to say a word; the simple act of explaining, step by step, what the code is supposed to do often causes the problem to leap off the screen and announce itself.‡

It sounds simple, but in explaining the problem to another person you must explicitly state things that you may take for granted when going through the code yourself. By having to verbalize some of these assumptions, you may suddenly gain new insight into the problem.

‡. Why “rubber ducking”? While an undergraduate at Imperial College in London, Dave did a lot of work with a research assistant named Greg Pugh, one of the best developers Dave has known. For several months Greg carried around a small yellow rubber duck, which he’d place on his terminal while coding. It was a while before Dave had the courage to ask…

p. 95 - The Pragmatic Programmer