Efficient way to handle asynchronous two-way messaging across data centers?

I have a very particular use-case. I originally went completely off-track and assumed Kubernetes and/or BEAM were built for this kind of communication. I have since grown out of the naivety of my approach. Kube is not built for it, though it does offer federation for multi-cluster setups across data centers.

I then found:

http://learnyousomeerlang.com/distribunomicon

Which goes into a whole heap of reasons why I don’t want to attempt this with BEAM.

So here I am, wondering just what tools I have at my fingertips, and I’m hoping somebody can point me in the right direction. I’m new to Elixir/BEAM and so don’t fully understand what is advisable or not, or what is on offer.

The problem

Essentially, what I want to do is, as efficiently as possible, send a request from one server to a specific server (in a particular location), have that perform some operation and return me a result.

My current application runs through thousands of these ‘requests’ using Task.async_stream and Stream.chunk (to perform db queries in batches). As I say, I originally figured I would just fire up a cross data center Erlang cluster, use Node.connect and send messages to relevant applications. This now seems incredibly naive after reading the above material.
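Roughly this shape, with hypothetical function names and batch sizes (Stream.chunk_every/2 being the current name for Stream.chunk/2):

```elixir
# Sketch of the batching pipeline; MyApp.Repo.run_batch/1 is a stand-in for the real DB query.
requests = 1..10_000                                # placeholder for the real request stream

requests
|> Stream.chunk_every(100)                          # batch so each task runs one DB query
|> Task.async_stream(&MyApp.Repo.run_batch/1, max_concurrency: 10, timeout: 30_000)
|> Stream.run()
```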

My second pass at this is to use an internal API… very simply, each of my applications/workers will expose an API that I send workloads to; the worker performs the work and returns a response.
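Something along these lines, sketched with Plug (module, path, and port are all made up):

```elixir
defmodule Worker.InternalAPI do
  use Plug.Router

  plug :match
  plug :dispatch

  # The scheduler POSTs a workload; the worker performs it and returns the result.
  post "/work" do
    {:ok, body, conn} = Plug.Conn.read_body(conn)
    result = Worker.perform(body)          # stand-in for the real work function
    send_resp(conn, 200, result)
  end

  match _ do
    send_resp(conn, 404, "not found")
  end
end

# Started under the worker's supervision tree, e.g.:
# {Plug.Cowboy, scheme: :http, plug: Worker.InternalAPI, options: [port: 4040]}
```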

Does this sound sensible? Just wanted to sanity check this here before I waste another two weeks designing something I don’t need.

I say two-way messaging in the title because I’m wondering if there is an even better way of getting data to/from my nodes that I’m perhaps missing (message queues/passing/rpc etc).


You have messages you want to send from one node to another and for that node to send results back. What is it I’m missing that makes this so unique?

It details some of the ways distributed computing is hard, but that isn’t specific to the BEAM; it’s just distributed computing in general…

This is still perfectly doable with out-of-the-box tools on the BEAM. Just don’t bake in any assumptions about availability, the impossibility of stale results, etc… You won’t be fixing any of these issues by adding HTTP APIs, if that’s what you were thinking.
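For example, something as simple as this works over a normal distributed Erlang connection (node and module names are made up), as long as you treat failure as an expected outcome:

```elixir
# Connect to the remote node (returns false/:ignored if it can't).
Node.connect(:"worker@singapore-1")

workload = %{id: 1, query: "SELECT ..."}

try do
  # Call a named GenServer on the remote node, with an explicit timeout
  # because the WAN link may be slow or partitioned.
  GenServer.call({MyApp.Worker, :"worker@singapore-1"}, {:run, workload}, 15_000)
catch
  :exit, reason -> {:error, reason}        # node down, call timed out, etc.
end
```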

It’s possible that I’ve misunderstood exactly why you thought this wasn’t feasible on the BEAM, so more information is welcome. :slight_smile:

I did mean to mention that. I read the article - it’s not just about BEAM; it’s mostly about distributed problems.

The question is, is it wise or suitable for me to be connecting nodes through BEAM from say a server in Dallas to a server in Singapore? Taking into consideration the amount of chatter between them. The reason I ask, as stated, is because I’m new to Elixir/BEAM, so I’m not sure what “out-of-the-box” tools would fit this use-case, and I’m not even sure this would be a problem – again, I’m not entirely sure of the networking internals (though I will educate myself).

I need to route requests to specific locations, and some of those locations will not be anywhere close to the caller. I’ve come over here straight from Kubernetes channel because I had stupidly implemented a kube cluster with nodes in different continents and was promptly put right. I just don’t want to repeat the same mistake with BEAM.

From the article:

Sadly, this means that modern Erlang applications can rarely be clustered across different data centers. In fact, it isn’t recommended to do so. Most of the time, you will want your system to be based on many smaller, walled off clusters of Erlang nodes, usually located in single locations. Anything more complex will need to be left to the developers: either switching to SSL, implementing their own high level communication layer, tunneling over secure channels, or reimplementing the communication protocol between nodes.


I’ve felt the same way, but haven’t gotten to the geographical distribution part yet.

There is some discussion here talking about problems with EPMD - nodes losing connection for unknown reasons. It’s not exactly the same, because I think it’s still within the context of a single data center. But the problems posed by having 1024 nodes communicating might be similar to a “hostile” cross-data-center connection.

The solution they found was to do what’s recommended and create their own TCP-based membership system, called Partisan.

There’s also a service discovery library called Sprinter that works with Kubernetes.

Lasp PG lets you transparently send messages to processes, similar to PG2, but it’s more fault tolerant since it uses Partisan.
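For a feel of the pattern, this is roughly what the pg2 side looks like (group name invented); Lasp PG gives you the same idea on top of Partisan:

```elixir
# On each worker node: create/join a named process group.
:pg2.create(:region_workers)
:pg2.join(:region_workers, self())

# On the scheduler: look up members (visible from any connected node) and send one of them work.
case :pg2.get_members(:region_workers) do
  [] -> {:error, :no_workers}
  [pid | _] -> send(pid, {:work, %{id: 1}})
end
```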

I’ve only done light research on this problem, but I share the same concerns; I’d like to eventually have geographical distribution.


Very interesting. Thanks.

I’ve looked into Consul, which works with Kubernetes. Not heard of Sprinter, will check that out!

Aside from planning for the connection to be unavailable, I’m not sure why you can’t do this with the BEAM across a secure connection. You can configure nodes with TLS or use a VPN, tinc, etc.
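TLS distribution is configured at the VM level; a rough sketch of the flags involved (the file path is made up, and the option file contents are covered in the Erlang ssl docs):

```
## rel/vm.args (sketch): run Erlang distribution over TLS instead of plain TCP
-proto_dist inet_tls
-ssl_dist_optfile /etc/myapp/ssl_dist.conf
```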

TINC makes a lot of sense because it works in a similar way to BEAM clustering fwiw.

There was a talk at ElixirConf.eu in 2016 about how the OTP team were planning to address issues with distributed Erlang. http://www.elixirconf.eu/elixirconf2016/zandra-norman. I’m not sure of the current state of what was talked about or when it might actually get released. I can’t remember the talk details so don’t know how much of it applies to cross DC networks. Until something like this is in place I wouldn’t be connecting BEAM nodes outside of a local network personally.

If this problem landed on my desk and AWS was an allowed solution I’d likely use DynamoDB Global Tables. Though to be fair they aren’t available in enough regions yet. Whatever technology gets used you’ll still need to figure out how it should handle the usual stuff like netsplits, dropped messages, duplicated messages etc.

Just seems like an awful lot of noise. TINC looks really cool, will check that out. Thanks.

Nice one. I’ll have a look.

This is primarily why I want to roll my own solution… so at least I can explicitly see the carnage and [hopefully] do something about it.

Yup. Already running into some of those issues! I will need multiple schedulers (for HA) and don’t want them producing duplicate messages, and there will be times when the workers might be down and I don’t want them picking up crap-tonnes of now-obsolete workloads. It’s keeping me busy, that’s for sure.
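For what it’s worth, one way to drop duplicates and stale work on the consumer side could look like this (purely a sketch, nothing I’ve built yet):

```elixir
defmodule Worker.Dedup do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  # Returns :run if the job is fresh and unseen, :skip otherwise.
  def check(job_id, enqueued_at_ms, ttl_ms \\ 60_000) do
    GenServer.call(__MODULE__, {:check, job_id, enqueued_at_ms, ttl_ms})
  end

  @impl true
  def init(seen), do: {:ok, seen}

  @impl true
  def handle_call({:check, job_id, enqueued_at_ms, ttl_ms}, _from, seen) do
    stale? = System.system_time(:millisecond) - enqueued_at_ms > ttl_ms

    cond do
      stale? -> {:reply, :skip, seen}                        # obsolete workload
      Map.has_key?(seen, job_id) -> {:reply, :skip, seen}    # duplicate from another scheduler
      true -> {:reply, :run, Map.put(seen, job_id, true)}
    end
  end
end
```

(The seen map would need pruning in real life, but it shows the idea.)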

How many nodes are you thinking you’ll be clustering? The noise you’re alluding to seems to me to be mostly an issue of the mesh model that BEAM clustering uses out of the box. If you’re planning to implement something that’s supposed to replace it, you’d better know what you’re doing; if you’re planning to cluster a few nodes together, there is no real “noise” to speak of.

That’s a good question. We’re not talking about even hundreds of nodes here, but that meshing has me concerned. Seems like it will just get out of control very quickly.

When I say roll my own, I don’t mean networking model. I mean my own solution to this problem – at the moment leaning towards either TCP (Tinc) or an HTTP API (which will have added benefit of kick-starting the public api).

But looking at it, BEAM is using TCP under the covers… so…

Now getting excited about ZeroTier.

https://news.ycombinator.com/item?id=16326036

…and nope, I don’t really know what I’m doing… but that’s exactly how I’ve learned thus far and it’s worked great :slight_smile:

I’ve worked on a system where each service has an island of nodes (up to 8, because they use :global, and node ups/downs could otherwise lock the system for a long time), and the services communicate with each other over REST.

There are lots of AWS details in there for each service, they all have their own load balancer and whatnot. I die a little bit every time I have to use REST, but this system works.

Needless to say, this system works only if you can guarantee that each service can handle your maximum workload with a maximum of 8 of the beefiest machines you can provision. Luckily, it works out that if one service can’t handle the workload on 8 of the beefiest machines, you either have an amazingly popular product on your hands or there is some trivial rewrite that brings down the workload, because you don’t realistically put that much pressure on one microservice without being stupid.

Ha. Exactly! One of the reasons my focus is on getting this right and efficient from the get-go is because I have a very limited budget and some not-so-average early-stage requirements by way of multi-region, so I am trying to get every last drop out of my servers, while also staying HA. Have also considered using Lambda to offload work to particular locations.

Just another point of reference:

I feel that distributed Erlang should be used only to power the same code, i.e. multiple instances of the same “thing” which are connected into a cluster. Adding different types of systems (using different OTP apps and having a different process structure) might cause various problems with distributed parts of the code (e.g. pg2, Phoenix PubSub or Phoenix Tracker).

Which was a followup to this.

So that would make a case for designing an application specific protocol that would carry on the “conversation” across something like HTTP or a websocket.
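E.g. a Phoenix channel could carry that conversation; a rough sketch (topic and event names invented, MyApp.Worker.perform/1 is a stand-in):

```elixir
defmodule MyAppWeb.WorkChannel do
  use Phoenix.Channel

  # The caller joins the topic for the region it wants the work executed in.
  def join("work:" <> _region, _params, socket), do: {:ok, socket}

  # Each "run" message is answered with the result over the same socket.
  def handle_in("run", payload, socket) do
    result = MyApp.Worker.perform(payload)   # stand-in for the real work
    {:reply, {:ok, %{result: result}}, socket}
  end
end
```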

I should also say that, with the above said, using Kubernetes in this setup contradicts the need to squeeze the hardware. I’m starting to think I might just deploy with edeliver and see about connecting my erlang nodes through zerotier (just because, looks fun :stuck_out_tongue: and is apparently very easy to set up).

http://phx.io/

Maybe I’m missing the meaning of that. If I have a scheduler application on one node and a worker application on another node - is there a problem if I send a message from my scheduler to my worker? Aren’t we constantly passing around messages to different systems with different process structures, anyway?

I think maybe I’m missing some other behaviour of Elixir/Erlang in terms of isolation. Are those named distributed parts of the code shared across all nodes, used by different applications?

My interpretation of that statement goes back to the original goal of Erlang: “fault-tolerance” - i.e. distribution serving the needs of fault tolerance. Ideally your application should fit on one node but you may run it on multiple nodes so that a single node failure doesn’t take down your entire system.

I don’t think that it directly advises against running an “orchestrator application” on a separate node from a “worker application”, but in terms of fault tolerance you would be expected to architect the solution in such a way that multiple orchestration nodes could be running (usefully) at the same time, so that taking down one orchestration node doesn’t paralyse your system.

Erlang’s concurrency and distribution models were designed when single core CPUs and local area networks were pretty much the mainstay - while the concurrency model transferred nicely to multi-cores, I don’t think the distribution model fared quite as well when it comes to global networks.


Good point. From what I’ve read that’s the P in CAP theorem? Partition tolerance?

That’s it. I don’t want to use something which wasn’t built for this case, and have my shoehorned implementation destabilise the initial foundations of the system before I’ve even started.

I’m leaning towards setting up a VPN (w/ zerotier) and connecting my cross-continent nodes that way. Not sure how this will work in practice, but I’ll be doing some testing to find out.

I just wanted to come back and say I ended up with a message broker (RabbitMQ).

The problem has been distributing work across different regions for location-based latency tracking and uptime monitoring. But the hard part isn’t necessarily the distribution itself; it’s the act of making a request from a given location.

Serverless would actually be perfect but I’ve found it far too expensive.

I found a great plugin for RMQ which de-duplicates messages on the broker’s side, so I can queue duplicate messages from multiple publishers to maintain high availability. The broker has also given me lots of control over routing. I can now simply fire up a node anywhere and subscribe to a location-based queue on an exchange. I can handle this independently of other workers and publishers.
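In case it helps anyone later, the routing side is roughly this shape with the amqp Hex package (names are made up, and the de-duplication plugin’s broker-side arguments are left out):

```elixir
{:ok, conn} = AMQP.Connection.open("amqp://guest:guest@broker.example.com")
{:ok, chan} = AMQP.Channel.open(conn)

# One topic exchange; each region gets its own queue bound by routing key.
:ok = AMQP.Exchange.declare(chan, "work", :topic, durable: true)
{:ok, _} = AMQP.Queue.declare(chan, "work.singapore", durable: true)
:ok = AMQP.Queue.bind(chan, "work.singapore", "work", routing_key: "singapore")

# Any publisher can target a region by routing key ...
:ok = AMQP.Basic.publish(chan, "work", "singapore", ~s({"job":1}))

# ... and a worker fired up in that region just consumes its queue.
{:ok, _consumer_tag} = AMQP.Basic.consume(chan, "work.singapore")
```

A topic exchange means adding a new region is just declaring and binding another queue; the publishers don’t have to change.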

Overall I’m very happy with the result, though, of course, I now have a message broker to worry about! :stuck_out_tongue:

I have considered an alternative after reading around:

Send traffic to a load balancer (or balancers) which can route the traffic to specific nodes based on a location header, or whatnot. That way I can remove the need for a message broker. However, this will require more logic to be built into my HTTP client to handle different failure cases and is just not as flexible as the broker. This is also a fundamental change back to synchronous processing. I’d essentially be making requests directly, though they’d internally be performed from another location.

As much as I’m almost all in with my RMQ setup (considering the investment into it), I’m all ears on whether there may be an even better solution to this problem. I have also considered that a message broker could become an integral part of my architecture, and perhaps that itself warrants building some real-world experience with one.
