I've been developing with Elixir for about a year now, coming from many years of imperative programming, and it is refreshing to write some distributed functional code and see it come together quickly. I did some Erlang a while back though, so even if I know about processes and other nice BEAM things, this question is about the functional aspects of Elixir, specifically Streams.
In the docs for Stream.uniq/1 it is mentioned:
Keep in mind that, in order to know if an element is unique or not, this function needs to store all unique values emitted by the stream. Therefore, if the stream is infinite, the number of elements stored will grow infinitely, never being garbage-collected.
I am currently trying to write a version of uniq/1 that works on a monotone stream and doesn't suffer from this problem, since such a version only needs to keep the last element in the accumulator for comparison.
During this process, I tried to replicate this memory issue with Stream.uniq/1 in a (modified) GenStage.Streamer, in order to call Process.info on it and get memory usage while pulling elements from the stream one at a time.
The documentation makes perfect sense, but so far I have been unable to replicate the problem as described… I am guessing various optimizations during compilation are turning my attempt at an infinite stream into a finite one… or forcing garbage collection fixed the issue in my specific test case… or is Stream.uniq automagically doing the right thing if the stream is monotone?
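For reference, here is roughly the shape of what I was attempting (a simplified sketch, not my actual GenStage code; the stream bound, sleep interval, and sample count are arbitrary):

```elixir
# Consume a uniq'd stream of ever-new values in a separate process...
parent = self()

pid =
  spawn(fn ->
    Stream.iterate(0, &(&1 + 1))
    |> Stream.uniq()
    |> Stream.take(2_000_000)
    |> Stream.run()

    send(parent, :done)
  end)

# ...and sample its memory usage while it is consuming the stream.
samples =
  for _ <- 1..5 do
    Process.sleep(50)
    Process.info(pid, :memory)
  end

IO.inspect(samples)
```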
So: is anyone aware of some code somewhere that demonstrates this memory problem with uniq/1 on an infinite Stream? If possible, something isolated and easy to test with various kinds of Streams?
Maybe I am going about this the wrong way, and a previous attempt could shed some light on the topic…
Thanks a lot!
Hey @asmodehn can you show the code you’re running?
If you look at the source code for Stream.uniq you end up over here: elixir/reducers.ex at 0909940b04a3e22c9ea4fedafa2aac349717011c · elixir-lang/elixir · GitHub, and you can see that for each value it's simply doing Map.put(acc, value, true). For each value it adds an entry to a map; if there are infinitely many distinct values, the map will grow infinitely large.
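You can see the effect without any Stream internals; this toy reduce mimics what uniq's accumulator does (an illustration only, not the real implementation):

```elixir
# After n distinct values, the "seen" map holds n entries, which is
# exactly why an infinite stream of distinct values makes the
# accumulator grow without bound.
seen_after = fn n ->
  Stream.iterate(0, &(&1 + 1))
  |> Enum.take(n)
  |> Enum.reduce(%{}, fn v, acc -> Map.put(acc, v, true) end)
  |> map_size()
end

count = seen_after.(10_000)
# => 10_000
```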
For your purposes though, I think you could use something like:

|> Stream.transform(0, fn item, largest ->
  if item.value > largest, do: {[item], item.value}, else: {[], largest}
end)
This is only a sketch because I don’t know the shape of your data, but I’m assuming that by “monotone” you mean that for each item there is a value you can extract and use to judge whether you’ve received something of that value before.
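For example, assuming items shaped like %{value: n} (my guess at your data), the transform sketch above would behave like:

```elixir
# Assumed data shape: maps with a numeric :value field.
# Items whose value does not exceed the largest seen so far are dropped.
result =
  [%{value: 1}, %{value: 3}, %{value: 2}, %{value: 5}]
  |> Stream.transform(0, fn item, largest ->
    if item.value > largest, do: {[item], item.value}, else: {[], largest}
  end)
  |> Enum.to_list()
# => [%{value: 1}, %{value: 3}, %{value: 5}]
```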
Hi @benwilson512 and thanks for the transform idea.
I reached for that for a naive case, but not for the strict case…
I'm not entirely sure what that return value would do, I guess I shall give it a spin…
Indeed, by monotone I mean always increasing or decreasing values, so for each value I can judge whether I should drop it or not. I am working with integer clocks (unix timestamps from various servers), so I expect time to only go one way, and I just drop values when that is not the case…
The non-strict case seemed quite simple via Stream.transform/3, as you pointed out: xest/monotone.ex at f2655aec6e4459748b81f6ac19174da5a1c9941f · asmodehn/xest · GitHub
However, for the strict case I want to drop values and keep going on the stream, while still being able to compare against the last value to determine whether the next one should be dropped. So I dived into uniq/2, which does something similar, and the plan was to modify it to compare only with the last value and avoid that memory problem… but I might have dug myself into a deeper hole than expected.
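In case it helps to see what I mean, the last-value-only idea can be sketched with Stream.transform/3 directly, without touching uniq's internals (strictness comes from the > comparison, and nil marks the start of the stream):

```elixir
# Sketch: a uniq-like pass whose only state is the last emitted value.
strictly_increasing = fn stream ->
  Stream.transform(stream, nil, fn
    v, nil -> {[v], v}                 # first element: always emit
    v, last when v > last -> {[v], v}  # strictly greater: emit and remember
    _v, last -> {[], last}             # otherwise: drop, keep last value
  end)
end

out = [1, 2, 2, 3, 1, 4] |> strictly_increasing.() |> Enum.to_list()
# => [1, 2, 3, 4]
```

Only a single value of state exists at any time, so memory stays constant regardless of how long the stream runs.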
The code is very rough, and I am playing with things I do not fully understand yet. It's not really usable and I'm probably going at it in unexpected ways…
But anyway, if it helps to understand what I'm trying to accomplish here:
Anyway, I'll keep digging and report here if I find something interesting… Maybe I can extract something useful into another repo at some point in the future…
This sounds really close to what Stream.dedup/1 does; have you considered it?
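A quick illustration: Stream.dedup/1 only compares each element with the one immediately before it, so its state never grows beyond the last element seen:

```elixir
# dedup/1 drops only consecutive duplicates, so on an already-sorted
# stream it is equivalent to uniq/1, with O(1) state instead of a map.
deduped = [1, 1, 2, 2, 2, 3, 3] |> Stream.dedup() |> Enum.to_list()
# => [1, 2, 3]
```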
Ah! Thanks!
Indeed, since I am working with an already-sorted stream (from the non-strict case), dedup will work nicely; I don't need uniq… Seems I overlooked it.
Well, this allows me to side-step the problem of caring about memory entirely at this point, and I saw that Map has different implementations depending on its size, so better if I don't dive too deep for now.
Even if it doesn't satisfy my thirst for knowledge, I still have lots of work to do with Streams, so I guess I'll post again sometime…
Thanks for the help everyone.