Flow window that triggers on inactivity

I am refactoring a time-consuming part of our application using Flow. In my use case, I need a Flow pipeline in which the Flow.reduce step is triggered when no events have arrived for more than a specified duration. For example: prepare and update a report from the events received so far once there have been no new events for 5 minutes. Can this be done using Flow?

PS: I read about Flow.Window.session, but with session-based windows it looks like a new event has to arrive after the inactive period before the gap is detected, which doesn’t solve my problem.


I’ve solved this problem. Here’s how I did it:

Whenever an event is pushed into the producer, start a timer using Process.send_after(self(), :trigger, @inactivity_time_ms). Keep the timer reference in the producer's state so that every new event cancels the existing timer and starts a fresh one. Handle the :trigger message sent by the timer in handle_info, and from there call GenStage.async_notify(self(), {:producer, :done}), which forces the window to finish.

Here’s the code for the producer: https://gist.github.com/emilsoman/bb83608f5d0ad456657cf024f859f2fe
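
In case the gist is unavailable, the steps above can be sketched roughly as follows. This is not the gist's contents, only a minimal illustration of the same idea. The module name InactivityProducer, the push/2 helper, and the state shape are hypothetical, and it assumes a GenStage version that still provides GenStage.async_notify/2 as used above (I believe later GenStage releases removed the notification functions, so check the version you depend on).

```elixir
defmodule InactivityProducer do
  # Hypothetical module name; a sketch of the approach described above.
  use GenStage

  # 5 minutes of inactivity, matching the example in the question.
  @inactivity_time_ms 5 * 60 * 1000

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  # Hypothetical helper for pushing a single event into the producer.
  def push(producer, event) do
    GenStage.cast(producer, {:push, event})
  end

  def init(:ok) do
    # No timer is running until the first event arrives.
    {:producer, %{timer: nil}}
  end

  # Events only enter through push/2, so there is nothing to emit on demand.
  # GenStage buffers events emitted while there is no pending demand.
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  def handle_cast({:push, event}, state) do
    # Every new event cancels the previous timer and starts a fresh one.
    {:noreply, [event], %{state | timer: restart_timer(state.timer)}}
  end

  def handle_info(:trigger, state) do
    # No events for @inactivity_time_ms: notify consumers so the window
    # finishes. Assumes GenStage.async_notify/2 exists in your version.
    GenStage.async_notify(self(), {:producer, :done})
    {:noreply, [], %{state | timer: nil}}
  end

  defp restart_timer(nil) do
    Process.send_after(self(), :trigger, @inactivity_time_ms)
  end

  defp restart_timer(timer) do
    Process.cancel_timer(timer)
    restart_timer(nil)
  end
end
```

One caveat with this sketch: if the timer fires at almost the same moment a new event is pushed, a stale :trigger message may already be in the mailbox when the timer is cancelled, so the window could close slightly early. For a 5 minute threshold that is probably acceptable, but it is worth knowing about.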