Returning a random item from a stream, when I know how many items are in the stream

I have a stream of structs that’s very large and I want to avoid loading it into memory as a list if at all possible.

I’d like to be able to extract one random struct from the stream - and I can - but it’s very slow. I’ve tried dropping items but that hasn’t helped. I know how many items are in the stream. My current code is:

  def random_entity(%Metadata{entity_count: max} = metadata) do
    pos = :random.uniform(max)
    stream_entities(metadata)
    |> Stream.with_index()
    |> Stream.filter(fn {e, n} -> n == pos end)
    |> Stream.map(fn {e, n} -> e end)
    |> Enum.to_list()
    |> List.first()
  end

It works but it’s slow and looks awful. Using .drop() to lose items above and below the randomly selected item was even slower. I feel like I’m missing something obvious here - any help appreciated!

This is a lot simpler, although if your stream is truly enormous or something about how it’s implemented is slow, then it still won’t be super fast. How is the stream generated?

2 Likes

The stream comes from splitting a 100MB string with String.splitter(), but it gets bigger when split (each struct generated by the stream is much larger than the fragment of the original string going into it). There might be a number of these being handled at the same time.

Enum.random/1 would usually be my go-to for this as it handles streams of unknown length efficiently, but that still needs to reduce over the whole list.

Since we know the maximum length, I’d use:

def random_entity(%Metadata{entity_count: max} = metadata) do
  offset = :random.uniform(max) - 1

  stream_entities(metadata)
  |> Stream.drop(offset)
  |> Stream.take(1)
  |> Enum.to_list()
  |> hd()
end

This should be somewhat faster, since the computation from stream_entities won’t be done at all for elements past the selected one.

1 Like

That’s about 50% faster on average in a benchmark, which fits with the second half being dropped, a definite improvement thanks!

How did Enum.at(pos) perform?

1 Like

Surprisingly well!


Name                                  ips        average  deviation         median         99th %
stream, drop lower, take 1         0.0784        12.76 s     ±0.11%        12.76 s        12.79 s
Enum.at                            0.0724        13.81 s     ±0.17%        13.83 s        13.84 s
stream with index, n == pos        0.0577        17.33 s     ±0.09%        17.33 s        17.34 s
Enum.random                        0.0564        17.73 s     ±1.72%        17.70 s        18.08 s

Comparison:
stream, drop lower, take 1         0.0784
Enum.at                            0.0724 - 1.08x slower +1.05 s
stream with index, n == pos        0.0577 - 1.36x slower +4.56 s
Enum.random                        0.0564 - 1.39x slower +4.97 s

Memory usage statistics:

Name                                average  deviation         median         99th %
stream, drop lower, take 1         11.17 GB     ±0.00%       11.17 GB       11.17 GB
Enum.at                            11.17 GB     ±0.00%       11.17 GB       11.17 GB
stream with index, n == pos        26.83 GB     ±0.00%       26.83 GB       26.83 GB
Enum.random                        26.96 GB     ±0.21%       26.93 GB       27.04 GB

Comparison:
stream, drop lower, take 1         11.17 GB
Enum.at                            11.17 GB - 1.00x memory usage +0.00442 GB
stream with index, n == pos        26.83 GB - 2.40x memory usage +15.67 GB
Enum.random                        26.96 GB - 2.41x memory usage +15.79 GB
1 Like

Thanks for everyone’s suggestions.

I’m kicking myself now, but the answer turns out to be not using the stream of fully processed data, but instead using a new function to stream the bare minimum and then only processing the one randomly selected record. The time problem was due to stream creation. I feel rather daft.

The other thing I’ve learned is that Elixir’s Enum methods handle streams surprisingly well - Enum.random() was no worse than my first attempt to avoid using Enum.

> Name                                                      ips        average  deviation         median         99th %
> 6. pragmatic mix of (5) and (4)                          0.40         2.48 s     ±1.91%         2.49 s         2.56 s
> 5. Cheat: Shell out to xsltproc, twice                   0.39         2.57 s     ±1.53%         2.56 s         2.65 s
> 7. D'oh!: don't process the entire stream first          0.38         2.65 s     ±1.59%         2.65 s         2.73 s
> 2. stream, drop lower, take 1                           0.117         8.52 s    ±64.36%        10.00 s        15.15 s
> 4. Enum.at                                             0.0943        10.61 s    ±52.13%        12.34 s        16.65 s
> 3. Enum.random                                         0.0539        18.56 s    ±12.10%        17.82 s        22.45 s
> 1. stream with index, n == pos                         0.0527        18.96 s     ±6.17%        18.49 s        21.05 s
> 
> Comparison:
> 6. pragmatic mix of (5) and (4)                          0.40
> 5. Cheat: Shell out to xsltproc, twice                   0.39 - 1.03x slower +0.0867 s
> 7. D'oh!: don't process the entire stream first          0.38 - 1.07x slower +0.173 s
> 2. stream, drop lower, take 1                           0.117 - 3.44x slower +6.05 s
> 4. Enum.at                                             0.0943 - 4.28x slower +8.13 s
> 3. Enum.random                                         0.0539 - 7.49x slower +16.08 s
> 1. stream with index, n == pos                         0.0527 - 7.65x slower +16.48 s
> 
> Memory usage statistics:
> 
> Name                                                    average  deviation         median         99th %
> 6. pragmatic mix of (5) and (4)                      0.00474 GB    ±11.22%     0.00474 GB     0.00548 GB
> 5. Cheat: Shell out to xsltproc, twice               0.00484 GB    ±24.65%     0.00431 GB     0.00691 GB
> 7. D'oh!: don't process the entire stream first      0.00462 GB    ±15.29%     0.00472 GB     0.00535 GB
> 2. stream, drop lower, take 1                           7.84 GB    ±78.31%        3.82 GB       16.57 GB
> 4. Enum.at                                             15.53 GB    ±62.19%       18.05 GB       24.79 GB
> 3. Enum.random                                         26.91 GB     ±0.09%       26.91 GB       26.95 GB
> 1. stream with index, n == pos                         26.88 GB     ±0.14%       26.88 GB       26.92 GB
> 
> Comparison:
> 6. pragmatic mix of (5) and (4)                      0.00474 GB
> 5. Cheat: Shell out to xsltproc, twice               0.00484 GB - 1.02x memory usage +0.00010 GB
> 7. D'oh!: don't process the entire stream first      0.00462 GB - 0.98x memory usage -0.00012 GB
> 2. stream, drop lower, take 1                           7.84 GB - 1654.47x memory usage +7.84 GB
> 4. Enum.at                                             15.53 GB - 3276.31x memory usage +15.53 GB
> 3. Enum.random                                         26.91 GB - 5677.24x memory usage +26.91 GB
> 1. stream with index, n == pos                         26.88 GB - 5671.01x memory usage +26.88 GB
1 Like

That’s expected. The difference between Stream and Enum is that Stream functions return “streams” (lazily evaluated enumerables), but Enum ones return lists (fully evaluated lists) or other calculations based on the input. Both can take any enumerable as as inputs though (if they take enumerables as input). That’s why there’s no Stream alternatives for all the Enum apis, which don’t return an enumumerable (e.g. Enum.reduce, Enum.at, Enum.all?), while there are ones for the functions that do (e.g. Enum.mapStream.map).

1 Like

Yeah to expand on @LostKobrakai 's excellent notes, I wrote this up in a similar thread sometime back that elaborates on this very concretely: Why is Stream.reduce_while missing? - #2 by benwilson512. The key takeaway is this:

They [enum functions] only use more memory if the output of the function needs to hold onto the whole collection.

1 Like