Process events that depend on each other - Flow

Hello, I successfully implemented the whole GenStage & Flow thingy, it is really awesome, so I want to continue with it for the next job:

Imagine you have a variable number of events arriving in chunks of, let’s say, 30s. To be parsed correctly, each chunk needs to know about the past events (-30s) and the future events (+30s).
So the first batch contains 90s worth of events (“past”, “to be parsed” and “future”).
The second batch reuses the previous “to be parsed” chunk as its past, the previous “future” chunk as its present, and a new 30s chunk as its future,
etc…

The problem I found is that I’m not able to tell Flow that it has all the events needed to execute on_trigger/. As far as I know, I would need the event count (which is variable), the timestamp (but the batches overlap) or the execution time (again, variable).

I also tried to “hack it” by setting count=1 for the window and sending all the needed events at once as [events]. But then, in the consumer, I’m not able to do something like real_events = List.first(events) before the pipeline starts.

Can this be done using Flow? If so, any tips? Thanks a lot! I’m really struggling with this one :blush:

My GenStage and Flow are extremely rusty so I can’t help you directly. But I have to ask: do you need the back pressure mechanisms?

If not, what you need can be achieved with Stream and Task.async_stream relatively easily.
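
A minimal sketch of that idea, assuming the events are already grouped into 30s chunks (one list per chunk). The module name SlidingParse and the function parse_with_context/1 are hypothetical placeholders for your real parser; only Stream.chunk_every/4 and Task.async_stream/3 come from the standard library:

```elixir
defmodule SlidingParse do
  # Hypothetical parser: receives the past, present and future chunks
  # and returns a result for the present chunk only.
  def parse_with_context([past, present, future]) do
    %{parsed: present, context_size: length(past) + length(future)}
  end

  def run(chunks) do
    chunks
    # Sliding window of 3 chunks that advances 1 chunk at a time;
    # :discard drops trailing windows that have fewer than 3 chunks.
    |> Stream.chunk_every(3, 1, :discard)
    # Parse each window in its own task; results keep the input order.
    |> Task.async_stream(&parse_with_context/1, ordered: true)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Four chunks with a variable number of events in each.
chunks = [[:a1, :a2], [:b1], [:c1, :c2, :c3], [:d1]]
results = SlidingParse.run(chunks)
IO.inspect(results)
```

Because each window carries its own past and future, the number of events per chunk can vary freely. Note that in this sketch the very first and very last chunks only ever serve as context, so they are never parsed themselves.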