I have a file (350 MB) containing around 30 millions of records. For each of these record I need to compare with existing one in Redis and update them in Redis accordingly. Here is my code:
stream =
file_path
|> File.stream!()
|> Stream.map(fn line -> String.trim_trailing(line, "\n") end)
Task.Supervisor.async_stream_nolink(
AdUserProfile.TaskSupervisor,
stream,
&process_line/1,
ordered: false,
max_concurrency: 4,
on_timeout: :kill_task
)
|> Stream.run()
defp process_line(user_profile_string) do
new_proto = user_profile_string |> Base.decode64!() |> UserProfileProto.decode()
current_proto = RedisCache.get_user_profile_proto(new_proto.user_id)
if new_proto.article_click_segment != current_proto.article_click_segment do
merged_proto = %UserProfileProto{
current_proto
| article_click_segment: new_proto.article_click_segment
}
RedisCache.put_user_profile_proto(merged_proto)
end
end
The code works efficiently, but what I am surprised is that the VM memory increased from 84MB to 2.3GB over two hours the program was running. The K8S eventually killed the pod before it finished the job which I think may be caused by memory usage limit set in K8S. I know I can increase memory and 2GB memory is not a big deal, but compared with the size of original file which is 350 MB, 2.3GB memory is a big number.
Here is the screen shot of memory usage (I am not allowed to upload image yet). From the image you can also see that Process memory is the major consumption. So looks like processes are not cleaned by GC after they finished accessing Redis? Is there a way to optimize it?
1 Like
Does the memory usage change if you make process_line
always return :ok
? Currently you’ll wind up accumulating whatever RedisCache.put_user_profile_proto
returns for every updated row for the ignored return value of async_stream_nolink
.
1 Like
I don’t think it changes. The last statement of the process_line
was actually Logger.info
, I removed it to make code cleaner here. Logger.info
already returns :ok
.
1 Like
How many bytes are each user_profile_string
? I’m wondering if it could be that you have a bunch of refc binaries that are sitting around like what Saša is explaining here
1 Like
I would think this is not the issue here, at least not in the code we’re seeing, because async_stream
spawns a new process for each element of the stream. So each process is short-lived and should be garbage collected after it exits.
Perhaps something funky is going on in RedisCache.{get|put}_user_profile_proto
? Is there a long-lived process hiding behind those calls?
3 Likes
Averagely each user_profile_string
is less than 20 bytes.
1 Like
Looks like if only calling RedisCache.get
would not cause the memory issue, but if calling RedisCache.put
would lead to it.
OK, I found out the reason. For whatever reason ordered: false
will end up with using tons of memory. After I removed it from code memory usage went back to normal. Task.async_stream
has the same issue. I am not sure if this is a bug expected behavior.
2 Likes
From the docs:
:ordered
- whether the results should be returned in the same order as the input stream. When the output is ordered, Elixir may need to buffer results to emit them in the original order. Setting this option to false disables the need to buffer at the cost of removing ordering. This is also useful when you’re using the tasks only for the side effects. Note that regardless of what :ordered
is set to, the tasks will process asynchronously. If you need to process elements in order, consider using Enum.map/2
or Enum.each/2
instead. Defaults to true
.
So, it seems that removing the order has a cost, as you have indeed experienced. To be honest I would expect the opposite.
2 Likes