CxLeaderboard - a comprehensive leaderboard library

I wrote a library for building leaderboards. Would love any feedback. This is the list of features from the readme:

  • Ranks, percentiles, any custom stats of your choice
  • Concurrent reads, sequential writes
  • Stream API access to records from the top and the bottom
  • O(1) querying of any record by id
  • Auto-populating data on leaderboard startup
  • Adding, updating, removing, upserting of individual entries in live leaderboard
  • Fetching a range of records around a given id (contextual leaderboard)
  • Pluggable data stores: EtsStore for big boards, TermStore for dynamic mini boards
  • Atomic full repopulation in ~O(2n log n) time
  • Multi-node support
  • Extensibility for storage engines (CxLeaderboard.Storage behaviour)

github | docs | hex.pm


:wave:

For the term storage, :gb_trees might be a better fit than lists in maps.

I think DemonWare used them together with ets to keep rankings in Call of Duty, but that was before maps were introduced.
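
To make the :gb_trees suggestion concrete, here is a minimal sketch of a ranked structure keyed by {score, id}. The GbTreeBoard module and its function names are hypothetical and not part of CxLeaderboard; only the :gb_trees calls (empty/0, enter/3, iterator/1, next/1) come from the Erlang standard library.

```elixir
defmodule GbTreeBoard do
  # Hypothetical helper module, not part of CxLeaderboard.
  # Keys are {score, id} tuples, so the tree stays sorted by score, then id.

  def new, do: :gb_trees.empty()

  # Insert or overwrite the entry stored under {score, id}.
  def put(tree, score, id, payload) do
    :gb_trees.enter({score, id}, payload, tree)
  end

  # Return the first `n` entries in key order as {{score, id}, payload}.
  def top(tree, n) do
    tree
    |> :gb_trees.iterator()
    |> Stream.unfold(fn iter ->
      case :gb_trees.next(iter) do
        {key, value, next_iter} -> {{key, value}, next_iter}
        :none -> nil
      end
    end)
    |> Enum.take(n)
  end
end

board =
  GbTreeBoard.new()
  |> GbTreeBoard.put(-300, 1, "alice")
  |> GbTreeBoard.put(-100, 2, "bob")
  |> GbTreeBoard.put(-200, 3, "carol")

# Lowest key first, so the most negative score ranks on top.
top2 = GbTreeBoard.top(board, 2)
```

Because the score is part of the key, changing a player's score in this sketch would mean deleting the old key and inserting a new one, and looking up a record by id alone would need a separate index.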


Wow, that presentation is full of interesting insights, thank you! And gb_trees looks like a useful structure; I will read more about it (especially once the Erlang documentation site starts loading for me).

I wanted to link it but the website was down for me as well … You can do erl -man gb_trees in your shell.


I’ll probably give it a try soon


Let me know how it goes or if you need any help/have questions. Happy to help.


Can I get an example of auto-populating from Postgres on startup?

I got the following, but can’t get it to work. I think the issue is that the connection is closed once the transaction ends.
Related issue: https://github.com/elixir-ecto/postgrex/issues/373

worker(CxLeaderboard.Leaderboard, [:global, [data: loadDataToLeaderboard(pid)]])

def loadDataToLeaderboard(pid) do
  {:ok, stream} =
    Postgrex.transaction(pid, fn conn ->
      Postgrex.stream(conn, "SELECT id, elo FROM users", [], max_rows: :infinity, timeout: :infinity)
    end)

  stream
end

@jonathanleang Interesting. I’ve never tried using these streams, and I don’t think they work well when the stream has to live for a long time. In our case I wrote my own module called StreamExt and implemented a couple of streaming functions in it: one based on offset/limit, and another (more efficient) one based on id comparisons.

defmodule StreamExt do
  @doc """
  Stream data in batches from an Ecto repo using a query.
  Each emitted element is a list of up to `batch_size` rows.
  """
  def ecto_in_batches(repo, query, batch_size \\ 1000) do
    import Ecto.Query, only: [from: 1, from: 2]

    Stream.unfold(0, fn
      :done ->
        nil

      offset ->
        results =
          repo.all(from(_ in query, offset: ^offset, limit: ^batch_size))

        if length(results) < batch_size,
          do: {results, :done},
          else: {results, offset + batch_size}
    end)
  end

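  @doc """
  Stream data in batches, paginating by id instead of offset.
  Expects each selected row to be a tuple with the id as its first element.
  """
  def ecto_in_batches_by_id(repo, query, batch_size \\ 1000) do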
    import Ecto.Query, only: [from: 2]

    Stream.unfold(-1, fn
      :done ->
        nil

      last_id ->
        results =
          repo.all(
            from(
              r in query,
              where: r.id > ^last_id,
              # Order by id so each batch continues where the previous one ended.
              order_by: r.id,
              limit: ^batch_size
            )
          )

        {new_last_id, count} =
          results
          |> Enum.reduce({-1, 0}, fn r, {_, i} -> {elem(r, 0), i + 1} end)

        if count < batch_size,
          do: {results, :done},
          else: {results, new_last_id}
    end)
  end
end
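
Note that each element emitted by these functions is a whole batch (a list of rows), not a single row. The sketch below reproduces the same Stream.unfold shape against an in-memory list instead of a database, then flattens the batches into individual rows. BatchDemo, its @rows data, and fetch_after/2 are hypothetical stand-ins for the repo and query.

```elixir
defmodule BatchDemo do
  # Hypothetical stand-in for a database table: rows are {id, elo, name}.
  @rows [{1, 1500, "a"}, {2, 1420, "b"}, {3, 1610, "c"}, {4, 1380, "d"}, {5, 1550, "e"}]

  # Plays the role of repo.all/1 with `where id > last_id, order by id, limit n`.
  defp fetch_after(last_id, batch_size) do
    @rows
    |> Enum.filter(fn {id, _elo, _name} -> id > last_id end)
    |> Enum.sort()
    |> Enum.take(batch_size)
  end

  # Same unfold shape as ecto_in_batches_by_id: each element is a whole batch.
  def batches(batch_size) do
    Stream.unfold(-1, fn
      :done ->
        nil

      last_id ->
        results = fetch_after(last_id, batch_size)

        case List.last(results) do
          # A full batch means there may be more rows after its last id.
          {new_last_id, _, _} when length(results) == batch_size -> {results, new_last_id}
          # A short (or empty) batch is the final one.
          _ -> {results, :done}
        end
    end)
  end

  # Flatten batches into single rows before handing them to a consumer.
  def rows(batch_size), do: batches(batch_size) |> Stream.flat_map(& &1)
end

batches = BatchDemo.batches(2) |> Enum.to_list()
rows = BatchDemo.rows(2) |> Enum.to_list()
```

With a batch size of 2 and five rows, batches holds three lists (two full, one short), while rows holds the five tuples in id order.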

Hey, just following up, did you manage to make it work?

@maxim
Thanks for following up. I have been trying for some time now, but I’m kind of stuck.

Here is what I have now:

worker(CxLeaderboard.Leaderboard, [
  :global,
  [data: loadDataToLeaderboard()]
])

def loadDataToLeaderboard() do
  StreamExt.ecto_in_batches_by_id(Darkmoor.Repo, Darkmoor.User, 5)
  |> Stream.map(fn batch ->
    Enum.map(batch, fn {id, elo, name} ->
      {{elo, id}, name}
    end)
  end)

  # [{{100, 1}, "name1"}, {{200, 2}, "name2"}] this works
end



  def ecto_in_batches_by_id(repo, query, batch_size \\ 1000) do
    import Ecto.Query, only: [from: 2]

    Stream.unfold(-1, fn
      :done ->
        nil

      last_id ->
        results =
          repo.all(
            from(
              r in query,
              select: {r.id, r.elo, r.name},
              where: r.id > ^last_id,
              limit: ^batch_size
            )
          )

        {new_last_id, count} =
          results
          |> Enum.reduce({-1, 0}, fn r, {_, i} -> {elem(r, 0), i + 1} end)
        if count < batch_size,
          do: {results, :done},
          else: {results, new_last_id}
    end)
  end

Ah I see, let me give you a specific solution here. I kind of forgot that my batch function wasn’t for direct usage. Try the following:

  1. Add this file to your elixir project: https://gist.github.com/maxim/59628bbe5c034ec5d795a2e2ba01312b

  2. Use this function:

    def leaderboard_stream() do
      import Ecto.Query, only: [from: 2]
      query = from(r in Darkmoor.User, select: {{r.elo, r.id}, r.name})
      StreamExt.ecto(Darkmoor.Repo, query, batch_size: 5, strategy: :id)
    end
    
  3. And finally declare your worker like this:

    worker(CxLeaderboard.Leaderboard, [:global, [data: leaderboard_stream()]])
    

Let me know if this works.

Oh sorry, wait a sec. I missed your customization inside the ecto function; let me fix it.

Ok check my post above, I edited it in a way that should work. Let me know if something is wrong again, happy to help get it working.

I customized Enum.reduce in ecto_in_batches_by_id and got it working, thanks for your help.

|> Enum.reduce({-1, 0}, fn r, {_, i} -> {elem(elem(r, 0), 1), i + 1} end)

Oh, you’re right! That function always expects id as the first item, so it won’t work if I rearrange the select fields. Great find! However, I don’t recommend customizing the function, because it’s meant to be reusable anywhere you need to stream data from the database. Instead I suggest going back to what you were doing originally, which is much simpler now:

def leaderboard_stream() do
  import Ecto.Query, only: [from: 2]
  query = from(r in Darkmoor.User, select: {r.id, r.elo, r.name})

  Darkmoor.Repo
  |> StreamExt.ecto(query, batch_size: 5, strategy: :id)
  |> Stream.map(fn {id, elo, name} -> {{elo, id}, name} end)
end

Got it working now, thank you.


I have another question.
Currently, the smaller (or more negative) the score, the higher the rank. How do I rank players higher the higher their score is?

Notice how the scores in the README example are all negative. I didn’t want to implement any special sorting logic, so I left this up to the user: put a minus in front of your scores, then negate them again when reading them back for display.

Example:

def leaderboard_stream() do
  import Ecto.Query, only: [from: 2]
  query = from(r in Darkmoor.User, select: {r.id, r.elo, r.name})

  Darkmoor.Repo
  |> StreamExt.ecto(query, batch_size: 5, strategy: :id)
  |> Stream.map(fn {id, elo, name} -> {{-elo, id}, name} end) # <=== minus added on this line
end
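
The round trip can be seen without a database. This is a minimal sketch with made-up player data; Enum.sort/1 stands in for the lowest-first ordering described above and is not the library's own ranking code.

```elixir
# Hypothetical player data: {id, elo, name}.
players = [{1, 1500, "ann"}, {2, 1720, "bo"}, {3, 1610, "cy"}]

# Store the negated score in the key so ascending order puts the best first.
entries = Enum.map(players, fn {id, elo, name} -> {{-elo, id}, name} end)

# Ascending sort on {-elo, id}, i.e. a lowest-first ordering.
ranked = Enum.sort(entries)

# Negate again when reading scores back for display.
display = Enum.map(ranked, fn {{neg_elo, id}, name} -> {id, -neg_elo, name} end)
```

Here display lists the players from highest to lowest elo, with the original positive scores restored.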

Thanks, it seems to be working with my 3-server Phoenix cluster. :+1:


Hi there, we just launched our game and got featured on the App Store. The CPU load is really high, and our leaderboard controller is taking up most of the time.
Is there any quick fix? Our server is hosted on DigitalOcean with 8 cores now, and soon there will be no bigger instance to upgrade to. Our traffic has been doubling every 1-2 days. The leaderboard seems to run much faster on an MBP; maybe memory bandwidth is much lower on a VM.

leaderboard = Leaderboard.client_for(country)

globalResult =
  ConCache.get_or_store(:room, "leaderboard" <> Atom.to_string(country), fn ->
    leaderboard
    |> Leaderboard.top()
    |> Enum.take(topranklimitInt)
  end)

nearResult =
  ConCache.get_or_store(:room, "leaderboard" <> Integer.to_string(user_id) <> Atom.to_string(country), fn ->
    Leaderboard.get(leaderboard, user_id, first..last)
  end)