How does GenStage.from_enumerable decide where to chunk a binary stream?

Hello,

I am testing a custom GenStage consumer by feeding it a Debian package file:

File.stream!("test/packages/deb/dep1_1.0_amd64.deb") |> GenStage.from_enumerable()

Below are the events I see being dispatched (with min_demand: 2, max_demand: 5):

[
  "!<arch>\n",
  "debian-binary/  1434455625  1000  1000  100664  4         `\n",
  "2.0\n",
  "control.tar.gz/ 1434455625  1000  1000  100664  328       `\n",
  <<31, 139, 8, 0, 73, 14, 128, 85, 0, 3, 237, 209, 65, 75, 195, 48, 20, 7, 240,
    158, 243, 41, 114, 212, 131, 109, 82, 186, 21, 138, 14, 5, 15, 10>>,
  "\n",
  <<195, 129, 247, 172, 125, 186, 176, 46, 169, 105, 66, 213, 79, 111, 86, 65,
    134, 224, 69, 168, 10>>,
  <<254, 127, 80, 82, 154, 151, 151, 63, 125, 105, 150, 76, 78, 68, 101, 57, 27,
    215, 232, 243, 58, 190, 203, 124, 86, 236, 203, 164, 148, 241, 251, 92, 148,
    121, 194, 103, 211, 71, 75, 146, 208, 123, ...>>
]
[
  <<196, 46, 92, 189, 209, 158, 106, 31, 92, 44, 85, 187, 102, 94, 176, 91, 165,
    141, 143, 15, 197, 226, 83, 213, 56, 173, 204, 121, 88, 7, 227, 195, 130,
    93, 155, 248, 195, 219, 150, 154, 147, 149, 126, 141, 39, 4, 187, 164, 46,
    ...>>,
  <<31, 139, 8, 0, 73, 14, 128, 85, 0, 3, 211, 211, 103, 160, 57, 48, 0, 2, 115,
    3, 3, 48, 109, 128, 73, 131, 217, 134, 70, 166, 38, 32, 101, 134, 134, 134,
    64, 113, 51, 3, 83, 51, 6, 5, 83, 218, 59, 141, ...>>
]
[
  <<70, 193, 40, 24, 5, 163, 96, 20, 140, 130, 81, 48, 10>>,
  <<70, 193, 40, 24, 5, 163, 96, 20, 140, 130, 81, 48, 10>>,
  <<70, 193, 40, 24, 5, 163, 128, 28, 0, 0, 61, 181, 127, 136, 0, 40, 0, 0, 10>>
]

My file stream is opened in raw mode:

%File.Stream{
  line_or_bytes: :line,
  modes: [:raw, :read_ahead, :binary],
  path: "test/packages/deb/dep1_1.0_amd64.deb",
  raw: true
}

How does GenStage.from_enumerable decide where to chunk? I currently suspect it splits whenever it detects a \n character in the stream. Can I customise this to produce evenly sized events of, e.g., 4 kB?

Ringo

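The chunking is not done by GenStage: from_enumerable emits each element of the enumerable as one event. The split points come from the line_or_bytes field of your File.Stream, which specifies either line-wise chunking (:line) or a number of bytes per chunk. You have it set to :line (the File.stream! default), so the file is split after every \n byte. Pass an integer byte count such as 4096 instead to get fixed-size chunks (the last one may be shorter).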
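To see that the split points come from the stream itself, here is a minimal sketch using only the standard library (no GenStage). It writes a few arbitrary bytes, including two \n bytes (value 10), to a hypothetical temporary file and lists what a default :line stream yields. Each element would become one event if the stream were piped into GenStage.from_enumerable.

```elixir
# Hypothetical temp file, used only for this illustration.
path = Path.join(System.tmp_dir!(), "line_split_demo.bin")

# Arbitrary binary data with two embedded newline bytes (10).
data = <<31, 139, 8, 10, 200, 201, 10, 1, 2, 3>>
File.write!(path, data)

# File.stream!/1 defaults to :line, so each element ends right after a "\n" byte;
# trailing bytes with no newline come out as a final, shorter element.
chunks = path |> File.stream!() |> Enum.to_list()
# chunks == [<<31, 139, 8, 10>>, <<200, 201, 10>>, <<1, 2, 3>>]

File.rm!(path)
```

This matches the output in the question, where every event ends in byte 10.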

https://hexdocs.pm/elixir/File.html#stream!/3
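A sketch of the byte-count variant, again with a hypothetical temporary file standing in for the .deb package. It assumes the stream!/3 argument order from the linked docs (modes first, then line_or_bytes); Elixir 1.16 and later also accept the byte count as the second argument, e.g. File.stream!(path, 4096), and may warn about the older order.

```elixir
# Hypothetical temp file standing in for the .deb package.
path = Path.join(System.tmp_dir!(), "byte_chunk_demo.bin")

# 10_000 bytes of arbitrary binary data, including plenty of "\n" (10) bytes.
data = :binary.copy(<<0, 10, 255, 13>>, 2_500)
File.write!(path, data)

# Read in fixed 4096-byte chunks instead of by line.
# Argument order here: modes, then line_or_bytes (File.stream!/3 before 1.16).
chunks = path |> File.stream!([], 4096) |> Enum.to_list()

sizes = Enum.map(chunks, &byte_size/1)
# sizes == [4096, 4096, 1808]

# Sanity check: concatenating the chunks gives back the original bytes.
true = IO.iodata_to_binary(chunks) == data

File.rm!(path)
```

Piping such a stream into GenStage.from_enumerable should then dispatch binaries of at most 4096 bytes each, regardless of where the \n bytes fall.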
