How to stop/restart a GenStage pipeline

I have an app running GenStage that use a producer stage to buffer/rate-limit work that we manually send to it. However, sometimes we make mistakes and we might want to stop the music and reset the particular producer stage that has accumulated all the unwanted messages. I’m discovering that this is trickier than I thought. Using Process.exit(my_buffer_pid, :normal) returned true, but when I inspected the buffer via GenStage.estimate_buffered_count/2, all the messages were still there.

Is there a better way to do this? Like…maybe I should be flushing the messages instead of attempting a restart? Or maybe GenStage.stop/3 is the way to go?

Thanks as always for any pointers!

1 Like

I have not used GenStage in a while. Can you show a more detailed example though?