Stream.iterate(initial_page(url, auth, pit_id), &next_page(url, auth, &1))
|> Stream.take_while(&(&1 != :eof))
|> Stream.map(&to_csv_rows/1)
|> CSV.dump_to_stream()
|> Stream.each(&chunk(conn, &1))
|> Stream.run()
Hi, I’m building a csv by using Stream.iterate() to paginate over elastic search responses with their “search_after” functionality and chunking each page as csv rows. All of this works really well, but I can’t figure out how to handle a user initiated cancellation. I’ve tried using reduce_while
as shown in the chunk/2 documentation but the error tuple never evaluates. I assume there’s some process that’s just ending but where would it’s shutdown message go?
The reason this is so important is elasticsearch highly recommends killing a PIT (point in time) when you’re finished with it as they are extremely expensive, I’d like to just run a callback whenever a user cancels the csv download.