GenStage code architecture


I’m currently working on a real-time flow monitoring project but I have several questions about my architecture and the use of GenStage.


Currently, I need 4 “processing steps”. My simplified workflow looks like this:

            --> [CX] -+-> [DX]
[A] -> [B] -+-> [CY] -+-> [DY]
            --> [CZ] -+-> [DZ]

Here, A, B, C and D represent a step in the processing of my data.

X, Y, and Z represent a type of processing step.
In order to clarify what I call “type” for C and D, these two processing steps are divided into sub-steps that can be processed independently of each other.

In my current architecture A, B, CX..Y and DX..Y are GenStage. B and CX..Y are each used in BroadcastDispatcher in order to send the events to all their consumers.

To summarize: A gets the data and sends it to B, B sends it to each type of C and each type of C sends the data to each type of D.


I am currently asking myself several questions and I need an outside opinion because I am stuck on certain points.

How to replicate C and D?

Indeed, I’m starting to realize that the processing time of some types can sometimes reach several minutes which is blocking my flow of events during this time.

My main problem here is that B and C broadcast their events to ALL subscribers. Let’s imagine that I have 2 types of processing for C that we’ll call X, Y.

So that gives us the following scheme:

[A] -> [B] -+-> [CX] -+-> [D]
            --> [CY] -+

Now, let’s say I duplicate processing type X because I’ve noticed that sometimes this type takes too long to perform. So I’m going to end up with this:

            --> [CX1] -+
[A] -> [B] -+-> [CX2] -+-> [D]
            --> [CY]  -+

Here the problem is that my type X will receive the data twice and I find myself with the same problem as at the beginning.
What I’m looking for is something where for each type, a consumer who is free will process the data (here, CX1 is busy):

                [CX1] -+
[A] -> [B] -+-> [CX2] -+-> [D]
            --> [CY]  -+

I think I probably have the wrong way to use GenStage so I’d like to get your opinion.

Thanks in advance.

Looks like a perfect use-case for poolboy.

Implement one single C process that receives processing request and wrap D executors into poolboy.transaction/2.

Another approach would be to implement C as a DynamicSupervisor and spawn new supervised children for each request.

1 Like

If You need to partition work, You might also have a look at Flow.

First of all, thank you for your answer.

I had also thought of using poolboy but I couldn’t see how to use it without “breaking” my GenStage chain.

Currently, A, B, CX..Z and DX..Z are all GenStage.

If I understood correctly what you’re telling me, in solution 1, A, B and C would be GenStage and D would be a process pool notified by C.
But in this case, the problem remains the same, doesn’t it? If C takes 20 mins to run, D won’t receive any data during that time.

As for the second solution, I’ll have 2 GenStage and 2 process groups or GenServer, right?

Thank you also for your answer.
I’ve never used Flow before. I’ll read the documentation and get back to you

Indeed, if C is who runs 20 mins, you are to break your GenStage chain before C. GenStage and poolboy don’t play well together, so I would introduce BC Consumer that would terminate GenStage chain after B and delegate to poolboy for C.

I doubt Flow would help much in this scenario, because the existence of GenStage implies you have all your cores busy with C processes, haven’t you?

Isn’t C equal to C[X…Y]? In that case B could dispatch to the given C.

I actually used processes to model my processing, yes.

It is indeed the same and B could dispatch to C.

I was wondering: would it be possible to have one GenStage per type (C[X..Y])?
This GenStage, once it receives a request (handle_demand/2), it assigns the processing to another process (Task or poolboy), meanwhile, the callback handle_demand/2 return {:noreply, [], state} so that the GenStage will be able to continue to listen to its producers and as soon as the worker has finished his task, it will notify (GenStage. cast/2) the GenStage so that the GenStage can send its response to the consumer.