Remote API synchronization/diffing with GenStage/Flow

Background

I work for a company that does security for customers' public/private cloud accounts such as AWS, Azure, Google, etc. A big part of what we do is continually harvest down all resources from each account via the provider's API to 1) keep a synchronized copy of all their resources and 2) diff the local vs. remote state to get a stream of creations/modifications/deletions. I'm looking at Elixir for all the obvious reasons, but the biggest one is that our current Python solution doesn't scale to the thousands of accounts and millions of API requests that we need to make.

Where I’m at

I actually have a pretty good first implementation for scheduling all of the recurring harvests, and fetching things from AWS and the database is straightforward. My unit of concurrency is per API endpoint, which is 100x better than our current solution! Winning :grin: However, that means that for each API endpoint we're single-threaded for fetching remote/local data and diffing the results. The goal of this post is to field some ideas for how to optimize this.

Current pattern

Let’s call this the naive pattern.

  1. Paginate via the API to retrieve all remote resources for a region. Place them in a map keyed by id.
  2. Query the database for all resources in that region. Place them in a map keyed by id.
  3. Make a set from each map's keys.
  4. Take the differences of the sets to get the ids of new and deleted resources.
  5. The intersection is the resources that might have changed. Iterate through all of them and diff for changes.
  6. Return lists of created, modified, and deleted resources.
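
To make the pattern concrete, here's a minimal sketch of the diff step, assuming `remote` and `local` are the two maps keyed by id and `changed?/2` is a hypothetical field-by-field comparison (all names are made up):

```elixir
defmodule Harvest.NaiveDiff do
  @moduledoc "Minimal sketch of the naive set-based diff."

  def diff(remote, local, changed?) do
    remote_ids = remote |> Map.keys() |> MapSet.new()
    local_ids = local |> Map.keys() |> MapSet.new()

    # New ids only exist remotely; deleted ids only exist locally.
    created = MapSet.difference(remote_ids, local_ids)
    deleted = MapSet.difference(local_ids, remote_ids)

    # Ids in both places might have changed; compare each pair.
    modified =
      remote_ids
      |> MapSet.intersection(local_ids)
      |> Enum.filter(fn id -> changed?.(remote[id], local[id]) end)

    %{
      created: Enum.map(created, &remote[&1]),
      modified: Enum.map(modified, &remote[&1]),
      deleted: MapSet.to_list(deleted)
    }
  end
end
```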

Cons:

  • It’s eager. Take EBS snapshots for example. It’s not uncommon to have tens of thousands.
  • A failed API request invalidates the whole harvest, because the set differences/intersections require knowing all ids or you get false positives.
  • Reading all records from the database will eventually lead to a bottleneck.
  • API requests, parsing, and diffing are coupled.

Idea 1: GenStage API paginator producer

The foundation would be a producer that makes the API request, parses the response, and supplies events to a Flow or GenStage pipeline. I could also just grab the pagination token and immediately pass the response to be parsed in the next stage, but that might not be worth it.
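
A rough sketch of what that producer could look like, assuming a hypothetical `fetch_page/2` that takes the endpoint and a pagination token and returns `{parsed_items, next_token_or_nil}`. A real implementation would also need to track outstanding demand instead of fetching exactly one page per `handle_demand/2` call:

```elixir
defmodule Harvest.PaginatorProducer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    {:producer, %{endpoint: opts[:endpoint], token: nil, done?: false}}
  end

  @impl true
  def handle_demand(_demand, %{done?: true} = state) do
    # Nothing left to paginate; emit no further events.
    {:noreply, [], state}
  end

  def handle_demand(_demand, state) do
    # Fetch and parse the next page, then emit its items as events.
    {items, next_token} = fetch_page(state.endpoint, state.token)
    {:noreply, items, %{state | token: next_token, done?: is_nil(next_token)}}
  end

  # Placeholder for the real API call + parsing.
  defp fetch_page(_endpoint, _token), do: {[], nil}
end
```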

Idea 2a: Second producer for local results from database

I know Ecto's Repo.stream doesn't work outside of a transaction at the moment, but I did find Bourne, which has a GenStage producer for streaming (I haven't tested it yet). So apparently you can have multiple producers. It's hard to wrap my head around, but I think you could use Flow.Window and Flow.window_join/8 to join the remote and local streams and do the diffs; if things aren't found, recycle them until the producers are done.

Cons:

  • Complicated. I don’t think I would even know how to approach this.
  • Order is not guaranteed. What's the point of being lazy if you're recycling events, or if you need to wait until you're halfway through loading before the two sets start to overlap?

Idea 2b: Load from db based on remote ids

For each batch of resources from the remote, pull out the ids and query those from the db. Anything missing from the db is a create; for the rest, check for changes. We can fan out to multiple consumers. Somehow send all the known remote ids to a consumer to aggregate; when the producer is done, fetch all resources that don't have those ids as the deletes.
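
A sketch of the per-batch part (the delete pass is the open question, so it's left out), assuming a hypothetical `MyApp.Resource` Ecto schema and a placeholder `changed?/2` comparison:

```elixir
defmodule Harvest.BatchDiff do
  import Ecto.Query

  # Diff one batch of remote resources against the rows currently in the db.
  def diff_batch(remote_batch, repo) do
    ids = Enum.map(remote_batch, & &1.id)

    local =
      from(r in MyApp.Resource, where: r.id in ^ids)
      |> repo.all()
      |> Map.new(&{&1.id, &1})

    Enum.reduce(remote_batch, %{created: [], modified: []}, fn remote, acc ->
      case Map.fetch(local, remote.id) do
        :error ->
          %{acc | created: [remote | acc.created]}

        {:ok, db_row} ->
          if changed?(remote, db_row),
            do: %{acc | modified: [remote | acc.modified]},
            else: acc
      end
    end)
  end

  # Hypothetical comparison; in practice this would compare the relevant fields.
  defp changed?(remote, db_row), do: remote != db_row
end
```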

Cons:

  • Slam the database with reads
  • A NOT IN query can only be so big before you hit the database's max packet limit, so this won't work for the tens-of-thousands-of-snapshots example stated above.
  • False deletes are possible if you don't halt on an API error.
  • Most things don’t change.
  • I have no idea how to make deletes work

Idea 3: ETS as source of truth

Instead of querying the same thing for every harvest, when the supervisor starts, fetch all current local records and put each resource in ETS under its own key. This means we're not limited by the db and we can have a TON of consumers. Something is new if its key isn't in the table. We also get the added benefit that if the harvester fails partway through we don't have to throw everything away.
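
A minimal sketch of that cache, assuming the table is populated at supervisor startup with `{id, resource}` tuples (module and table names are made up):

```elixir
defmodule Harvest.Cache do
  @table :harvest_cache

  # Create the named table once, at supervisor startup.
  def init do
    :ets.new(@table, [:set, :named_table, :public, read_concurrency: true])
  end

  # Warm the cache with all current local records.
  def warm(resources) do
    Enum.each(resources, fn r -> :ets.insert(@table, {r.id, r}) end)
  end

  # Classify a harvested resource: missing key means it's new.
  def classify(remote) do
    case :ets.lookup(@table, remote.id) do
      [] -> :created
      [{_id, cached}] -> {:maybe_modified, cached}
    end
  end
end
```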

Cons:

  • Slam the database at startup
  • Additional coordination to make sure you don’t start harvesting until ETS is populated
  • Probably need to periodically diff ETS against the database to ensure there's no drift.
  • An ETS table per account? Per harvester? We may need VM args to increase the table limits.

Conclusion: What about deletion?

Currently I think Idea 3 is my best choice. I also have a plan for how to shard harvesters across a cluster, so memory utilization isn't a big concern.

Our one saving grace is that deletes can be eventually consistent. It's pretty damn important to know if someone opens port 22, but not super important to know that a snapshot was deleted.

So here’s my idea for deletes. When we start our supervisor, we create a single worker with an ETS table of counters keyed by harvester job. Each resource in ETS is a tuple of a counter and the struct. At the beginning of the harvest we increment the counter and pass the number into the start_link of the GenStages. For each resource we harvest, we set the counter value in the ETS tuple. After the harvest completes, spawn a worker to do an :ets.select/2 where the match spec selects resources whose counter is less than the current harvester counter. So it's kind of a “mark and sweep” GC.
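
A sketch of that mark-and-sweep pass, extending the cached tuple to `{id, counter, resource}` and assuming `current` is the counter for this harvest run (module and table names are made up):

```elixir
defmodule Harvest.Sweep do
  @table :harvest_cache

  # Mark: called for every resource seen during the current harvest.
  def mark(resource, current) do
    :ets.insert(@table, {resource.id, current, resource})
  end

  # Sweep: anything whose counter is still below `current` was not seen in
  # this harvest and is therefore an (eventually consistent) delete.
  def sweep(current) do
    match_spec = [{{:"$1", :"$2", :"$3"}, [{:<, :"$2", current}], [:"$1"]}]
    :ets.select(@table, match_spec)
  end
end
```

If you want to remove the stale entries in the same pass, :ets.select_delete/2 takes the same kind of match spec (with a body that returns true for the objects to delete).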

I don’t know if this is a good solution yet for deletes. I welcome any suggestions! Thanks!!!

It's hard to say with that level of detail, but from the description I would first try to exhaust what the database can do in this situation, and measure / measure / measure and measure some more before committing to any major undertaking to improve the parallelism. Knowing where the bottlenecks are is step 0 in understanding where to work on removing bottlenecks, after all. It may even turn out that "single-threaded" diffing per endpoint is the best global solution if lots of endpoints are being queried in parallel (so that the benefit of parallelizing the processing of any one endpoint is drowned in the parallelism of the various endpoint checks happening simultaneously, and just becomes more overhead).

When I see things about map merges, intersections, etc., I do wonder about dropping down to the db itself. A self-joining query comparing the last two snapshots for a given endpoint is likely to be quite fast depending on the db design, and even finding changes could be easy (list all resource ids for an endpoint that exist in one or the other of two snapshots, and all resource ids whose definition changed… ), and a good database should make mincemeat of such a query with the right indexing. It would also avoid the overhead of passing that data back and forth between application and db, leaving the application to handle the writing of new entries and the (asynchronous, even) scheduling of diffs when a new endpoint dataset is retrieved.

Best of luck!

measure / measure / measure and measure some more before committing to any major undertaking to improve the parallelism.

Don’t worry, I’m starting with the simple solution first! I couldn’t agree more, and I think you’re right that I’m being a little “process crazy” here with some of my suggestions. The truth is that most companies only put resources in one or two regions, which means most of these API calls return nothing and diff nothing. The process and message overhead wouldn’t be worth it. It’s those pesky outliers.

(╯°□°)╯︵ ┻━┻

Of course there’s nothing saying we can’t be adaptive and change strategies when we encounter an outlier.

So I’ve been thinking about your suggestion to do the diffing in the database and I quite like it. It reminded me of temporary tables, which might be a perfect fit for some of these huge payloads. You had mentioned a self-joining query, which would work too, but we don’t keep multiple versions in the db; it’s an update in place.

The only challenge I can think of with doing the work in the db is a minor one: diffing the ancillary data that lives in joined tables for one-to-many relationships. You could maybe do that with lateral joins (showing my lack of knowledge here), but we’re using MySQL so that’s not an option. Fortunately snapshots, which are the outlier, don’t have that problem.
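
For the flat case, a rough sketch of the temp-table idea, assuming the harvested payload has already been staged into a (temporary) `staged_resources` table and the local copy lives in `resources` (both table names are made up):

```elixir
defmodule Harvest.DbDiff do
  alias Ecto.Adapters.SQL

  # Rows staged from the API that have no counterpart locally: creates.
  def created_ids(repo) do
    %{rows: rows} =
      SQL.query!(repo, """
      SELECT s.id FROM staged_resources s
      LEFT JOIN resources r ON r.id = s.id
      WHERE r.id IS NULL
      """)

    List.flatten(rows)
  end

  # Local rows with no counterpart in the staged snapshot: deletes.
  def deleted_ids(repo) do
    %{rows: rows} =
      SQL.query!(repo, """
      SELECT r.id FROM resources r
      LEFT JOIN staged_resources s ON s.id = r.id
      WHERE s.id IS NULL
      """)

    List.flatten(rows)
  end
end
```

Modified rows could presumably be found the same way with an inner join plus a comparison of the relevant columns (or a hash column), which would keep all of the heavy lifting in MySQL.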