Background
I work for a company that does security for customers' public/private cloud accounts on AWS, Azure, Google Cloud, etc. A big part of what we do is continually harvest all resources from each account via the provider's API to 1) keep a synchronized copy of all their resources and 2) diff local vs remote to get a stream of creations/modifications/deletions. I'm looking at Elixir for all the obvious reasons, but the biggest one is that our current Python solution doesn't scale to the thousands of accounts and millions of API requests we need to make.
Where I’m at
I actually have a pretty good first implementation for scheduling all of the recurring harvests and fetching stuff from AWS, and the database side is straightforward. My unit of concurrency is per API endpoint, which is 100x better than our current solution! Winning! However, that means per API endpoint we're single-threaded for fetching remote/local and diffing the results. The goal of this post is to field some ideas for how to optimize this.
Current pattern
Let’s call this the naive pattern.
- Paginate through the API to retrieve all remote resources for a region. Place them in a map keyed by id.
- Query the database for all resources in that region. Place them in a map keyed by id.
- Make a set from each map's keys.
- Take the set differences to get the ids of new and deleted resources.
- The intersection is resources that might have changed. Iterate through all of them and diff for changes.
- Return lists of created, modified, and deleted resources.
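Here's a minimal sketch of that in code. `fetch_all_remote/1`, `fetch_all_local/1`, and `changed?/2` are hypothetical placeholders for the paginated API harvest, the Ecto query, and the attribute comparison:

```elixir
defmodule Harvest.NaiveDiff do
  def diff(region) do
    remote = region |> fetch_all_remote() |> Map.new(&{&1.id, &1})
    local = region |> fetch_all_local() |> Map.new(&{&1.id, &1})

    remote_ids = remote |> Map.keys() |> MapSet.new()
    local_ids = local |> Map.keys() |> MapSet.new()

    created_ids = MapSet.difference(remote_ids, local_ids)
    deleted_ids = MapSet.difference(local_ids, remote_ids)

    # Intersection: might have changed, so diff each one.
    modified_ids =
      remote_ids
      |> MapSet.intersection(local_ids)
      |> Enum.filter(fn id -> changed?(remote[id], local[id]) end)

    %{
      created: Enum.map(created_ids, &remote[&1]),
      modified: Enum.map(modified_ids, &remote[&1]),
      deleted: Enum.map(deleted_ids, &local[&1])
    }
  end

  # Placeholders for the paginated API harvest and the Ecto query.
  defp fetch_all_remote(_region), do: []
  defp fetch_all_local(_region), do: []

  # Placeholder change detection; real code would compare normalized attributes.
  defp changed?(remote, local), do: remote.attributes != local.attributes
end
```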
Cons:
- It's eager. Take EBS snapshots, for example: it's not uncommon to have tens of thousands.
- A failed API request invalidates the whole harvest, because the set differences/intersections require knowing all ids or you get false positives.
- Reading all records from the database will eventually lead to a bottleneck.
- API requests, parsing, and diffing are coupled.
Idea 1: GenStage API paginator producer
The foundation would be a producer that makes the API request, parses the response, and supplies events to a Flow or GenStage pipeline. I could also just grab the pagination token and immediately pass the raw response to be parsed in the next stage, but that might not be worth it.
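Something along these lines, where `fetch_page/2` is a stand-in for the provider-specific paginated call:

```elixir
defmodule Harvest.PageProducer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    {:producer, %{endpoint: Keyword.fetch!(opts, :endpoint), token: nil, done?: false}}
  end

  @impl true
  def handle_demand(demand, state), do: fetch_events(demand, [], state)

  # Pull pages until demand is satisfied or pagination is exhausted.
  defp fetch_events(_demand, acc, %{done?: true} = state), do: {:noreply, acc, state}
  defp fetch_events(demand, acc, state) when demand <= 0, do: {:noreply, acc, state}

  defp fetch_events(demand, acc, state) do
    {resources, next_token} = fetch_page(state.endpoint, state.token)
    state = %{state | token: next_token, done?: is_nil(next_token)}
    fetch_events(demand - length(resources), acc ++ resources, state)
  end

  # Stand-in for the provider-specific paginated API call; returns
  # {parsed_resources, pagination_token_or_nil}.
  defp fetch_page(_endpoint, _token), do: {[], nil}
end
```

Fetching one page at a time as demand arrives means the producer never buffers the whole result set, which is the whole point of going lazy.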
Idea 2a: Second producer for local results from database.
I know `Ecto.stream` doesn't work outside of a transaction at the moment, but I did find Bourne, which has a GenStage producer for streaming (haven't tested it yet). So apparently you can have multiple producers. It's hard to wrap my head around, but I think you could use `Flow.Window` and `Flow.window_join/8` to join the remote and local results and diff them; if a match isn't found, recycle the event until the producers are done.
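For the join itself, a toy version might look like this (assuming both sides are enumerables of structs with `id` and `attributes` fields). Note that with a global window the join only emits once both sides finish, which gives up a lot of the laziness:

```elixir
remote = Flow.from_enumerable(remote_resources)
local = Flow.from_enumerable(local_resources)

modified =
  Flow.window_join(
    :inner,
    remote,
    local,
    Flow.Window.global(),
    & &1.id,                # key on the remote side
    & &1.id,                # key on the local side
    fn r, l -> {r, l} end   # pair up matches for diffing
  )
  |> Flow.filter(fn {r, l} -> r.attributes != l.attributes end)
  |> Enum.to_list()
```

And an inner join only yields the intersection, so creates/deletes still need the recycling trick above.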
Cons:
- Complicated. I don’t think I would even know how to approach this.
- Order is not guaranteed. What's the point of being lazy if you're recycling events, or if you have to wait until you're halfway through loading before the two sets even start to overlap?
Idea 2b: Load from db based on remote ids
For each batch of resources from the remote, pull out the ids and query for those in the db. Anything missing from the db is a create; everything else gets checked for changes. We can fan out to multiple consumers. Somehow send all the known remote ids to a consumer to aggregate; when the producer is done, fetch all resources that don't have those ids as the deletes.
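Roughly, per batch (assuming a `Resource` Ecto schema, a `Repo`, and an aggregator process; `changed?/2` and `emit/2` are again placeholders):

```elixir
defmodule Harvest.BatchDiffConsumer do
  use GenStage
  import Ecto.Query

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    {:consumer, %{aggregator: Keyword.fetch!(opts, :aggregator)},
     subscribe_to: Keyword.fetch!(opts, :producers)}
  end

  @impl true
  def handle_events(remote_batch, _from, state) do
    ids = Enum.map(remote_batch, & &1.id)

    # Query only this batch's ids, so no unbounded IN/NOT IN lists.
    local_by_id =
      Resource
      |> where([r], r.id in ^ids)
      |> Repo.all()
      |> Map.new(&{&1.id, &1})

    {created, existing} = Enum.split_with(remote_batch, &is_nil(local_by_id[&1.id]))
    modified = Enum.filter(existing, fn r -> changed?(r, local_by_id[r.id]) end)

    # Report seen ids so deletes can be computed once the producer finishes.
    send(state.aggregator, {:seen_ids, ids})

    emit(created, modified)
    {:noreply, [], state}
  end

  defp changed?(remote, local), do: remote.attributes != local.attributes

  # Placeholder for handing results downstream.
  defp emit(_created, _modified), do: :ok
end
```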
Cons:
- Slam the database with reads.
- A `NOT IN` query can only be so big before you hit the database's max packet limit, so this won't work for the tens-of-thousands-of-snapshots example above.
- False deletes are possible if you don't halt on an API error.
- Most things don’t change.
- I have no idea how to make deletes work
Idea 3: ETS as source of truth
Instead of querying the same thing for every harvest, when the supervisor starts, fetch all current local records and put each resource in ETS under its own key. This means we're not limited by the db and we can have a TON of consumers. Something is new if the key is missing. We also have the added benefit that if the harvester fails partway through, we don't have to throw everything away.
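Something like this is what I'm picturing, assuming a named public table plus the same `Resource` schema and `Repo` as before:

```elixir
defmodule Harvest.Cache do
  @table :harvest_cache

  # Called once at supervisor startup, before any harvester runs.
  def warm do
    :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])

    Resource
    |> Repo.all()
    |> Enum.each(fn r -> :ets.insert(@table, {r.id, r}) end)
  end

  # A miss means the remote resource is new; otherwise diff against the cache.
  def classify(remote) do
    case :ets.lookup(@table, remote.id) do
      [] ->
        {:created, remote}

      [{_id, local}] ->
        if remote.attributes != local.attributes,
          do: {:modified, remote},
          else: :unchanged
    end
  end
end
```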
Cons:
- Slam the database at startup
- Additional coordination to make sure you don’t start harvesting until ETS is populated
- Probably need to periodically diff ETS against the database to ensure there's no drift.
- ETS table per account? Per harvester? VM args to increase limits.
Conclusion: What about deletion?
Currently I think Idea 3 is my best choice. I also have a plan for how to shard harvesters across a cluster, so memory utilization isn't a big concern.
Our one saving grace is that deletes can be eventually consistent. It's pretty damn important to know right away if someone opens port 22, but not super important to know that a snapshot was deleted.
So here's my idea for deletes. When we start our supervisor, we create a single worker that owns an ETS table of counters keyed by harvester job. Each resource in ETS is a tuple of a counter and the struct. At the beginning of a harvest we increment the counter and pass the number into the `start_link` of the GenStages. For each resource we harvest, we set the counter value in that resource's ETS tuple. After the harvest completes, spawn a worker to run an `:ets.select/2` with a match spec that matches resources whose counter is less than the current harvester counter. So kind of a "mark and sweep" GC.
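A sketch of that mark and sweep, with entries stored as `{id, counter, resource}` tuples (the table name and entry shape are assumptions carried over from Idea 3):

```elixir
defmodule Harvest.Sweep do
  @table :harvest_cache

  # Mark: called for each resource seen during the current harvest run.
  def mark(resource, run_counter) do
    :ets.insert(@table, {resource.id, run_counter, resource})
  end

  # Sweep: after the harvest, anything whose counter is stale was not seen
  # by this run, so it's a candidate delete. Returns {id, resource} pairs.
  def sweep(run_counter) do
    match_spec = [
      {{:"$1", :"$2", :"$3"}, [{:<, :"$2", run_counter}], [{{:"$1", :"$3"}}]}
    ]

    :ets.select(@table, match_spec)
  end
end
```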
I don’t know if this is a good solution yet for deletes. I welcome any suggestions! Thanks!!!