I set out to write my own Cassandra driver because the protocol allows for multiplexing a single connection (i.e. you don’t need a connection pool) and I didn’t understand why Xandra doesn’t take advantage of this.
But after reading the DBConnection documentation, I saw this…
Some callbacks will be called in the calling process, with the state copied to and from the calling process. This is useful when the data for a request is large and means that a calling process can interact with a socket directly.
So I figured that’s why: to avoid constantly copying data in and out of processes.
My idea was to have a GenServer that manages a TCP socket, where executing a CQL command means copying data (iolists) into that process, which then sends it to the socket (and vice versa for reading data off the socket).
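Something with roughly this shape (a simplified sketch of the idea, not the actual driver code):

# Simplified sketch: the GenServer owns the socket. Callers copy their encoded
# request (iodata) into this process via the call, and the reply copies the
# response back out to them.
defmodule Conn do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def execute(conn, iodata), do: GenServer.call(conn, {:execute, iodata})

  @impl true
  def init(opts) do
    {:ok, socket} =
      :gen_tcp.connect(String.to_charlist(opts[:host]), opts[:port], [:binary, active: false])

    {:ok, socket}
  end

  @impl true
  def handle_call({:execute, iodata}, _from, socket) do
    :ok = :gen_tcp.send(socket, iodata)
    # a real client would read a complete frame here; this just reads what's available
    {:ok, response} = :gen_tcp.recv(socket, 0)
    {:reply, response, socket}
  end
end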
I decided to finish writing the client anyway just to see how slow it was… and to my surprise, it’s not slow.
The benchmark does 1,000 inserts then 1,000 selects using 1,024-byte rows.
Then 1,000 inserts and 1,000 selects using 1,048,576-byte rows.
Then a single insert/select using an 8 MB row.
My driver sometimes beats Xandra even though it’s copying the data into a process which then sends it across the socket. Same with reading: the connection process reads the socket, then does a GenServer reply, copying the data to the calling process.
Binaries up to 64 bytes are allocated directly on the process’s heap, and their entire life cycle is spent in there. Binaries bigger than that get allocated in a global heap for binaries only, and each process to use one holds a local reference to it in its local heap. These binaries are reference-counted, and the deallocation will occur only once all references are garbage-collected from all processes that pointed to a specific binary.
I knew about the optimization around copying binaries in and out of processes. But I guess I was surprised that tuples and iodata have the same optimization?
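For example, I originally had something like this (a rough sketch from memory, not the exact code; the decode happened in the calling process, and the decoded map is what got passed to each task):

list_of_stuff
|> Stream.map(fn {_metadata, json} ->
  # decode in the calling process; the resulting map is what gets
  # copied into each Task.async_stream worker below
  Jason.decode!(json)
end)
|> Task.async_stream(fn data ->
  ...
end)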
The json in each item was 1-5 KB of binary, and that code was extremely slow to run: orders of magnitude slower than just using Enum.each.
But I changed it to something like…
list_of_stuff
|> Stream.map(fn {metadata, json} ->
  {metadata, json} # yes, redundant, I'm just bad at coming up with code examples.
end)
|> Task.async_stream(fn {metadata, json} ->
  data = Jason.decode!(json)
  ...
end)
And it was fast.
So I’m just confused as to what the “rules” are when trying to optimize large message passing between processes. I thought one of the big downsides to message passing is the copying of data all the time, but it seems a lot of cases are highly optimized by the vm/runtime.
As I understand it, in your first example the Jason.decode runs single-threaded because it sits inside the Stream.map, whereas in the second example it runs in parallel inside Task.async_stream (with as many concurrent tasks as you have CPU cores on the host, by default).
To prove the above for myself, I grabbed your example code and had it decode 50 items of big JSON (pokedex.json, about 287 KB per decode), while also measuring start and end times around each Jason.decode, and then graphed the results inside a Livebook with VegaLite. The resulting graph/image is this:
You can read it as:
- Vertical, from top to bottom, is the iteration number.
- Horizontal is time, with 0 = start time (everything is in nanoseconds).
- Each bar spans the start and end times measured around the Jason.decode.
- The yellow/orange bars are the Jason.decode running inside the Task.async_stream.
- The blue bars are the Jason.decode running inside the Stream.map.
Hopefully this gives some insight into your examples
(Btw, you can tell that I have 10 cores to work with, since the first 10 yellow iterations are all running in parallel.)
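The measuring code was along these lines (a sketch, not the exact Livebook cells; the {iteration, start, stop} rows are what get fed into VegaLite):

# Wrap each Jason.decode! in monotonic timestamps and collect one row per item.
json = File.read!("pokedex.json")
items = Enum.map(1..50, fn i -> {i, json} end)

timed_decode = fn {i, json} ->
  started = System.monotonic_time(:nanosecond)
  _decoded = Jason.decode!(json)
  stopped = System.monotonic_time(:nanosecond)
  {i, started, stopped}
end

# Variant 1: decode inside Stream.map (runs in the calling process, one at a time).
stream_map_rows =
  items
  |> Stream.map(timed_decode)
  |> Task.async_stream(fn row -> row end)
  |> Enum.map(fn {:ok, row} -> row end)

# Variant 2: decode inside Task.async_stream (runs in parallel task processes).
async_stream_rows =
  items
  |> Task.async_stream(timed_decode)
  |> Enum.map(fn {:ok, row} -> row end)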
Sorry, my original post conflated some things. Essentially, I had an Enum.each that I replaced with a Task.async_stream and instead of making it faster, it drastically slowed things down.
I made it fast again by passing a tuple instead of a map, which led me to the idea that passing tuples (or iolists or binaries) to processes is somehow optimized.
I knew about the optimization around copying binaries in and out of processes. But I guess I was surprised that tuples and iodata have the same optimization?
Tuples and iodata do not have this optimization. And even refcounted binaries are not the optimization DBConnection uses. If you take a closer look, you’ll find that the socket is moved from one process to another as part of an ETS table, leveraging the ETS heir mechanism when the calling process dies.
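Roughly, the heir mechanism looks like this (a minimal sketch, not DBConnection’s actual code):

# The holder creates a table naming the pool as heir. When the holder dies,
# the pool receives an :"ETS-TRANSFER" message and owns the table (and
# whatever it holds, e.g. the socket) again.
pool = self()

holder =
  spawn(fn ->
    tid = :ets.new(:holder_table, [:public, {:heir, pool, :heir_data}])
    :ets.insert(tid, {:socket, :fake_socket_placeholder})
    # holder terminates here; table ownership transfers to the heir
  end)

receive do
  {:"ETS-TRANSFER", tid, ^holder, :heir_data} ->
    IO.inspect(:ets.lookup(tid, :socket), label: "inherited from holder")
end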
So I’m just confused as to what the “rules” are when trying to optimize large message passing between processes. I thought one of the big downsides to message passing is the copying of data all the time, but it seems a lot of cases are highly optimized by the vm/runtime.
Just don’t optimize anything. One pattern I use on a day-to-day basis is Agent. If the data is large and the operations on it are small, an Agent is a perfect fit for working with that data.
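For example (a minimal sketch): the big data lives in the Agent, and the small operation runs inside the Agent process, so only the small result gets copied back to the caller.

# The large map lives on the Agent's heap.
{:ok, agent} = Agent.start_link(fn -> %{"users" => Enum.to_list(1..100_000)} end)

# The function runs inside the Agent process; only the small result (a count)
# is copied back to the calling process.
count = Agent.get(agent, fn big_map -> length(big_map["users"]) end)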
Isn’t the whole point of that so that data isn’t copied from one process to another then finally to the socket? The calling process can write to the socket directly?
calling-process -> socket
vs
calling-process -> connection process -> socket
So why does my code, which does the latter, outperform it?
I can see that your benchmarks show the opposite, and we can only guess why that is, because I can’t see the code for the benchmarks or the Cassy library. My best guess is that the performance of the database (which writes this data to disk) influences every benchmark far more than the performance of the adapter.
From what I can see in the benchmarks, my guess was correct. I’d suggest trying to find a way to measure the time from “socket received the complete response” to “we have the response structure instantiated as the result of the execute function”.
No, it does not. And that’s one of the trade-offs here: iodata are the most efficient format for encoding data. You can build a binary though and pass a reference around.
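For example (a rough sketch): build the frame as iodata, then collapse it once, so that what crosses a process boundary is a single refc binary rather than a nested list.

# Building as iodata is cheap, but sending iodata to another process copies
# the whole nested structure. Collapsing it first means anything over 64 bytes
# travels between processes as a reference-counted binary.
values = Enum.map(1..1_000, &Integer.to_string/1)
iodata = ["INSERT INTO t (v) VALUES (", Enum.intersperse(values, ","), ")"]
payload = IO.iodata_to_binary(iodata)
send(self(), {:write, payload})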
At the same time, this may not matter much at the end. All HTTP clients in Erlang have historically copied data between processes, up until Mint and Finch, and I don’t think it was ever a major blocker to anyone. Finch is also a good example because:
HTTP/1 does not support multiplexing, so it uses the same technique as DBConnection, where it brings the socket to the client and has the client write/read from it directly
HTTP/2 does support multiplexing and therefore it is process-based
However, depending on how multiplexing works, you can still leverage the “writing to the socket from the client” when sending requests. You’d need this:
You need to be able to add your own IDs to the queries you submit to the database. This allows you to associate IDs with processes and queries
When you are going to run a query, generate a unique id (System.unique_integer is fine), and send the ID to the socket owner process
The socket owner will monitor the PID, store its ID, and return the socket to the client
Write the query to the socket from the client with the given ID
Now the socket owner will wait for responses. Once they arrive, it uses the ID to look up the client process and streams the parts to that process. If the client process terminates, remove the ID, and you can ignore any messages you receive without a matching ID.
With this approach, encoding and decoding happen on the client. There is no copying on send, only copying on receive as responses arrive.
This approach will only work for atomic operations on the socket. Any kind of transaction or stateful operation is game over.
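A rough sketch of what the socket-owner side of this could look like (module, function, and message names are made up for illustration; I’m also assuming the socket delivers one complete frame per message and that each frame starts with its 2-byte id):

# Illustrative sketch only: register request IDs, monitor the requesting
# client, and route incoming responses by ID.
defmodule SocketOwner do
  use GenServer

  def start_link(socket), do: GenServer.start_link(__MODULE__, socket)

  # The client registers its ID and gets the socket back, so it can write
  # the query to the socket itself.
  def register(owner, id), do: GenServer.call(owner, {:register, id})

  @impl true
  def init(socket), do: {:ok, %{socket: socket, requests: %{}}}

  @impl true
  def handle_call({:register, id}, {client, _tag}, state) do
    Process.monitor(client)
    {:reply, {:ok, state.socket}, %{state | requests: Map.put(state.requests, id, client)}}
  end

  @impl true
  def handle_info({:tcp, _socket, <<id::16, part::binary>>}, state) do
    # route the response part to whichever client registered this ID
    case state.requests[id] do
      nil -> :ok                                   # client is gone; ignore
      client -> send(client, {:response_part, id, part})
    end

    {:noreply, state}
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # forget every ID that belonged to the dead client
    {:noreply, %{state | requests: Map.reject(state.requests, fn {_id, c} -> c == pid end)}}
  end
end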
That’s pretty much exactly how the client works now, except for the sending part. Also, the request id is only 2 bytes, so the Cassandra protocol suggests incrementing an integer with rollover. That means the socket owner process holds that as state and increments it.
I’m curious to see how much performance increases if I change it to where the client process can write requests to the socket directly.
There is no copying on send, only copying on receive as responses arrive.
I was wondering about this. What if the socket owner process receives the frame header, which has the request id and body length in it, and then gives the socket to the client process for that request id, which then reads the rest of the frame body off the socket directly?
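Something like this, roughly (a sketch; assuming a passive-mode socket and a 9-byte header with a 2-byte stream id and a 4-byte body length, as in recent protocol versions; error handling, e.g. the client crashing mid-read, is omitted):

defmodule Handoff do
  def read_one_response(socket, clients) do
    # Owner reads just the frame header: version, flags, stream id, opcode, length.
    {:ok, <<_version, _flags, stream_id::16, _opcode, length::32>>} =
      :gen_tcp.recv(socket, 9)

    client = Map.fetch!(clients, stream_id)

    # Hand the socket to the client so it can read the body directly...
    :ok = :gen_tcp.controlling_process(socket, client)
    send(client, {:read_body, socket, length})

    # ...and wait until the client gives it back.
    receive do
      {:done, ^socket} -> :ok
    end
  end

  # Client side: read the body off the socket, then return ownership.
  def read_body(owner) do
    receive do
      {:read_body, socket, length} ->
        {:ok, body} = :gen_tcp.recv(socket, length)
        :ok = :gen_tcp.controlling_process(socket, owner)
        send(owner, {:done, socket})
        body
    end
  end
end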
Thanks for the info on the Erlang HTTP clients and how Mint/Finch handles HTTP1 vs HTTP2! It’s super helpful to know how others do things…
You can probably use :counters for that and still leverage the above.
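Something along these lines (a sketch; this uses :atomics.add_get/3, the close cousin of :counters, since it hands back the incremented value atomically, and masks to 16 bits for the 2-byte id):

# A shared counter for generating 2-byte request ids with rollover.
counter = :atomics.new(1, signed: false)

next_id = fn ->
  # atomic fetch-and-add, so concurrent callers never get the same value back
  counter |> :atomics.add_get(1, 1) |> rem(65_536)
end

next_id.()  #=> 1
next_id.()  #=> 2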
You will also see better numbers if you do concurrent testing rather than measuring absolute numbers for a single connection/socket, because this approach also reduces contention on the socket owner.
This is tricky because if a process crashes while reading, you don’t know how much it read, so you need to throw the socket out. That’s the same issue with transactions.
Which brings me to another point: your writes have to be atomic (i.e. you call :gen_tcp.send once with the whole query).
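In other words, build the whole frame as iodata and hand it to :gen_tcp.send in a single call, something like this (a sketch; the header bytes are illustrative, not a faithful protocol encoding):

send_query = fn socket, stream_id, body ->
  # one send call per query, so concurrent writers can never interleave
  # partial frames on the socket
  header = <<0x04, 0x00, stream_id::16, 0x07, IO.iodata_length(body)::32>>
  :gen_tcp.send(socket, [header, body])
end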