Erlang/Elixir native Etcd, Zookeeper alternative

Yet it has none of the features that make creating and monitoring work and state on another node possible, much less a way to actually connect two runtimes. Does it even have a primitive for referring to threads?
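
(For the record, the BEAM does have all of these. A minimal sketch of creating and monitoring work on another node, assuming two nodes started with names and a shared cookie; the node names here are made up:)

```elixir
# Connect the two runtimes:
true = Node.connect(:"worker@10.0.0.2")

# Spawn a process on the remote node; the returned pid refers to it
# transparently, regardless of which node it lives on:
pid =
  Node.spawn(:"worker@10.0.0.2", fn ->
    receive do
      {:square, from, n} -> send(from, {:result, n * n})
    end
  end)

# Monitor it; a :DOWN message arrives if the process (or its node) dies:
ref = Process.monitor(pid)

send(pid, {:square, self(), 7})

receive do
  {:result, 49} -> :ok
  {:DOWN, ^ref, :process, ^pid, reason} -> {:error, reason}
end
```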


I am obviously biased, but I think this could actually be the killer app for Elixir/Erlang.

we’re using an Elixir raft implementation here at work that is written, used and maintained by a reasonably large Japanese company

Can I ask, is it open source? Which one? Do you mean rafted_value or something else?

Can you clarify this a bit? I assumed you wanted similar functionality to be more readily available “in library form” for building distributed Elixir/Erlang things without the need for external architecture. Do you just want these products to simply be written in Elixir? If so, why?

In my view the killer app is basically a full-fledged distributed event-processing pipeline in Erlang/Elixir. To enable that, the two missing pieces are a native ZK/Etcd alternative and, relying on that as a key foundation, an ecosystem-native Kafka-like distributed log implementation. If we have those two pieces, the value proposition is enormous vs any alternative.


I think it’s actually pretty one-sided in this case. I’m pretty sure Kafka would rather handle 10 messages of reasonable size vs 5 million messages of any size, per second. While 1MB messages are largish, the bookkeeping work of 10 messages is small, and even a small cluster ought to be able to handle 10MB/sec of throughput.

Whereas the bookkeeping overhead of processing 5 million small things is huge. If that bookkeeping is done by Zookeeper, then Zookeeper needs to be fast in order to handle 5 million serial requests per second. And if Zookeeper writes to disk before acknowledging a transaction, then disks need to be super performant. This was a deliberate example to highlight that while Kafka can read/write 10MB/s no problem, if it’s using Zookeeper to track state (not store it), then Zookeeper also needs to be fast. Even without the Zookeeper sidebar, I think systems universally would rather handle fewer bigger chunks than massive amounts of smaller chunks.

So an Elixir-native Kafka Streams that bakes the Kafka part into the app cluster as a library? I’d be interested in that. In the meantime, I’d settle for great Elixir bindings for actual Kafka Streams.


As I said, it depends. If you offer Kafka a batch of 5 million messages, it’ll slurp it in, burp, and say “thank you” in no time. Not a lot of bookkeeping is necessary, as Kafka is just a (distributed) log system: it’ll update the high-water mark to old + 5 million, but that’s it. Zookeeper doesn’t come into play; it just coordinates cluster state. Similarly, if I’m a consumer, I’ll get this exact batch in one message and ack it once (you can only ack an offset, not individual messages, and frankly, Kafka does not care that much about this to begin with; it’s just a handy shortcut for offset bookkeeping. We have use cases where we never ack, as we store the offsets in S3; you just ask for the next batch).
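
To make the “you only ack an offset” point concrete, here is a minimal, hypothetical sketch (not any client library’s real API): acking a whole batch just means recording the highest offset seen, and resuming means asking for the next one.

```elixir
defmodule OffsetTracker do
  @moduledoc """
  Illustration of Kafka-style offset bookkeeping: consumers never ack
  individual messages, they only record the highest offset processed
  per topic/partition.
  """
  use Agent

  def start_link(_opts \\ []) do
    # offsets keyed by {topic, partition}
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  @doc "Ack a whole (non-empty) batch by storing only its last offset."
  def ack_batch(topic, partition, messages) do
    last = messages |> Enum.map(& &1.offset) |> Enum.max()
    Agent.update(__MODULE__, &Map.put(&1, {topic, partition}, last))
  end

  @doc "The offset to fetch from next (e.g. after a restart)."
  def next_offset(topic, partition) do
    Agent.get(__MODULE__, &Map.get(&1, {topic, partition}, -1)) + 1
  end
end
```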

Do you actually run Kafka in production handling 5 million (or more) messages a second?

From what I’ve read and heard in various “Productionalizing Kafka” presentations, “cluster state” is much more than simply tracking the current node/partition map and then going to sleep until you add/remove a node. In fact, the internet seems to imply that ZK is hit as part of every write. Maybe this is due to replica state tracking, I dunno. But the idea that you can deploy a highly performant Kafka cluster with an underprovisioned ZK cluster is warned against in nearly every Kafka deployment guide.

But this isn’t a thread about Kafka. The main takeaway is that if you’re going to re-implement Zookeeper, don’t half-ass it because you think performance doesn’t matter. I was only trying to point to Kafka as a use case where Zookeeper performance matters very much. And I guess this may or may not be true.

My guess is that this is old information. Older versions of Kafka kept consumer group offsets in Zookeeper, and that is a bit of a bottleneck, of course; currently, offsets are kept in a compacted topic, allowing for much better scalability. The ZK clusters that back our Kafka clusters are essentially asleep, in contrast to some we use as distributed transaction locks (they do an impressive amount of locks per second, and the bottleneck, by the way, is more CPU than disk in our experience).

Absolutely. Also, don’t reimplement Zookeeper - ZAB is tricky :wink:


AFAIK, Kafka Streams is just a feature of the Java client library; emulating that on top of either kafka_ex or brod shouldn’t be too hard. I just wonder what the advantage would be?
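
For what it’s worth, a rough sketch of the shape such an emulation might take, assuming a GenStage pipeline where the producer stage wraps a real client like kafka_ex or brod (that producer is not shown, and nothing here is either library’s actual API):

```elixir
defmodule WordCount do
  @moduledoc """
  A toy streams-style stage: consumes batches of Kafka-like records
  (assumed here to be maps with a :value binary) and keeps a running
  word count, the classic streams example.
  """
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  def init(:ok) do
    # Consumer-only stage; subscribe it to your Kafka-backed producer,
    # e.g. via GenStage.sync_subscribe/2 or a :subscribe_to option.
    {:consumer, %{}}
  end

  def handle_events(records, _from, counts) do
    counts =
      records
      |> Enum.flat_map(&String.split(&1.value))
      |> Enum.reduce(counts, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)

    # Consumers emit no events downstream.
    {:noreply, [], counts}
  end
end
```

The hard parts Kafka Streams provides beyond this (fault-tolerant state stores, repartitioning, exactly-once semantics) are where the real work would be.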

Yes, that’s the one. It’s actually in two pieces, rafted_value and raft_fleet. One uses the other.

Both rafted_value and raft_fleet appear to be built on top of Distributed Erlang. Is that correct? If that’s the case, is it being used within large clusters?

In general, how do large Elixir clusters track cluster membership efficiently? I’d love to have a 100-node cluster be able to detect within a second if a node had crashed. Or sooner, if a participating member had problems with active communication.

This seems like one of the more common uses for Zookeeper and Etcd: simply tracking heartbeats and active communication failures across a cluster without the need for a full-mesh heartbeat system. This is one case where an external tracker is advantageous over being a lib within your app cluster.
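
The raw building block for this does ship with the runtime, for what it’s worth: a sketch using :net_kernel.monitor_nodes/1, which delivers :nodeup/:nodedown messages on membership changes. Note that detection latency for hard crashes is governed by the kernel’s net_ticktime (60 seconds by default), so sub-second detection would need tuning or an application-level heartbeat on top.

```elixir
defmodule NodeWatcher do
  @moduledoc """
  Minimal cluster-membership watcher on top of Distributed Erlang.
  """
  use GenServer
  require Logger

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def init(:ok) do
    # Ask the kernel to send us :nodeup/:nodedown messages.
    :ok = :net_kernel.monitor_nodes(true)
    {:ok, MapSet.new(Node.list())}
  end

  def handle_info({:nodeup, node}, members) do
    Logger.info("node joined: #{node}")
    {:noreply, MapSet.put(members, node)}
  end

  def handle_info({:nodedown, node}, members) do
    Logger.warning("node lost: #{node}")
    {:noreply, MapSet.delete(members, node)}
  end
end
```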

That is correct.

I don’t know what is “large” for you… the applications using the raft libraries are still in staging only, so they are running (real-world workloads) on 10-server clusters at the moment (real hardware, not VMs, as we are doing heavy compute on in-house systems). If it all pans out, it will be running on 100-200 node clusters.

To be fair, not overly efficiently :slight_smile: By default they construct full-mesh networks and heartbeat each other. This only really scales to ~100 nodes, from my understanding. There are ways around this, however. You can split clusters into subsets with just a few connections between them, so not full-mesh. This is possible more or less out of the box using what are referred to as “hidden” connections: they connect only to the nodes specified, rather than promiscuously to every node in the mesh. Libraries like bootstrap take advantage of this.
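
Concretely, assuming named nodes and a shared cookie (the node names below are made up): a node started with the -hidden emulator flag connects only to the peers it is told to and does not propagate those connections to the rest of the mesh.

```elixir
# Start a node that stays out of the full mesh:
#   iex --name worker@10.0.0.5 --cookie secret --erl "-hidden" -S mix

# From the hidden node, connect explicitly to a single peer:
true = Node.connect(:"coordinator@10.0.0.1")

# Hidden connections don't show up in the default node list...
Node.list()          # => []

# ...but are visible when asked for explicitly:
Node.list(:hidden)   # => [:"coordinator@10.0.0.1"]
```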

There has been some research work on better approaches than full-mesh, and I assume that if/when >100 node clusters become common that such efforts will leave the lab and eventually find their way into the wild.

Yes, and this is also possible without a great deal of effort on the BEAM. The concurrency model and good network performance mean that a number of very good, scalable network solutions can be built easily.

It’s a matter of people taking the time to create them, which really should not be a large investment given the existing libraries and OTP itself. One reason we don’t already have these in hand, IMHO, is a combination of Elixir (and even Erlang, really) only recently gaining real traction, and full clusters (which is obviously not the same as many deployed but separate applications) usually not approaching the 100+ node world.


LASP (which was mentioned here already) has the tools for overriding Erlang’s built-in distribution and making it scale to more than 100 nodes.

It was tested with a 1_000-node cluster and it all went OK. It is meant for clusters of 10_000 nodes, though they haven’t managed to really test that.

I would bet that making LASP really just a library would benefit the whole ecosystem. Perhaps we need a LASPEX project. I would love to contribute to that. The Erlang APIs are a bit too verbose currently (they even acknowledge this).
