How to make GenServers (or other OTP behaviors like GenServers) become GenStages?

I regularly have GenServers, or other OTP behaviors like GenServers. I also often want those to become GenStages.

It doesn’t seem to be trivial to extend an actor to interact within the GenStage specification, and I was wondering if anyone had a reasonable approach to this problem.

I realize that this might not have a reasonable solution, outside of using the facilities available to forward the events/whatever to another process, but I figured it wouldn’t hurt to ask.


It sounds like you want to be able to use those GenServers in two ways: As regular GenServers and GenStages.
In this case, wouldn’t it be quite convenient to create a thin wrapper module which is “just” a GenStage and calls out to your GenServer in order to cleanly separate those concerns?

That said, GenStage is actually implemented on top of GenServer, so any GenStage is also a GenServer:

GenStage is implemented on top of a GenServer with a few additions.
Besides exposing all of the GenServer callbacks, it also provides
c:handle_demand/2 to be implemented by producers and c:handle_events/3 to be
implemented by consumers, as shown above, as well as subscription-related
callbacks. Furthermore, all the callback responses have been modified to
potentially emit events. See the callbacks documentation for more

The biggest problem for me is the things that already are a “subclass” of GenServer.

Imagine you were using any of the websocket libraries available currently. They also are kind of like GenServer, like GenStage is.

If I wanted a module that obeyed both sets of message specifications, I would have to copy default implementations of at least one to my module.

Look at the specification for implementing GenStage’s protocol, not its callbacks, but the raw messages, and you might agree that it is quite difficult to compose behaviours with it.

Also, no matter how cleanly you separate the concerns of a GenServer and a GenStage, a server supplying a producer stage is going to look like a poor implementation of a producer subscribed to by a producer consumer. I have tried it. I don’t prefer it.

Without any examples on what are you trying to do, this is only gonna be a theoretical discussion I am afraid.

An example: If I wanted to take a module that implements the Connection behavior from the connection package, and adapt it to also handle messages from a GenStage, how would I do that?

What do you mean by that? Not sure I understand. GenStage requires you to create consumer / producer / producer-consumer so you should just code what you need in a GenStage instances which makes use of the Connection?

Implementing this message specification.

The level of detail to appropriately implement this specification is ridiculous. Creating modules and dividing the responsibilities across them would be a way to mitigate this complexity.

No then imagine that there are some types of modules like this that receive a lot of streaming data. And while streaming can also send commands to an underlying API. All of which should be occurring in the same process.

I wonder if some of this work can be done with a Dispatcher, but the documentation isn’t very clear on how to use a dispatcher outside of the GenStage standard flow.