How to quickly add to list in map?

Hello,

I have a collection like:

%{stats_a: [4, 5, 6], stats_b: [1, 2, 3]} # not actually integers (see next question)

Each of the 2 lists will be ~1 million items long.

I have a method which needs to push an item to the beginning of each list. Ideally, it would chop the oldest/last items off the list in real time so that it's never longer than exactly 1,000,000 items.

If there's no super fast way to do this, I will set a timer to do it every 1000ms… but if there is a fast (O(1)) way to do it, I'll do it in real time. I need this timer anyway to aggregate stats, so I'm not bothered if the trimming needs to go there.

I will be pushing to it ~300-500 times/second.

Here’s what I had so far (without the limiting part yet…)

Is this crazy what I wrote?

  def handle_cast({:push_stats, key, stats}, state) do
    items = [stats | state[key]]
    new_state = state |> Map.put(key, items)
    {:noreply, new_state}
  end
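(A possible sketch of the "limiting part", not from the thread: capping the list on every push with Enum.take/2. The Push module name is made up, and note that Enum.take/2 walks the list, so each push is O(max), which is probably too slow at 300-500 pushes/second on a 1,000,000-item list.)

```elixir
defmodule Push do
  # Hypothetical helper: prepend an item, then truncate the list so it
  # never exceeds `max` entries. Enum.take/2 traverses up to `max`
  # elements, making every push O(max).
  def push_capped(list, item, max \\ 1_000_000) do
    Enum.take([item | list], max)
  end
end

Push.push_capped([2, 3, 4], 1, 3)
# => [1, 2, 3]
```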

Follow-up question: the items in the list are actually not numbers (that's not relevant to the question above, so I used integers for demonstration purposes).

The stats actually look like this (eventually I need to add date checking so that I can see items that are < 1 hour old, < 24 hours old, etc.):

%{
  stats_a: [
    {#DateTime<2018-11-25 05:43:16.066834Z>, :group_1, %{timeout: 1, failure: 0, hits: 0}},
    {#DateTime<2018-11-25 05:43:16.066834Z>, :group_1, %{timeout: 0, failure: 0, hits: 0}},
    {#DateTime<2018-11-25 05:43:16.066834Z>, :group_2, %{timeout: 1, failure: 1, hits: 0}}
  ]
}

And I need to convert it to this (the key name is broken out, and then from there I do a bunch of sums):

%{
  group_1: %{timeout: 1, failure: 0, hits: 0},
  group_2: %{timeout: 1, failure: 1, hits: 0}
}

This would be a one-second trivial thing in an imperative language, but with functional code I still feel like a putz… I'm just sitting here like :man_shrugging:t2:

As a sidenote… if this is a stupid data structure, by all means please let me know. I wasn't sure how to store datetimes so I can easily pull out records that are newer than some cutoff… I figured this seemed like the best option?

There is Erlang’s queue data type.
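For example (a rough sketch: the BoundedQueue wrapper and its state shape are invented here; the length is tracked alongside the queue because :queue.len/1 is O(n)):

```elixir
defmodule BoundedQueue do
  # Hypothetical wrapper: newest items enter at the front via :queue.in_r/2,
  # and once `max` is reached the oldest item is dropped from the rear
  # via :queue.out_r/1. Both operations are amortized O(1).
  def new, do: {0, :queue.new()}

  def push({len, q}, item, max) do
    q = :queue.in_r(item, q)

    if len < max do
      {len + 1, q}
    else
      {{:value, _oldest}, q} = :queue.out_r(q)
      {len, q}
    end
  end

  # Front-to-rear order, i.e. newest first with this usage.
  def to_list({_len, q}), do: :queue.to_list(q)
end

q = Enum.reduce(1..4, BoundedQueue.new(), &BoundedQueue.push(&2, &1, 3))
BoundedQueue.to_list(q)
# => [4, 3, 2]
```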


Damn… that is nice. Ok I’ll read how to use that, thank you

defmodule Demo do
  def merge_values(_key, value1, value2),
    do: value1 + value2

  def reducer({key, data}, aggregate) do
    merge_data = fn all_data -> Map.merge(all_data, data, &merge_values/3) end
    Map.update(aggregate, key, data, merge_data)
  end

  def run(list),
    do: List.foldl(list, %{}, &reducer/2)
end

list = [
  {:group_1, %{timeout: 1, failure: 0, hits: 0}},
  {:group_1, %{timeout: 0, failure: 0, hits: 0}},
  {:group_2, %{timeout: 1, failure: 1, hits: 0}}
]

IO.inspect(Demo.run(list))
$ elixir demo.exs
%{
  group_1: %{failure: 0, hits: 0, timeout: 1},
  group_2: %{failure: 1, hits: 0, timeout: 1}
}

Damn, that is intense, I need to reread this a few times… thank you. One thing about functional programming is that this stuff is a lot less intuitive/easy. Hopefully it gets easier in time.


One thing to consider might be using an ets table instead of a server with state. In general, if you need high performance storage, ets tables are a very good solution. You could still have a process periodically reading from the table and doing some statistics.
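A minimal sketch of that idea (the table name, options, and row layout are assumptions, not something specified in the thread): writers insert rows keyed by an increasing id, and a timer-driven process sweeps the table for aggregation.

```elixir
# Assumed row layout: {id, {timestamp, group, stats}} in a public ETS table.
table = :ets.new(:feed_stats, [:ordered_set, :public])

:ets.insert(table, {1, {DateTime.utc_now(), :group_1, %{timeout: 0, failure: 0, hits: 1}}})
:ets.insert(table, {2, {DateTime.utc_now(), :group_2, %{timeout: 1, failure: 0, hits: 0}}})

# A periodic process could read everything back and aggregate:
rows = :ets.tab2list(table)
```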


I found this comment on SO, so I guess that means it's not good for a list of 1 million that's updating constantly?

Since Erlang code is often used in soft-realtime applications, beware of *amortized* perf characteristics of the standard queue. For example, after 100,000 in calls, the first out call will be an O(N) operation. But then the remaining 99,999 out calls will be O(1).

Again, without context it's difficult to assess what constraints you can tolerate; it's also not possible to explore why you think you need to manage 1 million entries in one centralized place.

At this point I’d explore @michalmuskala’s suggestion of using an ETS table (LYSE).

BEAM's support for small integers is:

On 32-bit architectures: -134217729 < i < 134217728 (28 bits).
On 64-bit architectures: -576460752303423489 < i < 576460752303423488 (60 bits).

An extremely simplistic approach for a single “list” (assuming a single process manages this):

  • {0, {last,first}} record tracks the oldest (first) and most recent (last) id. Update first when you delete records, update last when you add records.
  • {1, data_1} through {n, data_n} carries the data entries
  • for the most part you just burn through ids sequentially until you hit some arbitrary rollover point (say 2000000). Other than around the rollover id, first + 1 refers to the next oldest data while last + 1 is used for the next most recent data to be added.
  • If all this is managed and used strictly by a single process you can simply keep {last,first} in process state and not bother with record zero (in order to save updates).
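The bullets above might be sketched like this (the module name, the @rollover value, and the {table, max, first, last} state shape are all my own inventions, kept in a single process as suggested):

```elixir
defmodule RollingLog do
  # Hypothetical sketch of the rolling-id scheme: ids are burned through
  # sequentially and wrap at @rollover; `first` is the oldest live id and
  # `last` the most recent one.
  @rollover 2_000_000

  def new(max), do: {:ets.new(:rolling_log, [:set, :public]), max, 1, 0}

  def push({table, max, first, last}, data) do
    last = rem(last, @rollover) + 1        # id for the next most recent entry
    :ets.insert(table, {last, data})

    if live_count(first, last) > max do
      :ets.delete(table, first)            # chop the oldest entry
      {table, max, rem(first, @rollover) + 1, last}
    else
      {table, max, first, last}
    end
  end

  defp live_count(first, last) when last >= first, do: last - first + 1
  defp live_count(first, last), do: @rollover - first + 1 + last
end

state = Enum.reduce(1..4, RollingLog.new(3), &RollingLog.push(&2, &1))
```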

Ok… this took me a long time, but I rewrote what you wrote in my own terms just so I could understand it… I then realized there's one problem. ONE of the keys I need to take an average of, skipping over nil values… so basically I need to count non-nil values.

This seemed easy at first (inside the inner Map.merge/3 I would just check the key and increment a counter if not nil)… but when I reference a variable I create outside these functions, I can't update it, because assignment just makes a local variable in the function :frowning:
I don't know how to solve this easily… can I "update" (rebind) a variable outside a function scope?

The only way I see to do it would be to add something like a valid_counts key onto the accumulator %{}, but I feel like that makes things so much more confusing when I come back in a month to look at it

valid_response_times_count = 0

IO.inspect List.foldl(items, %{}, fn (row, acc) ->
  # Helpful things to know:
  # - https://elixirforum.com/t/how-to-quickly-add-to-list-in-map/18190
  # - Map.update/4 which runs a function on each value https://hexdocs.pm/elixir/Map.html#update/4
  # - Map.merge/3 which runs a function on each conflicting key https://hexdocs.pm/elixir/Map.html#merge/3
  {date, feed_name, list_stats} = row
  Map.update(acc, feed_name, list_stats, fn acc_stats ->
    Map.merge(acc_stats, list_stats, fn _key, value1, value2 ->
      # I can't rebind variables here...
      (value1 || 0) + (value2 || 0)
    end)
  end)
end)

IO.inspect valid_response_times_count

One possible work around is to store {total,count} instead of the average, so when you update you store {total+more,count+1} and when you need the average later you simply calculate total/count.


Dude thats a great idea… ok thank you!

Ok, so I ended up thinking this was easy, but it turned out to be really complicated, because the input value can now be nil, an integer, or {total, count}, and the reducer function can receive any of those as input

I had about 3 guard clauses and I just stopped, because it would be too confusing to reason about a month from now

If you are thinking of an easier way, would you mind exposing my mind to your greatness?

This is what I ended up doing… it's so insanely verbose, but I just couldn't get the {total, count} method to work :frowning: I'm hoping it's not super simple and I just missed something (very likely).

I first calculate the counts in one pass, then I calculate the summed stats, and then I combine them

feed_items = :queue.to_list(state[:feed_stats])

feed_counts = List.foldl(feed_items, %{}, fn ({feed_name, %{response_time: response_time}}, acc) ->
  count = if response_time, do: 1, else: 0
  Map.update(acc, feed_name, count, fn acc_total -> acc_total + count end)
end)

feed_stats = List.foldl(feed_items, %{}, fn ({feed_name, list_stats}, acc) ->
  # Helpful things to know:
  # - https://elixirforum.com/t/how-to-quickly-add-to-list-in-map/18190
  # - Map.update/4 which runs a function on each value https://hexdocs.pm/elixir/Map.html#update/4
  # - Map.merge/3 which runs a function on each conflicting key https://hexdocs.pm/elixir/Map.html#merge/3
  Map.update(acc, feed_name, list_stats, fn acc_stats ->
    Map.merge(acc_stats, list_stats, fn _key, value1, value2 ->
      (value1 || 0) + (value2 || 0)
    end)
  end)
end) |> Enum.map(fn {feed_name, combined_stats} ->
  total = combined_stats[:response_time]
  feed_count = feed_counts[feed_name]
  stats_with_avg = case feed_count do
    # if there's not a single successful request just show -1
    0 -> combined_stats |> Map.put(:response_time, -1)
    _ -> combined_stats |> Map.put(:response_time, round(total / feed_count))
  end

  {feed_name, stats_with_avg}
end) |> Enum.into(%{})

Wait? A simple average calculation with a reducer and you have 3 clauses? Then there's something wrong…

{sum, cnt} = Enum.reduce(list, {0, 0}, fn
  nil, acc -> acc
  n, {sum, cnt} -> {sum + n, cnt + 1}
end)
if cnt > 0, do: sum / cnt, else: {:error, :no_elements_to_average_over}

Does this apply to my input data in post #2 of this thread?

This is what I had before adding averaging, which seems to be the simplest approach "so far" (thanks to the help of peerreynders). I also need to combine it into the final output as well

IO.inspect List.foldl(items, %{}, fn ({date, feed_name, list_stats}, acc) ->
  Map.update(acc, feed_name, list_stats, fn acc_stats ->
    Map.merge(acc_stats, list_stats, fn _key, value1, value2 ->
      # case key do... check if :response_time and then return {total, count} instead of total
      (value1 || 0) + (value2 || 0)
    end)
  end)
end)

I’m sure it does not work with your input directly, but if you want to retain the keys of your old data, this should work out:

data
|> Enum.map(fn {k, l} -> {k, Enum.reduce(from above)} end)
|> Map.new()

Sorry for only having such a half snippet, but I’m currently on my mobile.
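Filling in "the reduce from above" so the snippet runs end to end (the sample data here is made up):

```elixir
# Average of a list, skipping nils; returns an error tuple when there is
# nothing to average (same shape as the earlier snippet).
average = fn list ->
  {sum, cnt} =
    Enum.reduce(list, {0, 0}, fn
      nil, acc -> acc
      n, {sum, cnt} -> {sum + n, cnt + 1}
    end)

  if cnt > 0, do: sum / cnt, else: {:error, :no_elements_to_average_over}
end

data = %{stats_a: [100, nil, 300], stats_b: [nil, nil]}

result =
  data
  |> Enum.map(fn {k, l} -> {k, average.(l)} end)
  |> Map.new()
# => %{stats_a: 200.0, stats_b: {:error, :no_elements_to_average_over}}
```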

This was my weak attempt; however, it's failing if there's a single item with a response_time of nil… which shows me I have absolutely no idea how this works… I thought value1 was the existing value from the accumulator and value2 was from the incoming map (from the list)

EDIT: OK, crap, I see it, it's because now it's setting it for every value2…

IO.inspect List.foldl(items, %{}, fn ({feed_name, list_stats}, acc) ->
  Map.update(acc, feed_name, list_stats, fn acc_stats ->
    # handle the first item
    list_stats = %{list_stats | response_time: {list_stats.response_time || 0, 0}}
    Map.merge(acc_stats, list_stats, fn key, value1, value2 ->
      case key do
        :response_time ->
          {value1 + (value2 || 0), value1 + 1}
        _ ->
          (value1 || 0) + (value2 || 0)
      end
    end)
  end)
end)

Obviously I don’t care about line count - I like code that is easy to change:

defmodule Demo do
  defp initial_total(:response_time, time),
    do: if(is_number(time), do: {time, 1}, else: {0, 0})

  defp initial_total(_key, value),
    do: value || 0

  defp initial_aggregate({key, value}, aggregate),
    do: Map.put(aggregate, key, initial_total(key, value))

  defp merge_total(:response_time, {total_time, count} = total, time),
    do: if(is_number(time), do: {total_time + time, count + 1}, else: total)

  defp merge_total(_key, total, value),
    do: total + (value || 0)

  defp merge_aggregate({key, value}, aggregate) do
    new_total =
      case Map.fetch(aggregate, key) do
        {:ok, total} ->
          merge_total(key, total, value)

        _ ->
          initial_total(key, value)
      end

    Map.put(aggregate, key, new_total)
  end

  defp item_aggregator({key, data}, stats) do
    new_aggregate =
      case Map.fetch(stats, key) do
        {:ok, aggregate} ->
          Enum.reduce(data, aggregate, &merge_aggregate/2)

        _ ->
          Enum.reduce(data, %{}, &initial_aggregate/2)
      end

    Map.put(stats, key, new_aggregate)
  end

  defp finalize_total({:response_time, {total_time, count}}, aggregate) do
    if count > 0 do
      Map.put(aggregate, :response_time, div(total_time, count))
    else
      aggregate
    end
  end

  defp finalize_total({key, total}, aggregate) do
    Map.put(aggregate, key, total)
  end

  defp finalize_aggregate({key, aggregate}, stats),
    do: Map.put(stats, key, Enum.reduce(aggregate, %{}, &finalize_total/2))

  def make_stats(items) do
    items
    |> List.foldl(%{}, &item_aggregator/2)
    |> Enum.reduce(%{}, &finalize_aggregate/2)
  end
end

#
# item: {:group_1, data}
# data: %{timeout: 0, failure: 0, hits: 1, response_time: 100}
# value: associated with a "key" inside the "data" Map
# aggregate: %{failure: 0, hits: 2, response_time: {1100,2}, timeout: 0}
# total: value associated with a "key" inside the "aggregate" Map
# finalized_aggregate: %{failure: 0, hits: 2, response_time: 550, timeout: 0}
# stats: %{feed_key => (finalized_)aggregate}
#

feed_items = [
  {:group_1, %{timeout: 0, failure: 0, hits: 1, response_time: 100}},
  {:group_1, %{timeout: 0, failure: 0, hits: 1, response_time: 1000}},
  {:group_2, %{timeout: 0, failure: 0, hits: 1, response_time: 50}},
  {:group_2, %{timeout: 0, failure: 0, hits: 1, response_time: 2000}},
  {:group_3, %{timeout: 0, failure: 1, hits: 0}},
  {:group_3, %{timeout: 1, failure: 1, hits: 0, response_time: nil}}
]

IO.inspect(Demo.make_stats(feed_items))
$ elixir demo.exs
%{
  group_1: %{failure: 0, hits: 2, response_time: 550, timeout: 0},
  group_2: %{failure: 0, hits: 2, response_time: 1025, timeout: 0},
  group_3: %{failure: 2, hits: 0, timeout: 1}
}

That attempt counts nils as 0 instead of skipping them, at least that's how it looks at first glance. Is this really what you want?
