Stream.uniq and memory consumption

Hi everyone,

I've been developing with Elixir for about a year now. Coming from many years of imperative programming, it is refreshing to do some distributed functional code and see it come together quickly. I've done some Erlang a while back, however, so even though I know about processes and the nice BEAM things, this question is about the functional aspects of Elixir, specifically Streams.

In the docs for Stream.uniq/1 it is mentioned:

Keep in mind that, in order to know if an element is unique or not, this function needs to store all unique values emitted by the stream. Therefore, if the stream is infinite, the number of elements stored will grow infinitely, never being garbage-collected.

I am currently trying to write a version of uniq/1 that works on a monotone stream and doesn't suffer from this problem, since on a monotone stream uniq/1 only needs to keep the last element in the accumulator for comparison.
During this process, I tried to replicate this memory issue with Stream.uniq/1 in a (modified) GenStage.Streamer, so I could call Process.info on it to get memory usage while pulling elements from the stream one at a time.

The documentation makes perfect sense, but so far I have been unable to replicate the problem as described… I am guessing various optimizations during compilation are turning my attempt at an infinite stream into a finite one… or forcing garbage collection fixed the issue in my specific test case… or is Stream.uniq automagically doing the right thing if the stream is monotone?

So: is anyone aware of some code somewhere that demonstrates this memory problem with uniq/1 on an infinite Stream? If possible, something isolated and easy to test with various kinds of Streams?

Maybe I am going about this the wrong way, and a previous attempt could enlighten my perspective on the topic…

Thanks a lot!

Hey @asmodehn, can you show the code you're running?

If you look at the source code for Stream.uniq you end up over here: elixir/lib/elixir/lib/stream/reducers.ex at 0909940b04a3e22c9ea4fedafa2aac349717011c · elixir-lang/elixir · GitHub, and you can see that for each value it simply does Map.put(acc, value, true). For each value it adds an entry to a map, so if there are infinitely many distinct values, the map will grow infinitely large.
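A minimal way to watch that accumulator grow is to replicate the reducer's Map.put(acc, value, true) step directly, outside of the Stream machinery (the range size here is arbitrary):

```elixir
# Sketch: replicate Stream.uniq's accumulator logic to see it grow.
# The reducer does Map.put(acc, value, true) for every new value, so after
# traversing n distinct values the accumulator holds n entries. On an
# infinite stream of distinct values this map would grow without bound.
seen =
  Enum.reduce(1..100_000, %{}, fn value, acc ->
    Map.put(acc, value, true)
  end)

map_size(seen)
# => 100_000 — one entry per distinct value, held for the whole traversal
```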

For your purposes though I think you could use Stream.transform

stream
|> Stream.transform(0, fn item, largest_prev_value ->
  if item.value > largest_prev_value do
    # emit the item and remember its value as the new maximum
    {[item], item.value}
  else
    # drop the item, keep the previous maximum
    {[], largest_prev_value}
  end
end)

This is only a sketch because I don’t know the shape of your data, but I’m assuming that by “monotone” you mean that for each item there is a value you can extract and use to judge whether you’ve received something of that value before.

Hi @benwilson512 and thanks for the transform idea.
I reached for that in a naive case, but not for the strict case…
I'm not entirely sure what returning [] would do; I guess I shall give it a spin…

Indeed, by monotone I mean always increasing or decreasing values, so for each value I can judge whether I should drop it or not. I am working with integer clocks (Unix timestamps from various servers), so I expect time to only go one way, and I just drop values when that is not the case…
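For what it's worth, a quick toy check of what returning [] does in Stream.transform/3 (using plain integers compared against the largest value seen so far):

```elixir
# Returning {[], acc} from the transform fun emits nothing for that
# element; the reduction simply continues with the next element.
result =
  [1, 3, 2, 5, 4, 7]
  |> Stream.transform(0, fn x, largest ->
    if x > largest, do: {[x], x}, else: {[], largest}
  end)
  |> Enum.to_list()

# result == [1, 3, 5, 7] — the non-increasing values 2 and 4 are skipped
```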

The non-strict case seemed quite simple via Stream.transform/3, as you pointed out: xest/monotone.ex at f2655aec6e4459748b81f6ac19174da5a1c9941f · asmodehn/xest · GitHub

However, for the strict case I want to drop values and keep going on the stream, while still being able to compare in order to determine whether the next one should be dropped or not. So I dived into uniq/2, which does something similar, and the plan was to modify it to compare only with the last value, avoiding that memory problem… but I might have dug myself into a deeper hole than expected :slight_smile:
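For the strict case, a one-element accumulator seems to be enough. A sketch along these lines (hypothetical module name; assumes a strictly increasing stream, with :none as a sentinel for "no previous value yet"):

```elixir
defmodule MonotoneUniq do
  # Hypothetical sketch: a uniq-like pass that only remembers the last
  # emitted value, which is sufficient when the stream is monotone.
  def strictly_increasing(enum) do
    Stream.transform(enum, :none, fn
      x, :none -> {[x], x}                # first element: always emit
      x, prev when x > prev -> {[x], x}   # strictly larger: emit, remember
      _x, prev -> {[], prev}              # duplicate or decrease: drop
    end)
  end
end

MonotoneUniq.strictly_increasing([1, 1, 2, 2, 3, 2])
|> Enum.to_list()
# => [1, 2, 3]
```

Unlike Stream.uniq/1, the accumulator here is a single value, so memory stays constant even on an infinite stream.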

The code is very rough, and I am playing with things I do not fully understand yet. It's not really usable, and I'm probably going at it in unexpected ways…

But anyway, if it helps to understand what I'm trying to accomplish here:

Anyway, I'll keep digging and report here if I find something interesting… Maybe I can extract something useful into another repo at some point in the future…

Cheers!

This sounds really close to what Stream.dedup/1 is doing. Have you considered it?
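dedup/1 only compares each element with the one emitted just before it, so on an already-sorted stream it gives the same result as uniq/1 while keeping only a single element in its accumulator. A toy example:

```elixir
# On sorted input, dedup/1 removes duplicates like uniq/1 would,
# but it only remembers the previous element.
[1, 1, 2, 3, 3, 3, 4]
|> Stream.dedup()
|> Enum.to_list()
# => [1, 2, 3, 4]
```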


Ah! Thanks!

Indeed, since I am working with an already sorted stream (from the non-strict case), dedup will work nicely; I don't need uniq… Seems I overlooked it.

Well, this allows me to side-step the problem of caring about memory entirely at this point, and I saw that Map has different implementations depending on size, so better if I don't dive too deep for now.

Even if it doesn't satisfy my thirst for knowledge, I still have lots of work to do with Streams, so I guess I'll post again sometime…

Thanks for the help everyone.