How to optimize memory consumption when using async_stream_nolink to access Redis

I have a file (350 MB) containing around 30 million records. For each record, I need to compare it with the existing one in Redis and update Redis accordingly. Here is my code:

    stream =
      file_path
      |> File.stream!()
      |> Stream.map(fn line -> String.trim_trailing(line, "\n") end)

    Task.Supervisor.async_stream_nolink(
      AdUserProfile.TaskSupervisor,
      stream,
      &process_line/1,
      ordered: false,
      max_concurrency: 4,
      on_timeout: :kill_task
    )
    |> Stream.run()

  defp process_line(user_profile_string) do
    new_proto = user_profile_string |> Base.decode64!() |> UserProfileProto.decode()
    current_proto = RedisCache.get_user_profile_proto(new_proto.user_id)

    if new_proto.article_click_segment != current_proto.article_click_segment do
      merged_proto = %UserProfileProto{
        current_proto
        | article_click_segment: new_proto.article_click_segment
      }

      RedisCache.put_user_profile_proto(merged_proto)
    end
  end

The code works efficiently, but I am surprised that the VM memory increased from 84 MB to 2.3 GB over the two hours the program was running. K8S eventually killed the pod before it finished the job, which I think may have been caused by the memory usage limit set in K8S. I know I can increase the limit, and 2 GB of memory is not a big deal, but compared with the size of the original file (350 MB), 2.3 GB is a lot.

Here is the screenshot of memory usage (I am not allowed to upload images yet). From the image you can also see that process memory is the major consumer. So it looks like processes are not cleaned up by the GC after they finish accessing Redis? Is there a way to optimize this?
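
For readers without the screenshot, a breakdown like the one described can be approximated from inside the VM. This is only a sketch: the `MemoryReport` module is a hypothetical helper, not part of the original code, and it relies on nothing beyond `:erlang.memory/0`.

```elixir
defmodule MemoryReport do
  # Hypothetical helper for illustration; not part of the original code.
  @categories [:total, :processes, :binary, :ets, :atom, :code, :system]

  # Returns a keyword list of {category, megabytes} taken from :erlang.memory/0.
  def snapshot do
    memory = :erlang.memory()

    for category <- @categories do
      {category, Float.round(Keyword.fetch!(memory, category) / 1_048_576, 1)}
    end
  end
end

IO.inspect(MemoryReport.snapshot(), label: "memory (MB)")
```

Calling `MemoryReport.snapshot/0` periodically while the job runs would show whether `:processes` or `:binary` is the category that keeps growing.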

Does the memory usage change if you make process_line always return :ok? Currently, every updated row produces whatever RedisCache.put_user_profile_proto returns as its task result, and that could accumulate even though the return value of async_stream_nolink is ignored.
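
To make the suggestion concrete, here is a minimal sketch of a worker that always returns `:ok`, so each element the stream emits is just `{:ok, :ok}`. The `SideEffectOnly` module and the `String.upcase/1` call are hypothetical stand-ins; the real Redis calls are left out so the example runs on its own.

```elixir
defmodule SideEffectOnly do
  # Stand-in for process_line/1. The Redis lookup and update are replaced by a
  # harmless placeholder (an assumption made only for this sketch).
  def process_line(line) do
    _ignored = String.upcase(line)
    # Return a small constant instead of whatever the last call returned.
    :ok
  end

  # Runs the worker over the given lines and collects what the stream emits.
  def run(lines) do
    {:ok, supervisor} = Task.Supervisor.start_link()

    supervisor
    |> Task.Supervisor.async_stream_nolink(lines, &process_line/1,
      max_concurrency: 4,
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  end
end

IO.inspect(SideEffectOnly.run(["a", "b"]))
```

`Enum.to_list/1` is used here only to show the emitted results; a real job would keep `Stream.run/1`, which discards them.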

I don’t think it changes anything. The last statement of process_line was actually a Logger.info call; I removed it to keep the code here clean. Logger.info already returns :ok.

How many bytes is each user_profile_string? I’m wondering if you have a bunch of refc binaries sitting around, like what Saša is explaining here.

I would think this is not the issue here, at least not in the code we’re seeing, because async_stream spawns a new process for each element of the stream. So each process is short-lived and should be garbage collected after it exits.

Perhaps something funky is going on in RedisCache.{get|put}_user_profile_proto? Is there a long-lived process hiding behind those calls?
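
One way to look for such a long-lived process is to list the processes currently holding the most memory. The sketch below assumes nothing about RedisCache; the `TopProcesses` module is a hypothetical helper built on `Process.list/0` and `Process.info/2`.

```elixir
defmodule TopProcesses do
  # Hypothetical helper for illustration; not part of the original code.
  # Returns up to `count` {pid, info} pairs, largest memory footprint first.
  def by_memory(count \\ 5) do
    Process.list()
    |> Enum.map(fn pid ->
      {pid, Process.info(pid, [:memory, :message_queue_len, :registered_name])}
    end)
    # Process.info/2 returns nil for a process that exited in the meantime.
    |> Enum.reject(fn {_pid, info} -> is_nil(info) end)
    |> Enum.sort_by(fn {_pid, info} -> info[:memory] end, :desc)
    |> Enum.take(count)
  end
end

IO.inspect(TopProcesses.by_memory(3))
```

A process that stays at the top of this list with a growing `:memory` or `:message_queue_len` would be a candidate for closer inspection.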

On average, each user_profile_string is less than 20 bytes.

It looks like calling only RedisCache.get does not cause the memory issue, but calling RedisCache.put does.

OK, I found the reason. For whatever reason, ordered: false ends up using tons of memory. After I removed it from the code, memory usage went back to normal. Task.async_stream has the same issue. I am not sure whether this is a bug or expected behavior.
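
As a rough, self-contained illustration of that workaround, the sketch below runs side-effect-only tasks with the default ordering, that is, without passing `ordered: false`. The `OrderedRun` module and the counter are hypothetical stand-ins for the real Redis work.

```elixir
defmodule OrderedRun do
  # Hypothetical module for illustration; the Redis calls are replaced by a
  # counter increment so the sketch runs on its own.
  def run(lines) do
    counter = :counters.new(1, [])

    lines
    |> Task.async_stream(
      fn line ->
        _trimmed = String.trim_trailing(line, "\n")
        :counters.add(counter, 1, 1)
        :ok
      end,
      # No `ordered: false` here, so the default (ordered: true) applies.
      max_concurrency: 4,
      on_timeout: :kill_task
    )
    |> Stream.run()

    # Number of lines that were processed.
    :counters.get(counter, 1)
  end
end

lines = Stream.map(1..1000, &"line #{&1}\n")
IO.inspect(OrderedRun.run(lines), label: "processed")
```

Since `Stream.run/1` discards every result, the ordering itself does not matter for a job like this; the option is simply left at its default.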

From the docs:

:ordered - whether the results should be returned in the same order as the input stream. When the output is ordered, Elixir may need to buffer results to emit them in the original order. Setting this option to false disables the need to buffer at the cost of removing ordering. This is also useful when you’re using the tasks only for the side effects. Note that regardless of what :ordered is set to, the tasks will process asynchronously. If you need to process elements in order, consider using Enum.map/2 or Enum.each/2 instead. Defaults to true.

So, it seems that removing the order has a cost, as you have indeed experienced. To be honest I would expect the opposite.

This is a bug, and it has been fixed in newer versions of Elixir. Related link: Flag `ordered: false` in Task.async_stream and Task.Supervisor.async_stream_no_link causes crazy memory usage. · Issue #11760 · elixir-lang/elixir · GitHub
