Ecto with Postgres columnar addin Citus DB

Is there a way to use Citus DB’s column store functionality via Ecto?

I’m currently writing a package that wraps the Bloomberg API so that it can be accessed via Elixir, but there are tens of ticks per second per security and up to 2.5k securities, so a column store is mandatory, because it’s very easy to get well over 100m ticks per day (though real-world workloads are more likely in the 10m range per day, and realistically, significantly downsampled from there).

I’d ideally like the package to configure tables automatically if Postgres is the chosen database, and for that this class of commands would need to be supported:

CREATE TABLE simple_columnar(i INT8) USING columnar;

That is, creation of the extension, and the Using columnar qualification. I don’t really want the user to have to mess with psql or anything else. Can Ecto be instructed to perform such non-standard migrations/db creation instructions?

(for avoidance of doubt, yes I’ll be using the NX stack for analysis).

Failing any Citus solution, what are my options for large ingest and efficient time-based range querying using Elixir? IE: Is ecto the right tool for OLAP as opposed to OLTP?

I’ve never tried Citus and have somewhat limited knowledge of all its moving parts, but using Ecto (or just plain Postgrex) should absolutely be feasible. Naturally Ecto/Postgrex won’t handle getting the extension on the PostgreSQL server in the first place, but the commands to install it into a database are all fair game.

If the reason you want Ecto is related to migrations, look at: Ecto.Migration.execute/1 (or /2) Ecto.Migration — Ecto SQL v3.10.2. The CREATE EXTENSION command minimally would have to be done that way (as far as I recall). The Ecto migrations do have DSL for creating tables, but not sure if there’s a way to get the USING columnar in with that DSL. There is Ecto.Migration.fragment/1, but looking over the docs its not nearly so flexible as the fragment function in the query DSL.

As for querying, non-standard PostgreSQL types may need to be represented by custom Ecto & Postgrex types. I haven’t made any for awhile, but they’re pretty straightforward: Ecto.Type — Ecto v3.10.3 & Postgrex.Types — Postgrex v0.17.3.

Anyway, there are other people here who may be more helpful than this or with direct experience trying to use Citus… but for now this should give you some avenues to explore.


Might also be worth looking at the timescale extension for Postgres for dealing with large time series datasets as it handles table partitioning.

Another thing you could look at is reducing the column width on your types and get even more creative using a delta from previous row both for time and amount in cents or pips. I have done this previously as tick data doesn’t have a large range between ticks. Any compression you add in addition is even more effective. I would also encode start/end/checkpoint flags into the event type to track gaps in the dataset and deal with large range moves as well as periodic checkpointing of the current value.

You can then build a view on this for querying which hides the delta calculation from your application tier.

In general I also try to isolate complicated queries to the database by creating functions that return tables and query on those from my application(s). If they are parameterless then I create a readable view, and if they satisfy certain constraints then an updatedable view (which can be queried and updated like any regular table.

To use functions in your from clause Ecto from supports passing parameters using a fragment.

# Fragment calling a function and returning an undefined set of columns (you can alternatively select specific columns from f if you want also)
from(f in fragment("select myfunc(?::text, ?::integer) as x", ^"param one", ^2), select: f.x)

Could you elaborate on the pattern you use for delta encoding the timestamps and values? By “reducing column width” do you mean using a smaller encoding type for example real instead of float or even using smallint? Also why would the deltas compress better (apart from what is obtained by using a lower byte count type)?

Also if I have to save full precision checkpoints, that implies another column, or indeed another table?

Yes I plan also to look at TimescaleDB.

It was a long time ago, maybe 2006 when I did this. Storage is a lot cheaper now so consider if the gain is worth the pain.

If you subtract say current price from the previous price you will typically be dealing with plus or minus a few points/pips/cents as the bid/ask/last price fluctuates. This compresses very well as the values are constrained to a high frequency small set of values.

Use a postgres small integer smallint which is 2 bytes and represents the delta from the previous value. Your event stream will need to encode the event type like bid/ask/last and additionally some condition flags for begin/end of a capture and checkpointing. You will need to allow deltas greater than your integer size, both for checkpointing absolute values and begin/end events to track data quality/gaps. So you will need a flag to handle absolute values encoded over 2 rows (32bits and 48bits respectively should be enough).

You can get creative and pack the event type (bid/ask/last) and condition flags (begin/end/absolute) into 4 bits (there is only 9 possible combinations of event and flag) and use the remaining 12 bits for the delta into that smallint as well (this means 2 rows provides an absolute value dynamic range of 24 bits). Write postgres functions to pack and unpack the tuple of {event, flags, value}.

1 Like