Software design: processing remote collection of items via paginated API, while properly handling errors

What I’m struggling with

I want to process a collection of items that are remote. These items aren’t all available in a single request: to get all items, multiple requests have to be made (e.g. via a paginated API). Further, there are several of these collections I’d like to retrieve and process, and they have varying sizes. The collections would be retrieved from various sources, such as paginated APIs or scraping. Although each collection would be small enough to fit in memory at any one time, I’d ideally like to avoid relying on that in order to prevent issues down the road.

Conceptually, I’d like to stream the collection of items and process them one at a time (where by “processing” I currently mean persisting them to the DB, although some response may be deserialized beforehand).

Approaches I’ve considered

Stream.resource/3

It would be really nice to stream the collections via Stream.resource/3 (example here), but I don’t see how to nicely handle the non-happy path:

  • network errors: having retries is obviously trivial, but in case the server is unresponsive (e.g. circuit breaker is open), the error needs to be reported to the caller. I considered implementing this via throw/raise, but that means that every time I want to iterate over a collection, the code must be surrounded by try/catch/rescue, which doesn’t seem very ergonomic.

  • politeness: whether scraping or interacting with an API, I can’t just slam the remote server with requests as fast as I can process the data. Delays between requests must be inserted, etc. Here also, while it would be possible to do that within Stream.resource/3 I’m afraid it would make the code a little messy and less maintainable.
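To make the trade-off above concrete, here is a minimal sketch of a paginated source wrapped in Stream.resource/3. The module name `PagedStream`, the `fetch_page` function and its return shape (`{:ok, items, next_cursor | nil}` or `{:error, reason}`) are assumptions made for illustration, not part of any real API client. It shows both pain points: the politeness delay lives inside the stream, and an unrecoverable error can only surface as a raise.

```elixir
defmodule PagedStream do
  # Hypothetical helper: `fetch_page` is a 1-arity function that takes a cursor
  # and returns {:ok, items, next_cursor | nil} or {:error, reason}.
  def stream(fetch_page, first_cursor, delay_ms \\ 1_000) do
    Stream.resource(
      # Start with the first cursor and no delay before the first request.
      fn -> {first_cursor, 0} end,
      fn
        :done ->
          {:halt, :done}

        {cursor, wait_ms} ->
          # Politeness: pause before every request except the first one.
          Process.sleep(wait_ms)

          case fetch_page.(cursor) do
            {:ok, items, nil} -> {items, :done}
            {:ok, items, next} -> {items, {next, delay_ms}}
            # The only way out of the stream is to raise, so every caller
            # has to wrap the enumeration in try/rescue.
            {:error, reason} -> raise "page #{inspect(cursor)} failed: #{inspect(reason)}"
          end
      end,
      # Nothing to clean up in this sketch.
      fn _state -> :ok end
    )
  end
end
```

A caller would enumerate it with something like `PagedStream.stream(fetch_page, 1) |> Enum.each(&persist/1)` (where `persist/1` is also hypothetical), wrapped in try/rescue to catch the failure case.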

GenServer, preload all items

I could implement a GenServer that would fetch all items (handling network issues, politeness, etc.) and either return an error (e.g. the remote server is unresponsive) or the full/partial collection of items for processing.

Although I expect each collection to fit in memory, I can’t guarantee it, as I have no control over the number of items in a collection. Therefore, I’d rather process the batches of items as they arrive to avoid having to worry about memory constraints, especially if several collections are being processed in parallel.

GenServer, continuation-passing style

I don’t really have any experience with CPS (a better illustration, in my opinion, is in Fred’s Erlang book; search for “Continuation-Passing Style”).

Basically, I could have a function in the GenServer that would return:

  • a batch of items and some “next” information (i.e. information on getting the next batch of items, which the GenServer would understand: it could be a URL, a limit/offset combo, etc.) in the happy path case. After processing the batch of items, the caller would call the GenServer again, passing it the “next” value to continue where it left off. If no “next” information is returned, the caller would know it has arrived at the end of the collection and there are no more items to fetch.
  • an error tuple if something happened (circuit breaker tripped, etc.)

I would probably implement this in an async fashion where the batch request returns immediately with a reference, and the actual items in the batch are sent later to the caller along with the reference. I.e. the call would get and process the items within a receive block.
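A rough sketch of that idea, in a synchronous form for brevity (the async variant with a reference and a receive block is left out). The module names, the `fetch_page` function and the tuple shapes are all assumptions made for illustration:

```elixir
defmodule CollectionFetcher do
  use GenServer

  # `fetch_page` is a hypothetical 1-arity function: it takes the opaque "next"
  # value and returns {:ok, items, next | nil} or {:error, reason}.
  def start_link(fetch_page), do: GenServer.start_link(__MODULE__, fetch_page)

  # Returns {:ok, items, next} (next == nil means "end of collection")
  # or {:error, reason}.
  def next_batch(server, next), do: GenServer.call(server, {:next_batch, next})

  @impl true
  def init(fetch_page), do: {:ok, fetch_page}

  @impl true
  def handle_call({:next_batch, next}, _from, fetch_page) do
    # Retries, politeness delays and circuit breaking would live here.
    {:reply, fetch_page.(next), fetch_page}
  end
end

defmodule CollectionConsumer do
  # Caller-side loop: process a batch, then ask for the following one.
  def run(server, next, process_item) do
    case CollectionFetcher.next_batch(server, next) do
      {:ok, items, nil} ->
        Enum.each(items, process_item)
        :ok

      {:ok, items, new_next} ->
        Enum.each(items, process_item)
        run(server, new_next, process_item)

      {:error, reason} ->
        # The caller still holds `next`, so it can decide to resume from here.
        {:error, reason, next}
    end
  end
end
```

Because the continuation is just data held by the caller, resuming after an error amounts to calling `CollectionConsumer.run/3` again with the returned `next` value.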

It feels like this may be the best alternative, but historically all of my dumb ideas started out looking like good ones ¯\_(ツ)_/¯ so I’d rather have opinions from more experienced OTP wranglers before setting off down the wrong path.

Opinions?

I’m certain this has all been done before in OTP systems, but I couldn’t find any info on the various approaches, pros/cons etc. If anybody can point me in the direction of blog posts, source code, theory, etc. that would be great.

And of course, please criticize the above approaches and suggest any others you think may be more suitable.


Seems like it might be a good fit for GenStage.

A Producer stage can fetch from the external APIs, buffering results in a DB table.

Consumers can then send demand for items, which will be served from the DB until the buffer is drained and more items are fetched from the API.


I’ve toyed with GenStage (more specifically, Flow) and while it’s a great piece of software (which I plan to use in another part of my project), I think it’s not a great fit here because:

  • the collections will typically have fewer than 2000 items, so they should be fetched relatively quickly. This means that the GenStage needs to request more work from some manager process (assuming I use 1 GenStage per source), or I have to handle stopping the GenStage when the collection has been retrieved, and starting a new producer stage (and changing the consumer’s subscription to use the new producer).

  • I haven’t understood how to handle/surface errors in the pipeline without bringing the whole thing down. Granted, this is probably more an issue of my knowledge/skills, but when experimenting with Flow I was under the impression that returning an error from the producer would bring the whole pipeline down without letting the items in transit drain through it.

The idea of having a consumer to process all collection items that subscribes to multiple producer stages is really nice. But given the nature of the problem (many smallish, paginated collections), I’m somewhat concerned about the overhead of managing the pipeline (i.e. “updating” the pipeline as collections are done being retrieved).

As far as I can tell, GenStage/Flow/Broadway would fit better when you have a large collection of items to process (e.g. clean all records in a DB) where you set up the pipeline and when it terminates successfully the work is done.

Am I misguided? Are there examples out there of how to manage GenStage/Flow/Broadway when you want to process many collections that are on the small side?


From your descriptions, I feel like you’re trying to solve everything at once. It might be worth thinking about things more separately, e.g. splitting your business domain into “querying external sources”, “processing data (of those sources)” and “detecting and handling problems”. With GenStage, the first part should only concern the producer: no consumer needs to be aware of where the data is coming from or how it was batched on retrieval. The second and third parts would probably sit more on the consumer’s side, where you need to decide how to handle the one failing item that was produced, but also what should happen to the other (following) items in case of errors.

Also I don’t really expect starting GenStages on demand (per source/trigger/collection) to be a problem.


If a part of remote collection retrieval fails, do you want to fail the entire collection gathering effort, or is it okay to partially process whatever has been accumulated so far?


I’m indeed trying to split out the concerns: I mentioned the processing for (useless?) context.

While there can definitely be issues when processing the collection, I’m not worried about those. I’m more concerned about the issues that come up when retrieving the data (i.e. the producer part of GenStage): what happens if the remote server goes down when the producer is attempting to fetch the next batch of an incomplete collection?

I haven’t found much on how to handle this (i.e. failures in the middle of item production) within GenStage. When I tried out Flow, it seemed to me that the producer encountering an issue (e.g. being unable to fetch more items due to remote server unavailability, with the producer returning {:stop, reason, new_state}) would brutally kill the entire pipeline (including the items in transit), although this might not be true (e.g. it may have been due to a mistake of mine). Still, I haven’t seen any info on how to gracefully handle errors within GenStage pipelines, and they’re frustrating to debug: when there’s an issue, all the stages start throwing out errors, so it’s not immediately evident what the root cause is.

I agree that starting GenStage producers on demand shouldn’t be an issue. But since the collection that is being retrieved would change relatively frequently (because each would have relatively few items), I would have to manage that somehow. Based on your comment, I assume that you would go for starting a new GenStage producer instance for each collection? In that case, would you just start an entire pipeline for each collection? Or would you reuse the consumer stage (in which case I’d need to get the consumer to unsubscribe from the finished producer and have it subscribe to the new producer, right?)?

Would you by any chance have some info on GenStage pipelines that demonstrate error handling within the stages?


The processing would be made idempotent, so either would be fine. That said, it would be nice to be able to partially process and resume at the point of error to avoid any unnecessary burden on the remote servers.

Probably because this is mostly a business decision to make and less a technical one. There are ways to handle failure ranging from simply skipping the failing element/collection to throwing an error and aborting everything. This doesn’t really have anything to do with GenStage either. A GenStage producer must be able to send events matching the demand received from consumers. How those events are produced is fully up to the developer.

I feel this is the same issue as above. If failure should not terminate the GenStage, I’d make the consumer responsible only for triggering the work and receiving whether it succeeded or failed. The status can be stored with the event for later stages to act upon. You can divide handling the data flow from actually doing the fragile work.

From the little work I’ve done with GenStage it felt like adding things after the fact simply wasn’t really expected, so I’d go for starting a whole pipeline per unit of work you receive.

For clean restarts you can also look at Broadway, which, as far as I know, implements a manager process to restart a pipeline in the proper order.


Great timing! Just today I deployed an upgrade to our system like this to better report errors. I’ve got a module named Unpager that does a Stream.unfold around our home-grown API client abstraction. The Unpager grabs the first page of results and creates a Task to pull down the subsequent page. After exhausting the first page, it awaits the Task and starts another one for the page after that. Our API client code uses outgoing throttling to ensure we meet the third party’s requirements; that’s not a responsibility of the Unpager.

Originally it logged when some page request had an error and ended the stream cleanly. Unfortunately the logged info lacked context and we tended to have failures we didn’t notice. Today’s change made it return one more item, the error tuple, when an error occurs. This allows the client call sites to decide whether to pattern match on the error tuple or not for maximum flexibility.
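For readers who want to picture it, here is a guess at what a synchronous version of that idea could look like. It is not the actual Unpager code: the `fetch_page` function, its return shape (`{:ok, items, next_cursor | nil}` or `{:error, reason}`) and the treatment of an empty page as end-of-collection are assumptions.

```elixir
defmodule Unpager do
  # Hypothetical reconstruction; `fetch_page` takes a cursor and returns
  # {:ok, items, next_cursor | nil} or {:error, reason}.
  def stream(fetch_page, first_cursor) do
    Stream.unfold({[], {:cursor, first_cursor}}, fn
      # Items buffered from the current page: emit them one at a time.
      {[item | rest], next} ->
        {item, {rest, next}}

      # Buffer empty and nothing left to fetch: end the stream.
      {[], :done} ->
        nil

      # Buffer empty: fetch the next page.
      {[], {:cursor, cursor}} ->
        case fetch_page.(cursor) do
          {:ok, [item | rest], nil} -> {item, {rest, :done}}
          {:ok, [item | rest], next} -> {item, {rest, {:cursor, next}}}
          # Assumption: an empty page means the collection is exhausted.
          {:ok, [], _next} -> nil
          # Emit the error tuple as one final element, then stop.
          {:error, reason} -> {{:error, reason}, {[], :done}}
        end
    end)
  end
end
```

Call sites that care can pattern match on a trailing `{:error, reason}` element; call sites that don’t can filter it out.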


@gregvaughn Thanks for the input! That’s indeed similar to one of my approaches, but I’d like to have the software be able to react to errors arising during item production (e.g. wait for remote server to come back online and try again), and I feel the stream semantics aren’t a great fit. In other words, I’d like to be able to respond to the errors that arise, not just capture them for later analysis.

@LostKobrakai Thanks for taking the time to respond. What I didn’t get was how to handle errors in the producer stage and shut down the pipeline only once the in-flight items had been processed. After stepping away and searching some more, I realized my thinking was wrong: in the end, I was looking to use the GenStage producer’s exit as a way to control flow, which isn’t “the OTP way™”.
Instead, I should have some sort of “manager” process where the GenStage can report errors before exiting with the {:stop, :normal, ...} tuple to let the in-flight items finish going through the pipeline.
This is the part that I was having trouble grokking: although I was looking to handle “an error came up when attempting to produce items”, I still wanted to shut the stage down normally (and possibly start again later, etc.).

Also, while I initially didn’t even consider GenStage a fitting option due to the tiny size of each collection (between hundreds and 2000 items, with no real need for back pressure, since the HTTP requests with politeness delays will be orders of magnitude slower than persisting to the DB), the composability is going to be nice to have down the road. I guess in my mind the GenStage use case was limited to “have lots of data and require concurrency”, but I figure pipeline composability is also a valid reason to use it.


Yeah. Crashes are meant to handle unknown/one-off types of bugs, not issues you’re aware of upfront. For the latter, it’s up to you to decide how to deal with them.

Back pressure is also nice to have for anything that might result in extensive resource usage.

We do something similar, and the collection size is also way too big to consider an in-memory solution. This happens in the background every couple dozen hours.

Here are a few points on how we approach it:

  • The basic logic is similar to what you describe under continuation-passing style. It is done in a GenServer that is a bit state-machine-like (without using an FSM): there are startup, crawling, finalizing and waiting states

  • We have a special “cache” DB Table to persist data while it is gathered one page at a time

  • After the crawling is done, we “activate” the gathered data by streaming it from the “cache” table to the “live” one (using Repo.stream in a transaction). This way the rest of the system always accesses data that was successfully gathered and we never end up with inconsistent state in the live table; any problems are contained within the “cache” part

  • No circuit breaker but fixed retrying after 5 seconds on failed requests, this part is fully flexible though

  • We persist an extra status to an ETS table (“waiting”, “crawling”, “retrying”, “finalizing”, etc.). We may also hold the current error, if there is one, and the page number if crawling is in progress. This way the UI layer has access to info about what is happening at the moment
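A stripped-down sketch of how these points might fit together, for illustration only. The module name `Crawler`, the option names (`:fetch_page`, `:store_page`, `:finalize`, `:retry_ms`) and the `{:ok, items, next_page | nil}` return shape are assumptions; the “cache” table and the Repo.stream activation step are reduced to the hypothetical `store_page` and `finalize` callbacks so the example stays free of database dependencies.

```elixir
defmodule Crawler do
  use GenServer

  @table :crawler_status

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Readable from any process, e.g. a UI layer.
  def status do
    case :ets.lookup(@table, :status) do
      [{:status, status}] -> status
      [] -> nil
    end
  end

  @impl true
  def init(opts) do
    :ets.new(@table, [:named_table, :protected, read_concurrency: true])

    state = %{
      fetch_page: Keyword.fetch!(opts, :fetch_page),
      store_page: Keyword.fetch!(opts, :store_page),
      finalize: Keyword.get(opts, :finalize, fn -> :ok end),
      retry_ms: Keyword.get(opts, :retry_ms, 5_000),
      page: 1
    }

    send(self(), :crawl)
    {:ok, put_status(state, :crawling, nil)}
  end

  @impl true
  def handle_info(:crawl, state) do
    case state.fetch_page.(state.page) do
      {:ok, items, nil} ->
        # Last page: persist it, then "activate" the gathered data.
        state.store_page.(items)
        put_status(state, :finalizing, nil)
        state.finalize.()
        # A real system would schedule the next crawl from here.
        {:noreply, put_status(state, :waiting, nil)}

      {:ok, items, next_page} ->
        state.store_page.(items)
        send(self(), :crawl)
        {:noreply, put_status(%{state | page: next_page}, :crawling, nil)}

      {:error, reason} ->
        # Fixed-delay retry of the same page; no circuit breaker.
        Process.send_after(self(), :crawl, state.retry_ms)
        {:noreply, put_status(state, :retrying, reason)}
    end
  end

  defp put_status(state, name, error) do
    :ets.insert(@table, {:status, %{state: name, page: state.page, error: error}})
    state
  end
end
```

Since the page number lives in the GenServer state, a failed request simply retries the same page rather than restarting the whole crawl.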

Hey there Greg!

Is this still a solution you’d reach for with paginated API requests; specifically, APIs with rather restrictive rate limits?

Thanks sir!

It’s been a couple of years since I worked on that system, but, yes, the overall approach held up well. The Stream.unfold approach was mostly a convenience to the caller. The idea of using a Task to get one page ahead was more of an optimization in our case. I’d recommend you first get a synchronous Stream.unfold working before adding in the optimization.
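As an illustration of what layering the one-page-ahead optimization onto Stream.unfold might look like, here is a hedged sketch. It is not the original code: the module name, the `fetch_page` function and its return shape (`{:ok, items, next_cursor | nil}` or `{:error, reason}`) are assumptions. Note two caveats of this simple form: Task.await uses its default 5-second timeout, and a prefetch task is left running if the consumer stops enumerating early.

```elixir
defmodule PrefetchingUnpager do
  # `fetch_page` is a hypothetical function:
  # cursor -> {:ok, items, next_cursor | nil} | {:error, reason}
  def stream(fetch_page, first_cursor) do
    Stream.unfold({[], {:fetch, first_cursor}}, fn
      # Items buffered from the current page: emit them one at a time.
      {[item | rest], pending} ->
        {item, {rest, pending}}

      {[], :done} ->
        nil

      # First page: fetched synchronously.
      {[], {:fetch, cursor}} ->
        handle_page(fetch_page.(cursor), fetch_page)

      # Later pages: await the task started while the previous page was consumed.
      {[], %Task{} = task} ->
        handle_page(Task.await(task), fetch_page)
    end)
  end

  defp handle_page({:ok, [item | rest], nil}, _fetch_page), do: {item, {rest, :done}}

  defp handle_page({:ok, [item | rest], next}, fetch_page) do
    # Start fetching the next page before the current one is consumed.
    {item, {rest, Task.async(fn -> fetch_page.(next) end)}}
  end

  # Assumption: an empty page means the collection is exhausted.
  defp handle_page({:ok, [], _next}, _fetch_page), do: nil

  # Emit the error tuple as one final element, then stop.
  defp handle_page({:error, reason}, _fetch_page), do: {{:error, reason}, {[], :done}}
end
```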