My Elixir implementation of the 1 billion row challenge

Hi all. If you’ve looked on Twitter or YouTube, you’ve probably seen the 1 billion row challenge, usually done in Java. I’ve written an Elixir version; feel free to have a look and see if you can improve on it.

From a quick look, I think you could benefit from using nimble_parsec to parse the file/stream (GitHub: dashbitco/nimble_parsec, a simple and fast library for text-based parser combinators). Then I saw the Agent usage, which I believe is generally not advised; there is probably more speed in using ETS etc., but this could be one of the exceptions…

All of this from a very quick look, and without fully understanding the problem, so I could be totally wrong.
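In case it helps, a minimal sketch of what a nimble_parsec line parser might look like (my own illustration; the module name and combinators are assumptions, not anything from the repo, and it does require pulling in the nimble_parsec dependency):

defmodule BrcLineParser do
  import NimbleParsec

  # city name: everything up to the ";"
  city = utf8_string([not: ?;], min: 1)
  # temperature: everything up to the end of the line
  temp = ascii_string([not: ?\n], min: 1)

  defparsec :line, city |> ignore(string(";")) |> concat(temp) |> ignore(optional(string("\n")))
end

BrcLineParser.line("Hamburg;12.0\n")
# => {:ok, ["Hamburg", "12.0"], "", _context, _line, _byte_offset}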

Hi outlog - the original rules for the Java challenge, and the spirit to which I’m trying to adhere, are to use only what is in the standard language and not pull in any external libraries.

Are you saying that ETS in one process would beat using Agents to farm the work out to multiple CPUs? Or that a GenServer-based approach would beat Agents?

There are probably some small gains to be had by changing the worker_pool to a map rather than a list, so that you can avoid the Enum.at here:

Agent.cast(Enum.at(worker_pool, rem(index, @pool_size)), Brc, :process_lines, [job])

It has to iterate over the worker_pool list each time to find the agent. Since the worker pool is small, each individual call is going to be very fast, but you are calling it 100_000 times. On my machine, the average of 100 runs of 100_000 calls to Enum.at on a list of 8 items versus Map.get on a map of 8 items keyed by index differed by about 17,631µs (:timer.tc reports microseconds, so roughly 18ms per run of 100_000 calls). Whether that is significant is for you to decide. The Agent issue is probably more impactful.

My rough little benchmark:

list = 1..8 |> Enum.to_list()
map = list |> Enum.with_index() |> Map.new(fn {v, k} -> {k, v} end)
# list version: Enum.at has to walk the list to reach index rem(i, 8)
fun = fn n -> 1..n |> Enum.map(fn i -> Enum.at(list, rem(i, 8)) end) end
# map version: Map.get goes straight to the key
fun2 = fn n -> 1..n |> Enum.each(fn i -> Map.get(map, rem(i, 8)) end) end
# average runtime in microseconds over 100 runs of 100_000 lookups each
1..100 |> Enum.map(fn _ -> :timer.tc(fn -> fun.(100_000) end) end) |> Enum.map(&elem(&1, 0)) |> Enum.sum() |> div(100)
# 157815
1..100 |> Enum.map(fn _ -> :timer.tc(fn -> fun2.(100_000) end) end) |> Enum.map(&elem(&1, 0)) |> Enum.sum() |> div(100)
# 140184

I’m relatively new to Elixir and the BEAM, but I don’t understand how using ETS over Agents will facilitate using multiple CPUs to farm out processing of the lines.

That post probably offers better insight than I can.

There’s also this blog post which offers the following explanation:

An ETS based approach
Other times, if the Agent doesn’t cut it for you, you might need something faster. In these cases ETS might be a good option. The good thing about ETS is that it will always be faster because it doesn’t go through the Erlang Scheduler; furthermore, it also supports concurrent reads and writes, which the Agent does not. However, it’s a bit more limited when you want to do atomic operations. Overall it’s very well suited for a simple shared key/value store, but whether or not it’s better suited for your specific problem, that’s up to you.

Finally, the approach that immediately came to my mind was not using a cache at all but using Task.async_stream as described in this blog post.
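As a rough illustration of that cache-free approach (my own sketch, not the blog post’s code; the file name and chunk size are arbitrary assumptions): stream the file line by line, hand chunks of lines to Task.async_stream, have each task fold its chunk into a map of per-city stats, and merge the partial maps at the end.

"measurements.txt"
|> File.stream!()
|> Stream.chunk_every(10_000)
|> Task.async_stream(
  fn lines ->
    Enum.reduce(lines, %{}, fn line, acc ->
      [city, temp] = line |> String.trim_trailing() |> String.split(";")
      {t, _} = Float.parse(temp)

      Map.update(acc, city, {t, t, t, 1}, fn {mn, mx, sum, n} ->
        {min(mn, t), max(mx, t), sum + t, n + 1}
      end)
    end)
  end,
  ordered: false,
  timeout: :infinity
)
|> Enum.reduce(%{}, fn {:ok, partial}, acc ->
  Map.merge(acc, partial, fn _city, {mn1, mx1, s1, n1}, {mn2, mx2, s2, n2} ->
    {min(mn1, mn2), max(mx1, mx2), s1 + s2, n1 + n2}
  end)
end)
# => %{"Hamburg" => {min, max, sum, count}, ...}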

I shot past Map and went straight to ETS, which I’ve read has O(1) lookup. It didn’t make a noticeable difference.

Looking at the GitHub repo, I think you may have changed some other parts of the code as well, so you might have minimized the impact this particular change could have. I just re-ran the benchmark on my machine using an ETS implementation and got results similar to what I posted above:

:ets.new(:sample, [:named_table, :public])
# copy the benchmark map from above into the ETS table
map |> Enum.each(fn {k, v} -> :ets.insert(:sample, {k, v}) end)
fun3 = fn n -> 1..n |> Enum.each(fn i -> :ets.lookup(:sample, rem(i, 8)) end) end
# average runtime in microseconds over 100 runs of 100_000 lookups each
1..100 |> Enum.map(fn _ -> :timer.tc(fn -> fun3.(100_000) end) end) |> Enum.map(&elem(&1, 0)) |> Enum.sum() |> div(100)
# 131589 (:ets.lookup)
1..100 |> Enum.map(fn _ -> :timer.tc(fn -> fun2.(100_000) end) end) |> Enum.map(&elem(&1, 0)) |> Enum.sum() |> div(100)
# 139659 (Map.get)
1..100 |> Enum.map(fn _ -> :timer.tc(fn -> fun.(100_000) end) end) |> Enum.map(&elem(&1, 0)) |> Enum.sum() |> div(100)
# 152674 (Enum.at)

Also be aware that while ETS lookups are fast (O(1), as you say) and process communication is very efficient on the BEAM, making calls to another process can be slower than accessing a data structure in the same process. ETS will generally be a better choice for very large collections and for structures that need to be accessed from multiple concurrent processes. In this case your worker_pool is never going to be that large. Whether sharing a single ETS table across multiple processes is more efficient than creating the worker_pool map in each process I can’t say, but it probably is, given how many processes you could end up spawning to access it. This thread has a good discussion of the trade-offs between maps and ETS tables.

Rather than focusing on this tiny worker_pool data structure, I think you should look at processing all the lines directly into an ETS table rather than creating a bunch of maps stored in Agents that you have to merge later. Create Tasks that process lines into the ETS table, then at the end pull the table data for your final output. If you set up your ETS table as an ordered_set, you can probably avoid the step where you have to sort N items, where N is the number of cities in the data set. You might even look at DETS to avoid running out of memory given the large data set.
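A hypothetical sketch of the ordered_set point (simplified so that a single process inserts already-merged stats, since concurrent lookup-then-insert on a shared key isn’t atomic; merged_stats is an assumed shape, not code from the repo):

# assumed shape: city => {min, max, sum, count}
merged_stats = %{"Bulawayo" => {8.9, 8.9, 8.9, 1}, "Hamburg" => {12.0, 12.0, 12.0, 1}}

table = :ets.new(:stations, [:ordered_set, :public])
Enum.each(merged_stats, fn {city, stats} -> :ets.insert(table, {city, stats}) end)

# :ets.tab2list/1 walks an ordered_set in key order, so no Enum.sort is needed here
table
|> :ets.tab2list()
|> Enum.map(fn {city, {mn, mx, sum, n}} ->
  "#{city}=#{mn}/#{Float.round(sum / n, 1)}/#{mx}"
end)
|> Enum.join(", ")
# => "Bulawayo=8.9/8.9/8.9, Hamburg=12.0/12.0/12.0"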

Another implementation outside of the standard library if you’re interested.

Thanks for the shout-out; there are several implementations in this discussion, including one simply using Flow + ETS as well.

Create Tasks that process lines into the ETS table, then at the end pull the table data for your final output.

It automatically does what you suggested above.

I think @blubparadox is trying to restrict their code to only standard lib modules, which eliminates Flow. I like that Flow implementation though.

I threw together an implementation as a proof of concept of the things I’ve mentioned above, and, well, maybe don’t listen to me, because it’s slow. 130 seconds for 50M lines slow. I think using an ordered_set for the ETS table is the reason, because it makes lookups O(log N) instead of O(1). Since there are a lot of lookups, it probably makes sense to just use a regular set type for the table and then sort the entries at the end. I will say the memory usage was pretty low, for what that’s worth. Anybody know why adding the encoding: :utf8 option to File.stream!/3 makes it twice as slow, but more specifically seems to prevent full utilization of the CPU? Without that option the CPU is pegged on all cores from the jump, but with that option the cores bounce around a lot more and average around 40-50% use.

I built an Erlang implementation that does 1B lines in 60 seconds on my slow laptop.

To improve it further, maybe one could use the counters module. So much to do, so little time…
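For what it’s worth, a hypothetical sketch of where the counters module could fit: a write-concurrent counters array with two slots per city, one for the count and one for the sum in tenths of a degree, to absorb the hot add path. Min and max don’t map onto :counters, so they would still need something else; the city list and indexing scheme here are assumptions for illustration.

# assumed city list; the real one comes from the data generator
cities = ["Bulawayo", "Hamburg", "Palembang"]
index = cities |> Enum.with_index() |> Map.new()

# two counter slots per city: 2i + 1 holds the count, 2i + 2 holds the sum in tenths
ref = :counters.new(2 * map_size(index), [:write_concurrency])

add_measurement = fn city, tenths ->
  i = Map.fetch!(index, city)
  :counters.add(ref, 2 * i + 1, 1)
  :counters.add(ref, 2 * i + 2, tenths)
end

add_measurement.("Hamburg", 123)   # 12.3 degrees, carried as the integer 123
:counters.get(ref, 2 * Map.fetch!(index, "Hamburg") + 1)
# => 1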

This proved to be somewhat correct, as making that change gets it down to around 95 seconds for 50M lines. Going to try processing lines via pattern matching by hand. Is it fair to optimize based on the assumption that the cities provided in the source for generating the data set are the only ones we need to worry about?


Went ahead and implemented based on the assumption that we only need to worry about the city names provided. This means you can pattern match lines as binaries based on the expected length of the city name. The longest was 26 characters and the shortest 3, with each character encoded as UTF-8, so match on sizes from 26 * 8 bits down to 3 * 8 bits in increments of 8. This gets me consistently under 50 sec for the 50M rows. Scared to try the billion on my 14-year-old 4-core i5 machine. Think I have room to optimize more by moving away from Float.parse.
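A hypothetical sketch of that clause-per-length idea (not the actual repo code): generate one function clause per possible city-name byte length, longest first, so the ";" position is fixed by the pattern rather than searched for. The 3-to-26 bounds are taken from the post above; note that the sizes are bytes, and a multi-byte UTF-8 name is longer in bytes than in characters, so the real bounds may need widening.

defmodule LineParser do
  # one clause per name length, from 26 bytes down to 3
  for size <- 26..3//-1 do
    def split_line(<<city::binary-size(unquote(size)), ";", temp::binary>>) do
      {city, temp}
    end
  end
end

LineParser.split_line("Hamburg;12.0")
# => {"Hamburg", "12.0"}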

Float.parse does add up when you’re doing it a billion times.
I take advantage of there always being one digit after the decimal, so I split around the decimal point and basically work with integers multiplied by 10 until the end.
Others are doing more complicated but faster pattern matching.
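A small sketch of that integer-tenths idea (my own illustration, assuming the temperature string is already trimmed and always has exactly one digit after the decimal point):

defmodule Temp do
  # "12.3" -> 123, "-5.4" -> -54; carry tenths as integers and divide by 10 only when formatting output
  def to_tenths("-" <> rest), do: -to_tenths(rest)

  def to_tenths(str) do
    [int, frac] = String.split(str, ".")
    String.to_integer(int) * 10 + String.to_integer(frac)
  end
end

Temp.to_tenths("12.3")   # => 123
Temp.to_tenths("-5.4")   # => -54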

I wasn’t sure if it was fair to assume the input would be restricted to one decimal place for the temp. I guess if I’m assuming the cities are predetermined I can probably assume that format too.

I think there’s some confusion. I’m not using Agents as a dumb storage technique where I only call get and update. I’m using Agents as low-setup GenServers: with Agent.cast, passing a module and function atom, each Agent acts as an active worker.
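To make that concrete, here’s a minimal sketch of the pattern as I understand it (module and function names are illustrative, not the repo’s actual code): each Agent’s state is its partial results map, and Agent.cast/4 hands it a module, function and arguments so the Agent does the line processing itself.

defmodule WorkerLogic do
  # invoked by the Agent with its state prepended; state is %{city => {min, max, sum, count}}
  def process_lines(state, lines) do
    Enum.reduce(lines, state, fn line, acc ->
      [city, temp] = line |> String.trim_trailing() |> String.split(";")
      {t, _} = Float.parse(temp)

      Map.update(acc, city, {t, t, t, 1}, fn {mn, mx, sum, n} ->
        {min(mn, t), max(mx, t), sum + t, n + 1}
      end)
    end)
  end
end

{:ok, worker} = Agent.start_link(fn -> %{} end)

# fire-and-forget: the Agent applies WorkerLogic.process_lines(state, lines) itself
Agent.cast(worker, WorkerLogic, :process_lines, [["Hamburg;12.0\n", "Hamburg;8.5\n"]])

Agent.get(worker, & &1)
# => %{"Hamburg" => {8.5, 12.0, 20.5, 2}}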
