Should Experimental.Flow have a Flow.chunk?

Hi everyone, I just started to learn Experimental.Flow and am using it in a pet project of mine.
For the moment I am experimenting with:

    chunk_number = 2000

    file # 3.7GB of json data
    |> File.stream!(read_ahead: 100_000)
    |> Flow.from_enumerable()
    |> Flow.partition(Flow.Window.global |> Flow.Window.trigger_every(chunk_number, :reset))
    |> Flow.reduce(fn -> [] end, fn x, y ->
      [[x]|y]
    end)
    |> Flow.emit(:state)
    |> Flow.map(fn chunk ->
      Indexer.index(chunk)
    end)
    |> Flow.run

I was wondering whether a Flow.chunk would be useful in the context of Flow. I could certainly wrap partition and reduce in a function, but maybe the dev experience would be nicer if Flow.chunk existed.
What’s your opinion?

Thanks!

    file
    |> File.stream!(read_ahead: 100_000)
    |> Stream.chunk(2_000)
    |> Flow.from_enumerable()
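
One detail worth keeping in mind with the snippet above: when given only a count, Stream.chunk/2 drops a trailing partial chunk, so the last few lines of the file may never reach the indexer. The sketch below illustrates this on a tiny stand-in stream. It uses Stream.chunk_every/4, the name that replaced Stream.chunk/4 from Elixir 1.5 onward, and the five fake lines are made up purely for illustration.

```elixir
# Stand-in for File.stream!(path): five fake "lines" (hypothetical data).
lines = Stream.map(1..5, fn n -> "line #{n}\n" end)

# With [] as leftover, the trailing partial chunk is kept.
with_leftover =
  lines
  |> Stream.chunk_every(2, 2, [])
  |> Enum.to_list()

# With :discard, the trailing partial chunk is dropped, which matches
# what Stream.chunk(enum, 2) did in older Elixir versions.
without_leftover =
  lines
  |> Stream.chunk_every(2, 2, :discard)
  |> Enum.to_list()

IO.inspect(with_leftover, label: "kept")
IO.inspect(without_leftover, label: "discarded")
```

If every line has to be indexed, passing an explicit leftover such as [] is probably the safer choice.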

Hi Ben, I already tried that, but memory grew quite fast to more than 1 GB, so I hit Ctrl+C to avoid exhausting my RAM.
Most probably I am missing something?

Ah, so the issue is that the default max_demand for GenStage (and thus Flow) is 1000, which means it tries to pull 1000 chunks, each with 2000 items in them, before passing them off to a worker. This is obviously problematic. You should do this instead:

    file_stream = File.stream!(path) |> Stream.chunk(2000, 2000, [])

    Flow.new(stages: 4, max_demand: 1)
    |> Flow.from_enumerable(file_stream)

I’ve been going on the assumption that in your Flow.map(fn x -> ... end) you want x to be a chunk of 2000 lines, right?
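
To put rough numbers on the demand issue described above, here is a small back-of-the-envelope sketch. It assumes each stage can ask for up to max_demand events at once and that every event is a full chunk. The variable names are only illustrative, and actual memory use depends on line length, which the thread does not give.

```elixir
# Each event flowing through the pipeline is one chunk of lines.
chunk_size = 2_000

# GenStage's default max_demand, versus the value suggested above.
default_max_demand = 1_000
tuned_max_demand = 1

# Rough upper bound on lines a single stage may request in one go.
lines_default = default_max_demand * chunk_size
lines_tuned = tuned_max_demand * chunk_size

IO.puts("default max_demand: up to #{lines_default} lines per stage")
IO.puts("max_demand: 1:      up to #{lines_tuned} lines per stage")
```

With the default, a single stage may ask for on the order of two million lines before doing any work, which fits the memory growth reported earlier in the thread.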


Ben, thank you, that solves the problem in a more elegant way!
Yes, I would use Flow.map(fn chunk -> Indexer.index(chunk) end) so that each call receives a chunk of 2000 lines.
It was not crystal clear to me that the default value for max_demand was 1000; I guess I should RTFM better :)