How to structure this parallel problem

Hey,

I’ve been brainstorming how to solve this challenge for several days and I’d really like your input.

I need to ‘process’ 20,000 events per minute. To process 1 event, I need to hit 5 separate HTTP API endpoints (XML), wait for all of them to return 200 OK (or time out, or fail), then parse the body of only the 200 XML responses to find the ‘winner’, craft a message, and move on to the next batch.

These events are coming from a GenStage producer, and they will likely be in batches of ~50 because the consumer has a max_demand of 100.

My plan was (loosely…) to:

  • Take a look at the # of events (let’s say batch #1 is 50)

  • Use Task.async (or something) to start 5 processes which are in charge of doing 50 requests each with buoy, using Keep-Alive and http pipelining (thank you @idi527). Sadly, these endpoints don’t support HTTP/2 so I’m having to squeeze every drop of performance out of HTTP/1.1.

  • So 5 endpoints * 50 requests each = 250 requests per batch * 400 batches = 100,000 requests per minute overall request volume (20K new events get generated by the producer every minute so all this I’m explaining will need to keep up with that).

  • Finally, some kind of Task.await and zip all 5 responses lists together so that I can loop through and find the ‘winners’ of each list…

Something like this:

[
  [a1, b1, c1, d1, e1],
  [a2, b2, --timeout--, d2, e2],
  [--500 response--, --timeout--, c3, d3, e3],
  [a4, b4, c4, d4, --422 response--],
  ... etc up to 50
]

QUESTIONS

Sooo, I am still a beginner at Elixir, but I think buoy makes requests synchronously: https://github.com/lpgauth/buoy

So that means it’s not enough to just make 50 requests in a single Task.async. I think there needs to be a new Task.async for EVERY request… similar to how one might do this:

# Fire-and-forget: spawn/1 only returns a pid, so the responses are never collected.
Enum.each(1..50, fn _n ->
  spawn(fn -> :buoy.get(url, timeout: 750) end)
end)

Now I doubt Task.async could work like that…(I’m still reading). I imagine each of the 5 tasks would instead spawn 50 more processes each… I just have no idea how the logistics of all this fit together.

The reason I wanted to use Task was so I didn’t have to keep track of all the responses flowing in; I could just wait for the buoy timeouts to expire. It’s like I need “another level” deeper of asynchronousness though. On top of this, the # of data APIs might change, so I don’t think I should hardcode that at 5. Can I dynamically chain Task with |>, kind of like Ruby’s call method to call function names from variables (I have a list of feeds in memory I can loop over)?

Any suggestions? :japanese_ogre:
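
A minimal sketch of the "one task per request" idea from the post above. It assumes a hypothetical `FanOut.fetch/2` that stands in for the real HTTP call (it is not buoy's API) and made-up feed maps with a `fail?` flag to simulate a timeout. The number of tasks comes from the feed list, so nothing is hardcoded at 5.

```elixir
defmodule FanOut do
  # Hypothetical stand-in for the real HTTP call (NOT buoy's API).
  # It returns a tagged tuple instead of raising, so awaiting never crashes.
  def fetch(%{name: name, fail?: true}, _event), do: {:error, name, :timeout}
  def fetch(%{name: name}, event), do: {:ok, name, "<xml>#{event}</xml>"}

  # One task per feed; the feed list decides how many tasks are started.
  def process_event(event, feeds) do
    feeds
    |> Enum.map(fn feed -> Task.async(fn -> fetch(feed, event) end) end)
    |> Enum.map(&Task.await/1)
  end
end

# Made-up feeds for illustration only.
feeds = [
  %{name: "a", fail?: false},
  %{name: "b", fail?: true},
  %{name: "c", fail?: false}
]

results = FanOut.process_event("evt-1", feeds)
```

Because the tasks are awaited in the order they were started, `results` lines up with `feeds` position by position, even though the requests themselves ran concurrently.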

There are also async_* functions like async_get or just async_request. But they might be a bit cumbersome to work with if you want to receive responses.

Maybe I can use yield_many/2 https://hexdocs.pm/elixir/Task.html#yield_many/2

And then set the timeout to :infinity (because that’s controlled by buoy, so I know it won’t ever hang)

This is just a shot in the dark btw… if anyone has suggestions I’d love to hear them
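
A rough sketch of the `Task.yield_many/2` idea, under the assumption that the two anonymous functions below stand in for HTTP requests (one fast, one slower than the deadline). It passes an explicit millisecond limit as a safety net instead of relying only on the HTTP client's own timeout, and shuts down whatever is still running.

```elixir
# Hypothetical fast/slow calls standing in for HTTP requests.
jobs = [
  fn -> :fast_response end,
  fn ->
    Process.sleep(2_000)
    :too_slow
  end
]

tasks = Enum.map(jobs, &Task.async/1)

results =
  tasks
  |> Task.yield_many(200)
  |> Enum.map(fn {task, result} ->
    # nil means the task is still running: shut it down so it does not linger.
    case result || Task.shutdown(task, :brutal_kill) do
      {:ok, value} -> {:ok, value}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  end)
```

`yield_many` returns one `{task, result}` pair per task in the original order, so a slow or crashed request shows up as an error in its own slot rather than taking the whole batch down.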

I also wanted to add one more detail

Let’s assume that each of the 5 APIs is stored in state so I can access it, that each has different performance settings, and that I individually tune the timeouts etc. I store that data in my list of APIs like:

feeds = [
  %{name: "API #1", timeout: 750, http2_enabled: false, url: "xxx"},
  %{name: "API #2", timeout: 1000, http2_enabled: false, url: "yyy"},
  # ... 3 more ...
]

So that information is available…

I’m just brainstorming on where I store / process the requests…

  1. doing the HTTP processing in a separate GenServer queue which is working 24/7, which I push requests to and receive messages from when a request is complete

  2. or I can just do this with Task.async and processes alone in my GenStage handle_events step… I think I could just use a shit ton of tasks for this, but I’m worried that making tasks await every 50 events will cause unnecessary overhead… kind of hazy in my mind

I think the tricky part I’m trying to figure out is how to group the responses.

I can easily store the REQUESTS like this…

[
  [req, req, req, req, req],
  [req, req, req, req, req],
  [req, req, req, req, req],
  ...
]

But the hard part is I ALSO need to know which responses go with each other so I can determine a winner…

[
  [resp, resp, resp, resp, resp],
  [resp, resp, timeout, resp, resp],
  [timeout, resp, timeout, resp, resp],
  ...
]

I don’t necessarily have to do it on a line-by-line basis… I could just take the entire group of 15 responses above - 3 timeouts = 12 successful responses, and then take the “top 3” of those 12 responses.

But how do I asynchronously load data back into the same structure/package it came in?
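
One way to sketch this: keep the tasks in the same nested shape as the requests, then await them in place. The module name `Batch` and its `request/2` function are hypothetical stand-ins for the real HTTP call; feed "c" is hardwired to time out purely for illustration.

```elixir
defmodule Batch do
  # Hypothetical stand-in for one HTTP request; feed "c" always times out here.
  def request("c", _event), do: :timeout
  def request(feed, event), do: {:resp, feed, event}

  # Start every request first (so they all run concurrently), then await row by row.
  # Each response lands in the slot of the task that produced it.
  def run(events, feeds) do
    tasks =
      for event <- events do
        for feed <- feeds, do: Task.async(fn -> request(feed, event) end)
      end

    for row <- tasks, do: Enum.map(row, &Task.await/1)
  end
end

rows = Batch.run([1, 2], ["a", "b", "c"])
```

The responses can arrive in any order; the grouping survives because each task struct stays in the row it was created in, and awaiting simply fills that row.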

You have to hit endpoint_1, endpoint_2, … endpoint_5.

  1. If any fail, do you still have to do the rest?
  2. Must you hit the endpoints in sequence, or can you hit them randomly as long as you hit all 5?
  3. Do you determine a group of winners in the block of 50, or is any winner sufficient?
  4. How long are the timeouts of each endpoint?

  1. Yes, however I just ignore them. Essentially I’m choosing a ‘winner among the 5’. If 1 returns non-200 and 1 times out, then that becomes ‘choose a winner among the 3’.

  2. I can hit them randomly, as long as all five are considered together… technically I can even consider them together in a batch… so if I have 50 groups of 5, I can look at ALL 250 and pick the top 50… or let’s say 10 failed in that group of 250, I can look at the 240 successful and pick the top 50.

  3. I think I answered this in #2, but it can be any winner in the batch of successful responses, as long as they’re considered together. Inside each response is basically a cost figure, and I’m going to order all responses (excluding failures/timeouts) by cost figure and take the top N. Keep in mind the number of endpoints may increase/decrease every few minutes, so if the batch has some events made up of 4 API endpoints, and some events made up of 5, that is still OK… just as long as we aren’t arbitrarily taking chunks. They need to be split at the event boundary.

like:

[r, r, r, r],
[r, r, r, r, r],
--split here is OK--

[r, r, r, r],
[r, --split here--, r, r, r, r] is BAD

  4. I am not sure… I will have to see under load, but I can set them a bit aggressively. To start out with, maybe something lenient like 750. I’m going to track “wins, losses, max cost, min cost, avg cost, timeouts, errors” for every single batch so I can keep a close eye and adjust it.
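
The "order by cost and take the top N" step from answer 3 could look roughly like this. The `{:ok, cost}` response shape and the `Winners` module are made up for illustration, and the sketch assumes "top" means highest cost; flip `:desc` to `:asc` if the lowest cost should win.

```elixir
defmodule Winners do
  # Keep only successful responses, sort by cost and take the first n.
  # Assumption: "top" means highest cost; use :asc instead if lower wins.
  def top(rows, n) do
    rows
    |> List.flatten()
    |> Enum.filter(&match?({:ok, _cost}, &1))
    |> Enum.sort_by(fn {:ok, cost} -> cost end, :desc)
    |> Enum.take(n)
  end
end

# Whole rows only, so the batch is always split at an event boundary.
# Rows may have different lengths (4 endpoints for one event, 5 for the next).
rows = [
  [{:ok, 3}, {:ok, 9}, :timeout, {:ok, 1}],
  [{:error, 500}, {:ok, 7}, {:ok, 4}, {:ok, 2}, {:ok, 8}]
]

winners = Winners.top(rows, 3)
```

Timeouts and non-200 results fall out in the filter step, so the number of candidates shrinks on its own when endpoints fail.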

I’ve been sitting here pondering and I thought of one more way to articulate my (hard to explain) thoughts.

I THINK the root trickiness of this is that I want EVERY request to be async, so I believe calling async/await for each batch would block FUTURE REQUESTS. What I really want to do is keep allowing future requests to come in, and async/await ONLY on a small subset of those requests (each group or batch of groups).

I would start out by not using GenStage. Currently the Elixir app isn’t the limiting factor, it’s the HTTP endpoint.

For each event coming in I would spawn a GenServer or Task with a block_id (likely a Pid), and a list with the sequence of HTTP endpoints (randomize) e.g. [e1, e5, e2, e4, e3]

When the process has values for all endpoints it would notify a gather process (hence Pid block_id) of the winning endpoints. There would be one gathering process per block per event boundary. That would address your splitting issue.

If there are no winners, is there a need to notify the block? I suspect not, in which case you are prefiltering for the next step.

I should mention that GenStage is used for all the other steps (this step doesn’t use GenStage). It’s merely a single point in the whole pipeline.

Someone on IRC mentioned I can just make a new process for each group, and in that process have a simple Task.async and Task.await… at that point, when that process finishes it means the whole group finished, and I handle them like normal.

This seemed to make total sense to me.
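
A sketch of that "one process per group" suggestion, assuming a hypothetical `Group.fetch/2` in place of the real HTTP call (it just fabricates a cost) and a made-up `{:group_done, event, responses}` message. Each group awaits only its own requests, so the caller is free to keep accepting new events.

```elixir
defmodule Group do
  # Hypothetical stand-in for the HTTP call; returns a fake cost per feed.
  def fetch(feed, event), do: {:ok, feed, event * String.length(feed)}

  # One process per event. Inside it, a plain async/await over the feeds;
  # when the awaits return, the whole group is done and is reported back.
  def start(event, feeds, gatherer) do
    Task.start(fn ->
      responses =
        feeds
        |> Enum.map(fn feed -> Task.async(fn -> fetch(feed, event) end) end)
        |> Enum.map(&Task.await/1)

      send(gatherer, {:group_done, event, responses})
    end)
  end
end

# The calling process plays the gatherer here and is never blocked by a group.
for event <- [1, 2, 3], do: Group.start(event, ["aa", "bbb"], self())

groups =
  for _ <- 1..3 do
    receive do
      {:group_done, event, responses} -> {event, responses}
    after
      1_000 -> :no_reply
    end
  end
```

Groups report back in whatever order they finish, which is why each message carries its event along with the responses.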

Now I’m going to reread your answer a few times (just wanted to throw that out first)

The BEAM by default can handle about 262,000 processes (the limit is configurable with the +P flag). At 20,000 events/min and one process per event, that’s roughly 13 minutes of events which it can hold internally.
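
If you want to check the actual numbers on your own node rather than trust a default, something like the following should work. The one-process-per-event figure is an assumption carried over from the estimate above; extra tasks per event would shrink the headroom accordingly.

```elixir
# Maximum number of simultaneously existing processes on this node (the +P setting).
limit = :erlang.system_info(:process_limit)

# Processes currently alive on this node.
count = :erlang.system_info(:process_count)

# Assumption: one process per event at 20,000 events per minute.
minutes_of_headroom = div(limit - count, 20_000)

IO.puts("limit=#{limit} in_use=#{count} headroom=#{minutes_of_headroom} min")
```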

Yes, my answer is pretty much the same idea although I’m using GenServer because I feel I have better control over it.

Ok awesome, yeah I don’t fully grasp it yet. The only reason I used it so far was because of the Discord article on handling XMPP connections, which is the next battle I face.

I need to handle 1000 connections max with 100 unacknowledged ACK/NACK responses. I’ll cross that bridge when I come to it :sweat_smile:

PS I should mention it’s so stupid simple to put each async/await in its own process that I’m facepalming so hard for not having thought of that… god damnit. Elixir is so cool

Tell me you are using MongooseIM :stuck_out_tongue:

Damn, that looks amazing… Uhh, actually I was using a version of Romeo that was modified by Discord to suit their needs: discord/romeo on GitHub (romeo/lib/romeo at master).

The one thing I was concerned about is how to handle CONNECTION_DRAINING and such…

Would you recommend MongooseIM? I had a look, it looks very well supported and very comprehensive!

I don’t know how it compares to MongooseIM.

The main stipulations are: