Data structure for a time series

How would one store a time series (updates in “real time”) with elixir/erlang? I’m getting some market data over a socket and would like to know how would you guys decide to store it efficiently. Ideally, I would also like to run some rolling calculations (not sure if it’s how it called, I want to apply some aggregation over some pre-scpecified time period (like for the last minute)) on the incoming data.

It’s not a real project, it’s just for a hobby. So I would like to just use erlang/elixir tools, and not reach out for leveldb or anything like that.

Found this discussion on using mnesia for storing time series data http://erlang.org/pipermail/erlang-questions/2005-July/016079.html, but even mnesia seems like overkill to me.

I think I just need to keep a list of a fixed length in process’s memory. So that when the new data comes in, that last element gets removed from the list and the new data gets put in the head of the list.

old_list = [b, c, d]
# new data `a` comes in
new_list = update(old_list, a)
new_list == [a, b, c]

Don’t know how to do that without pointers … Will look into how :queue module works.

defmodule Quote.History do
  @type t :: {new :: list, old :: list}

  def new do
    {[], []}
  end

  @doc """

      iex> new([3, 2, 1])
      {[3], [1, 2]}

  """
  @spec new(list) :: t
  def new(list)

  def new([h | t]) do
    {[h], :lists.reverse(t)}
  end

  @doc """

    iex> history = new([3, 2, 1])
    iex> history |> push(4) |> to_list()
    [4, 3, 2]

  """
  @spec to_list(t) :: list
  def to_list({new, old}) do
    new ++ :list.reverse(old)
  end

  @doc """

      iex> history = {[3], [1, 2]} = new([3, 2, 1])
      iex> push(history, 4)
      {[4, 3], [2]}

      iex> push({[4, 3], [2]}, 5)
      {[5, 4, 3], []}

      iex> push({[5, 4, 3], []}, 6)
      {[6, 5], [4]}

  """
  @spec push(t, any) :: t
  def push(hisroty, element)

  def push({new, [_ | rest]}, element) do
    {[element | new], rest}
  end

  def push({[new | rest], []}, element) do
    [_ | old] = :lists.reverse(rest)
    {[element, new], old}
  end

  def push({[], []}, element) do
    {[], [element]}
  end
end

Something like this maybe. Don’t like all these calls to :lists.reverse

Seems to work. But also seems hacky …

Interactive Elixir (1.6.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> history = Quote.History.new [3, 2, 1]
{[3], [1, 2]}
iex(2)> Quote.History.to_list history
[3, 2, 1]
iex(3)> Quote.History.push history, 4
{[4, 3], [2]}
iex(4)> history = Quote.History.push history, 4
{[4, 3], [2]}
iex(5)> Quote.History.to_list history
[4, 3, 2]
iex(6)> history = Quote.History.push history, 5
{[5, 4, 3], []}
iex(7)> Quote.History.to_list history
[5, 4, 3]
iex(8)> history = Quote.History.push history, 6
{[6, 5], [4]}
iex(9)> Quote.History.to_list history
[6, 5, 4]

When I researching into this topic a while back I thought to use ETS ordered sets. Basically use the ordered set with a counter like {n, payload} and monotonically increase n. Then you can use :ets.next/1 for access. But of course then you need a sweeper to delete the head of the set on some kind of regular basis. Or manage n predefined slots in an ordered set and manage the read and write “pointers” manually.

I also tried the list-based approach you describe above but it does involve a lot of list copying on pretty much each update. I never finished up my experiments so I have no useful data to offer.

1 Like

Wrote a simple benchmark

defmodule Quote.HistoryBench do
  use Benchfella

  @list Enum.to_list(1..1000)

  bench "new" do
    Quote.History.new(@list)
  end

  bench "single push", history: Quote.History.new(@list) do
    Quote.History.push(history, 1001)
  end

  bench "multi push", history: Quote.History.new(@list) do
    push(history, 20, 1001)
  end

  bench "history to list", history: Quote.History.new(@list) do
    Quote.History.to_list(history)
  end

  defp push(history, 0, _element), do: history
  defp push(history, counter, element) do
    history
    |> Quote.History.push(element)
    |> push(counter - 1, element + 1)
  end
end
Settings:
  duration:      1.0 s

## Quote.HistoryBench
[...] 1/4: history to list
[...] 2/4: multi push
[...] 3/4: new
[...] 4/4: single push

Finished in 16.97 seconds

## Quote.HistoryBench
benchmark name   iterations   average time
single push       100000000   0.04 µs/op
multi push         10000000   0.67 µs/op
history to list      500000   4.40 µs/op
new                  500000   4.51 µs/op

history to list is a bit too slow … Maybe I can run rolling calculations on the {new, old} tuple and forgo converting this tuple to list.

Will try :ets approach next. Thanks for the idea, @kip! I’m afraid though, that it would also introduce extra copying (from ets into the process) …

Can you amortize the cost of popping the old items from the list by allowing the list to grow up to 2x the desired size, then calling Enum.take to keep the required prefix?

1 Like

But that would require me to check the length of the list at least sometimes, which is also a pricey operation (unless done via pattern matching? But that would require a macro). Or have I misunderstood what you meant?

I would test with https://hex.pm/packages/epocxy which already has a ring buffer implemented in ETS.

1 Like

With :ets with using the following unfinished (doesn’t delete stale values) code

defmodule Quote.HistoryETS do
  @table :history_ets

  @type counter :: pos_integer

  def for_test_new do
    fn ->
      1..1000
      |> Enum.to_list()
      |> new()
    end
  end

  def for_test_push do
    counter =
      1..1000
      |> Enum.to_list()
      |> new()

    fn -> push(counter, 1001) end
  end

  @spec new :: counter
  def new do
    @table = :ets.new(@table, [:ordered_set, :named_table])
    1
  end

  @spec new(list) :: counter
  def new(list) do
    @table = :ets.new(@table, [:ordered_set, :named_table])
    push_many(1, list)
  end

  @spec push_many(pos_integer, list) :: counter
  def push_many(counter, []), do: counter

  def push_many(counter, [h | t]) do
    counter
    |> push(h)
    |> push_many(t)
  end

  @spec latest(pos_integer) :: list
  def latest(limit) do
    last = :ets.last(@table)
    prev(limit - 1, last, [])
  end

  @spec push(pos_integer, any) :: counter
  def push(counter, element) do
    :ets.insert(@table, {counter, element})
    # :ets.delete(table, counter - size)
    counter + 1
  end

  @spec prev(non_neg_integer, any, list) :: list
  defp prev(counter, last, acc)

  defp prev(_counter, :"$end_of_table", acc) do
    :lists.reverse(acc)
  end

  defp prev(0, last, acc) do
    :lists.reverse([last | acc])
  end

  defp prev(counter, last, acc) do
    prev(counter - 1, :ets.prev(@table, last), [last | acc])
  end
end

is much slower than the naive list implementation above …

Couldn’t use Benchfella with :ets, so had to turn to :timer.tc

history_ets_push = Quote.HistoryETS.for_test_push()
:timer.tc(history_ets_push) # returns ~4 us

:ets.delete(:history_ets)

history_ets_new = Quote.HistoryETS.for_test_new()
:timer.tc(history_ets_new) # returns ~600 us

whereas using

  def for_test_new do
    fn ->
      1..1000
      |> Enum.to_list()
      |> new()
    end
  end

  def for_test_push do
    history =
      1..1000
      |> Enum.to_list()
      |> new()

    fn ->
      push(history, 1001)
    end
  end

for the list implementation:

history_push = Quote.History.for_test_push()
:timer.tc(history_push) # returns ~1 us

history_new = Quote.History.for_test_new
:timer.tc(history_new) # returns ~60 us

Will try :epocxy now, thank you, @pmonson711!

You may be interested in riak-ts, an open source NoSQL database optimized for time-series and written in Erlang.

http://docs.basho.com/riak/ts/1.5.2/

I’ve not used it myself yet but it looks interesting for logging time series data.

1 Like

I really quite find Benchee a lot more useful nowadays. :slight_smile:

A couple of months ago, I was working on something that exactly keeps track of things like market data, where you have a sliding window of datapoints that you want to aggregate in possibly multiple ways.

The code was uploaded to Github; it’s not yet on Hexpm because it is still a little bare-bones (although there are tests!).
Actually, the tests might be the best explanation as to how it works :slight_smile: .

2 Likes