Help me understand DBConnection and copying large data in/out of processes

I set out to write my own Cassandra driver because the protocol allows for multiplexing a single connection (i.e. you don’t need a connection pool) and I didn’t understand why Xandra doesn’t take advantage of this.

But after reading the DBConnection documentation, I saw this…

Some callbacks will be called in the calling process, with the state copied to and from the calling process. This is useful when the data for a request is large and means that a calling process can interact with a socket directly.

So I figured that’s why: to avoid constantly copying data in and out of processes.

My idea was to have a GenServer that manages a TCP socket, so executing CQL commands means copying data (iolists) into that process, which then sends it to the socket (and vice versa when reading data off the socket).
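Roughly this shape, as a simplified sketch (not the actual Cassy code; the host/port options and the read logic are placeholders):

defmodule ConnectionSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def execute(conn, request_id, frame_iodata) do
    # The encoded CQL frame (iodata) is copied into the connection process here...
    GenServer.call(conn, {:execute, request_id, frame_iodata})
  end

  @impl true
  def init(opts) do
    {:ok, socket} = :gen_tcp.connect(opts[:host], opts[:port], [:binary, active: false])
    {:ok, %{socket: socket}}
  end

  @impl true
  def handle_call({:execute, _request_id, frame_iodata}, _from, state) do
    :ok = :gen_tcp.send(state.socket, frame_iodata)
    # (A real driver would read complete frames; this just reads what's available.)
    {:ok, response} = :gen_tcp.recv(state.socket, 0)
    # ...and the reply copies the response back into the calling process.
    {:reply, {:ok, response}, state}
  end
end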

I decided to finish writing the client anyway just to see how slow it was… and to my surprise, it’s not slow.

INSERT...
Xandra 519ms
Cassy  609ms

SELECT...
Xandra 579ms
Cassy  566ms

INSERT (prepared)...
Xandra 546ms
Cassy  572ms

SELECT (prepared)...
Xandra 568ms
Cassy  538ms

INSERT (large)...
Xandra 18271ms
Cassy  18420ms

SELECT (large)...
Xandra 8726ms
Cassy  9199ms

INSERT (single large)...
Xandra 149ms
Cassy  132ms

SELECT (single large)...
Xandra 64ms
Cassy  63ms

The benchmark does 1000 inserts then 1000 selects using 1,024-byte rows.

Then 1000 inserts and 1000 selects using 1,048,576-byte rows.

Then a single insert/select using an 8 MB row.

My driver sometimes beats Xandra even though it’s copying the data into a process which then sends it across the socket. Same with reading: the connection process reads the socket, then does a GenServer reply, copying the data to the calling process.

Why?

3 Likes

Maybe because of

Binaries up to 64 bytes are allocated directly on the process’s heap, and their entire life cycle is spent in there. Binaries bigger than that get allocated in a global heap for binaries only, and each process to use one holds a local reference to it in its local heap. These binaries are reference-counted, and the deallocation will occur only once all references are garbage-collected from all processes that pointed to a specific binary.

(source: chapter 7.2 from http://www.erlang-in-anger.com/ )
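A quick way to see this threshold in an iex session (assuming the 64-byte limit from the quote above; Process.info(self(), :binary) lists the reference-counted binaries a process points to):

small = :binary.copy(<<0>>, 64)  # 64 bytes: stored directly on the process heap
large = :binary.copy(<<0>>, 65)  # 65 bytes: stored off-heap and reference-counted

# Lists {id, byte_size, ref_count} for each off-heap binary this process
# references; the 65-byte binary shows up here, the 64-byte one does not
# (alongside whatever other refc binaries the shell already holds).
{:binary, refs} = Process.info(self(), :binary)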

I’m not completely sure if this answers your question :slight_smile: but at least it gives some insight into how the memory gets handled.

I knew about the optimization around copying binaries in and out of processes. But I guess I was surprised that tuples and iodata have the same optimization?

I had some code like this (for my day job)…

list_of_stuff # around 60k items
|> Stream.map(fn {metadata, json} ->
  # Jason.decode! runs here, in the calling process; the resulting map is
  # copied into each task below
  %{metadata: metadata, data: Jason.decode!(json)}
end)
|> Task.async_stream(fn item -> ... end)

The json was 1-5 KB of binary, and that code was extremely slow to run: orders of magnitude slower than just using Enum.each.

But I changed it to something like…

list_of_stuff
|> Stream.map(fn {metadata, json} ->
  {metadata, json} # yes, redundant, I'm just bad at coming up with code examples.
end)
|> Task.async_stream(fn {metadata, json} ->
  # Jason.decode! now runs inside each task
  data = Jason.decode!(json)
  ...
end)

And it was fast.

So I’m just confused as to what the “rules” are when trying to optimize large message passing between processes. I thought one of the big downsides to message passing is the copying of data all the time, but it seems a lot of cases are highly optimized by the VM/runtime.

As I understand it, in your first example the Jason.decode runs single-threaded because it sits inside Stream.map, whereas in the second example it runs in parallel (by default, with as many tasks as the host has CPUs).

To verify this for myself, I took your example code and had it decode 50 items of big JSON (pokedex.json, 287 KB per decode), measured start and end times around each Jason.decode, and graphed the result in a Livebook with VegaLite. The resulting graph looks like this:

You can read it as follows:
The vertical axis, top to bottom, is the iteration number.
The horizontal axis is time, with 0 = start time (everything is in nanoseconds).
Each bar spans the start and end time measured around the Jason.decode.

The yellow/orange bars are the Jason.decode inside Task.async_stream.
The blue bars are the Jason.decode inside Stream.map.

Hopefully this gives some insight into your examples :smiley:

(Btw, you can tell I have 10 cores to work with, as the first 10 yellow iterations all run in parallel.)
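Not the actual Livebook, but a rough sketch of the timing part (assuming pokedex.json is available locally; timestamps are taken with System.monotonic_time around each Jason.decode!):

# Record start/stop timestamps around each decode in both variants,
# then compare the two lists of intervals.
timed_decode = fn json ->
  start = System.monotonic_time(:nanosecond)
  data = Jason.decode!(json)
  stop = System.monotonic_time(:nanosecond)
  {data, start, stop}
end

items = List.duplicate(File.read!("pokedex.json"), 50)

# Variant 1: decode inside Stream.map. The stream is pulled by
# Task.async_stream, so the decode runs in the calling process.
timings_stream_map =
  items
  |> Stream.map(fn json -> timed_decode.(json) end)
  |> Task.async_stream(fn {_data, start, stop} -> {start, stop} end)
  |> Enum.map(fn {:ok, timing} -> timing end)

# Variant 2: decode inside Task.async_stream, so it runs in parallel tasks.
timings_tasks =
  items
  |> Task.async_stream(fn json ->
    {_data, start, stop} = timed_decode.(json)
    {start, stop}
  end)
  |> Enum.map(fn {:ok, timing} -> timing end)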

2 Likes

Sorry if I wasn’t more clear. My serial code was orders of magnitude faster.

This was fast:

big_list
|> Enum.each(fn item ->
  data = Jason.decode!(item.json)
  write_to_database(item.metadata, data)
end)

This was extremely slow:

big_list
|> Task.async_stream(fn item ->
  data = Jason.decode!(item.json)
  write_to_database(item.metadata, data)
end)
|> Stream.run()

Sorry, my original post conflated some things. Essentially, I had an Enum.each that I replaced with a Task.async_stream, and instead of making things faster, it drastically slowed them down.

I made it fast again by passing a tuple instead of a map, which led me to the idea that passing tuples (or iolists or binaries) to processes is somehow optimized.

I knew about the optimization around copying binaries in and out of processes. But I guess I was surprised that tuples and iodata have the same optimization?

Tuples and iodata do not have this optimization. And even refcounted binaries are not the optimization DBConnection uses: if you take a closer look, you’ll find that the socket is moved from one process to another as part of an ETS table, leveraging the ETS heir mechanism for when the calling process dies.
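For illustration, a hedged sketch of the ETS give_away/heir mechanism being referred to (not the actual DBConnection code; the table contents are placeholders):

owner = self()

# The heir option means the table comes back to `owner` if its current owner dies.
holder = :ets.new(:connection_holder, [:public, {:heir, owner, :checkin}])
:ets.insert(holder, {:conn, :some_socket_state})

client =
  spawn(fn ->
    receive do
      {:"ETS-TRANSFER", table, _from, :checkout} ->
        # The client now owns the table and can use the socket state directly.
        [{:conn, _state}] = :ets.lookup(table, :conn)
    end
  end)

:ets.give_away(holder, client, :checkout)

# When `client` exits, the heir (the original owner) receives
# {:"ETS-TRANSFER", holder, client, :checkin} and regains ownership.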

So I’m just confused as to what the “rules” are when trying to optimize large message passing between processes. I thought one of the big downsides to message passing is the copying of data all the time, but it seems a lot of cases are highly optimized by the VM/runtime.

Just don’t optimize anything. One pattern I use on a day-to-day basis is Agent: if the data is large and the operations are small, an Agent is a perfect fit for working with that data.
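A minimal sketch of that pattern: keep the large data inside the Agent and run the small operation inside Agent.get/2, so only the small result is copied back to the caller.

{:ok, agent} = Agent.start_link(fn -> Map.new(1..1_000_000, &{&1, &1 * &1}) end)

# The function runs in the Agent's process; only the integer result crosses
# the process boundary, not the million-entry map.
Agent.get(agent, fn big_map -> Map.get(big_map, 123) end)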

1 Like

Isn’t the whole point of that so that data isn’t copied from one process to another and then finally to the socket? So that the calling process can write to the socket directly?

calling-process -> socket

vs

calling-process -> connection process -> socket

So why does my code, which does the latter, outperform it?

I can see that your benchmarks show the opposite, and we can only guess why that is, because I can’t see the code for the benchmarks or the Cassy library. My best guess is that the performance of the database (which writes this data to disk) influences every benchmark far more than the performance of the adapter.

We can create a benchmark which just tests this idea of DBConnection’s approach vs your approach

Here’s the relevant code…

Notice it’s sending a message tuple {atom, integer, iodata} to the connection process. The iodata can be large (1 KB, 1 MB, 8 MB in the benchmarks).

You can run the benchmarks with…

docker compose up -d
# wait a long time for the cluster to come up, like 5 min
mix benchmark

Benchmark code is here…

We can create a benchmark which just tests this idea of DBConnection’s approach vs your approach

But I still had that Task.async_stream issue, which was resolved by passing tuples instead of maps. I feel like that’s the whole crux of the issue.

1 Like

From what I can see in the benchmarks, my guess was correct. I’d suggest trying to find a way to measure the time from “socket received the complete response” to “we have the response structure instantiated as the result of the execute function”.

No, it does not. And that’s one of the trade-offs here: iodata is the most efficient format for encoding data. You can build a binary, though, and pass a reference around.
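For example (a sketch of that trade-off, with made-up data):

# Encoding produces iodata; flattening it once in the caller yields a single
# large binary that is reference-counted, so sending it to the connection
# process copies only a small reference rather than the whole payload.
large_value = :binary.copy("x", 1_000_000)
iodata = ["INSERT INTO t (id, blob) VALUES (1, '", large_value, "')"]

payload = IO.iodata_to_binary(iodata)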

At the same time, this may not matter much in the end. All HTTP clients in Erlang historically copied data between processes, up until Mint and Finch, and I don’t think it was ever a major blocker for anyone. Finch is also a good example because:

  1. HTTP/1 does not support multiplexing, so it uses the same technique as DBConnection, where it brings the socket to the client and has the client write/read from it directly

  2. HTTP/2 does support multiplexing and therefore it is process-based

However, depending on how multiplexing works, you can still leverage “writing to the socket from the client” when sending requests. You’d need this:

  1. You need to be able to attach your own IDs to the queries you submit to the database. This will allow you to associate IDs with processes and queries
  2. When you are going to run a query, generate a unique ID (System.unique_integer is fine) and send the ID to the socket owner process
  3. The socket owner will monitor the calling PID, store its ID, and return the socket to the client
  4. Write the query to the socket from the client with the given ID

Now the socket owner will wait for responses. Once they arrive, it uses the ID to look up the client process and stream the parts to it. If the client process terminates, remove its ID and you can ignore any messages that arrive without a matching ID.

With this approach, encoding and decoding happen on the client. There is no copying on send, only copying on receive, as the responses arrive.
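A rough sketch of that flow (all names here are hypothetical, not Xandra or DBConnection APIs, and the framing/decoding is made up):

defmodule SocketOwnerSketch do
  use GenServer

  def start_link(socket), do: GenServer.start_link(__MODULE__, socket)

  # Steps 2 and 3: the client registers a unique request ID; the owner monitors
  # the caller and hands back the socket so the client can encode and write the
  # query itself.
  def checkout(owner, request_id), do: GenServer.call(owner, {:checkout, request_id})

  @impl true
  def init(socket), do: {:ok, %{socket: socket, requests: %{}}}

  @impl true
  def handle_call({:checkout, request_id}, {client_pid, _tag}, state) do
    Process.monitor(client_pid)
    {:reply, {:ok, state.socket}, put_in(state.requests[request_id], client_pid)}
  end

  # Responses arrive on the socket owned by this process; route each frame to
  # the client registered for its request ID. (Assumes active mode and one
  # complete frame per message, for simplicity.)
  @impl true
  def handle_info({:tcp, _socket, frame}, state) do
    {request_id, payload} = decode_frame(frame)

    case Map.fetch(state.requests, request_id) do
      {:ok, client} -> send(client, {:response_chunk, request_id, payload})
      :error -> :ok  # client is gone; ignore frames with no matching ID
    end

    {:noreply, state}
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # Forget request IDs belonging to clients that terminated.
    {:noreply, update_in(state.requests, &Map.reject(&1, fn {_id, p} -> p == pid end))}
  end

  # Hypothetical framing: a 16-bit request ID followed by the payload.
  defp decode_frame(<<request_id::16, payload::binary>>), do: {request_id, payload}
end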

This approach will only work for atomic operations on the socket. Any kind of transaction or stateful operation is game over.

4 Likes

Any kind of transaction or stateful operation is game over

Why? Can’t you just use a single ID for each operation in a transaction?

That’s pretty much exactly how the client works now, except for the sending part. Also, the request id is only 2 bytes, so the Cassandra protocol suggests incrementing an integer with rollover, which means the socket owner process holds that counter as state and increments it.

I’m curious to see how much performance increases if I change it to where the client process can write requests to the socket directly.

There is no copying on send, only copying on receive, as the responses arrive.

I was wondering about this. What if the socket owner process receives the frame header, which has the request id and body length in it, and then gives the socket to the client process for that request id, which then reads the rest of the frame body off the socket directly?

Thanks for the info on the Erlang HTTP clients and how Mint/Finch handles HTTP1 vs HTTP2! It’s super helpful to know how others do things… :slight_smile:

1 Like

You can probably use :counters for that, then, to still leverage the above.
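Something along these lines, for instance — a sketch that uses :atomics.add_get/3 (a close relative of :counters) so the increment and the read happen in one atomic step, with the rollover matching the 2-byte id mentioned above:

# Hypothetical shared counter for generating rolling 2-byte request ids
# without a round trip to the socket owner process.
ref = :atomics.new(1, signed: false)

next_id = fn -> rem(:atomics.add_get(ref, 1, 1), 65_536) end

next_id.()  #=> 1
next_id.()  #=> 2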

You will also see better numbers with concurrent testing, rather than absolute numbers on a single connection/socket, because this approach also reduces contention on the socket owner.

This is tricky because if a process crashes while reading, you don’t know how much it read, so you need to throw the socket out. That’s the same issue with transactions.

Which brings me to another point: your writes have to be atomic (i.e. you call gen_tcp.send once with the whole query).
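A minimal sketch of that constraint (the header layout is invented for illustration, not the real Cassandra framing):

defmodule AtomicWrite do
  # Build the whole frame as iodata and write it with a single :gen_tcp.send/2
  # call, so two clients sharing the socket can never interleave partial frames.
  def send_frame(socket, request_id, body) do
    frame = [<<request_id::16, IO.iodata_length(body)::32>>, body]
    :gen_tcp.send(socket, frame)
  end
end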

2 Likes

I’d love to see the source code for this livebook!