What I’m struggling with
I want to process a collection of remote items. The items aren’t all available in a single request: getting all of them requires multiple requests (e.g. via a paginated API). Further, there are several of these collections I’d like to retrieve and process, and they vary in size. The collections would be retrieved from various sources, such as paginated APIs or scraping. Although each collection would currently be small enough to fit in memory, I’d ideally like to avoid loading a whole collection at once, to prevent issues down the road.
Conceptually, I’d like to stream each collection and process the items one at a time (where by “processing” I currently mean persisting them to the DB, although some responses may need to be deserialized first).
Approaches I’ve considered
Stream.resource/3
It would be really nice to stream the collections via Stream.resource/3 (example here), but I don’t see how to nicely handle the non-happy path:
- network errors: adding retries is straightforward, but if the server stays unresponsive (e.g. the circuit breaker is open), the error needs to be reported to the caller. I considered doing this via throw/raise, but then every iteration over a collection would have to be wrapped in try/catch/rescue, which doesn’t seem very ergonomic.
- politeness: whether scraping or interacting with an API, I can’t just slam the remote server with requests as fast as I can process the data. Delays must be inserted between requests, etc. Here too, while it would be possible to do that within Stream.resource/3, I’m afraid it would make the code messier and less maintainable.
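For reference, a minimal sketch of what the Stream.resource/3 route could look like, assuming a hypothetical fetch_fun (a stand-in for the real HTTP or scraping call) that takes a cursor and returns {:ok, items, next_cursor | nil} or {:error, reason}. Instead of raising, errors are emitted as the last stream element, and a fixed Process.sleep/1 before every request but the first stands in for a real politeness policy.

```elixir
defmodule PagedStream do
  # Lazily streams a paginated collection as {:ok, item} | {:error, reason}.
  # fetch_fun (hypothetical) takes a cursor and returns
  # {:ok, items, next_cursor | nil} or {:error, reason}.
  # delay_ms is a simple politeness pause before every request but the first.
  def stream(fetch_fun, first_cursor, delay_ms) do
    Stream.resource(
      fn -> {:first, first_cursor} end,
      fn
        :done ->
          {:halt, :done}

        {:first, cursor} ->
          fetch_page(fetch_fun, cursor)

        {:cont, cursor} ->
          Process.sleep(delay_ms)
          fetch_page(fetch_fun, cursor)
      end,
      # Nothing to clean up here; a real version might close a connection
      fn _acc -> :ok end
    )
  end

  defp fetch_page(fetch_fun, cursor) do
    case fetch_fun.(cursor) do
      {:ok, items, nil} -> {Enum.map(items, &{:ok, &1}), :done}
      {:ok, items, next} -> {Enum.map(items, &{:ok, &1}), {:cont, next}}
      # Emit the error as the last element instead of raising, then stop
      {:error, reason} -> {[{:error, reason}], :done}
    end
  end

  # Example consumer: processes items until the first error, no try/rescue
  def consume(stream, process_fun) do
    Enum.reduce_while(stream, :ok, fn
      {:ok, item}, acc ->
        process_fun.(item)
        {:cont, acc}

      {:error, reason}, _acc ->
        {:halt, {:error, reason}}
    end)
  end
end
```

Treating errors as data means the caller matches on {:error, reason} rather than wrapping every iteration in try/rescue, at the cost of every consumer having to handle the tagged tuples.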
GenServer, preload all items
I could implement a GenServer that fetches all items (handling network issues, politeness, etc.) and returns either an error (e.g. if the remote server is unresponsive) or the full/partial collection of items for processing.
Although the collections should currently fit in memory, I can’t guarantee that they always will, since I have no control over the number of items in a collection. Therefore, I’d rather process the batches of items as they arrive, so I don’t have to worry about memory constraints, especially when processing several collections in parallel.
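A minimal sketch of the preload approach, again assuming a hypothetical fetch_fun that returns {:ok, items, next_cursor | nil} or {:error, reason}. It makes the memory concern concrete: every page is accumulated before anything is returned to the caller.

```elixir
defmodule Preloader do
  use GenServer

  # fetch_fun (hypothetical) takes a cursor and returns
  # {:ok, items, next_cursor | nil} or {:error, reason}.
  def start_link(fetch_fun), do: GenServer.start_link(__MODULE__, fetch_fun)

  # Blocks until every page has been fetched (or one fails)
  def fetch_all(pid, first_cursor, timeout \\ 60_000),
    do: GenServer.call(pid, {:fetch_all, first_cursor}, timeout)

  @impl true
  def init(fetch_fun), do: {:ok, fetch_fun}

  @impl true
  def handle_call({:fetch_all, cursor}, _from, fetch_fun) do
    {:reply, collect(fetch_fun, cursor, []), fetch_fun}
  end

  # Pages are accumulated in reverse, so the whole collection sits in memory
  defp collect(fetch_fun, cursor, acc) do
    case fetch_fun.(cursor) do
      {:ok, items, nil} -> {:ok, Enum.concat(Enum.reverse([items | acc]))}
      {:ok, items, next} -> collect(fetch_fun, next, [items | acc])
      {:error, reason} -> {:error, reason}
    end
  end
end
```

Returning the partial collection on error would be a small change to the {:error, reason} clause; the sketch simply drops it.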
GenServer, continuation-passing style
I don’t really have any experience with CPS (in my opinion, a good illustration is in Fred’s Erlang book; search for “Continuation-Passing Style”).
Basically, I could have a function in the GenServer that would return:
- in the happy path: a batch of items and a “next” value (i.e. information on how to get the next batch, which the GenServer would understand: a URL, a limit/offset combo, etc.). After processing the batch, the caller would call the GenServer again, passing it the “next” value to continue where it left off. If no “next” value is returned, the caller knows it has reached the end of the collection and there are no more items to fetch.
- an error tuple if something went wrong (circuit breaker tripped, etc.)
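A minimal synchronous sketch of this continuation-passing idea, assuming a hypothetical fetch_fun that returns {:ok, items, next | nil} or {:error, reason}. BatchServer and BatchConsumer are illustrative names; the server hides fetching details, and the caller drives the loop by handing back the “next” value.

```elixir
defmodule BatchServer do
  use GenServer

  # fetch_fun (hypothetical) takes a "next" value and returns
  # {:ok, items, next | nil} or {:error, reason}.
  def start_link(fetch_fun), do: GenServer.start_link(__MODULE__, fetch_fun)

  # Fetches one batch; the caller passes the returned "next" value back in
  def fetch_batch(pid, next, timeout \\ 30_000),
    do: GenServer.call(pid, {:fetch_batch, next}, timeout)

  @impl true
  def init(fetch_fun), do: {:ok, fetch_fun}

  @impl true
  def handle_call({:fetch_batch, next}, _from, fetch_fun) do
    # Retries, politeness delays and circuit breaking would live here,
    # hidden from the caller.
    {:reply, fetch_fun.(next), fetch_fun}
  end
end

defmodule BatchConsumer do
  # Processes one batch at a time until "next" is nil or an error comes back
  def run(pid, next, process_fun) do
    case BatchServer.fetch_batch(pid, next) do
      {:ok, items, nil} ->
        Enum.each(items, process_fun)
        :ok

      {:ok, items, next_cursor} ->
        Enum.each(items, process_fun)
        run(pid, next_cursor, process_fun)

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Only one batch is held by the caller at a time, and an error surfaces as a plain return value.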
I would probably implement this asynchronously: the batch request returns immediately with a reference, and the items in the batch are sent to the caller later along with that reference. I.e. the caller would get and process the items within a receive block.
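A minimal sketch of the asynchronous variant, assuming the same hypothetical fetch_fun shape ({:ok, items, next | nil} or {:error, reason}). The request returns a reference immediately and the result arrives as {ref, result}. As one possible design choice (not something the approach requires), the consumer requests the next batch before processing the current one, so fetching overlaps with processing.

```elixir
defmodule AsyncBatchServer do
  use GenServer

  # fetch_fun (hypothetical) takes a "next" value and returns
  # {:ok, items, next | nil} or {:error, reason}.
  def start_link(fetch_fun), do: GenServer.start_link(__MODULE__, fetch_fun)

  # Returns a reference immediately; the result is sent to the caller later
  # as {ref, result}.
  def request_batch(pid, next) do
    ref = make_ref()
    GenServer.cast(pid, {:fetch_batch, next, self(), ref})
    ref
  end

  @impl true
  def init(fetch_fun), do: {:ok, fetch_fun}

  @impl true
  def handle_cast({:fetch_batch, next, caller, ref}, fetch_fun) do
    send(caller, {ref, fetch_fun.(next)})
    {:noreply, fetch_fun}
  end
end

defmodule AsyncConsumer do
  def run(pid, first, process_fun, timeout \\ 5_000) do
    await(pid, AsyncBatchServer.request_batch(pid, first), process_fun, timeout)
  end

  defp await(pid, ref, process_fun, timeout) do
    receive do
      {^ref, {:ok, items, nil}} ->
        Enum.each(items, process_fun)
        :ok

      {^ref, {:ok, items, next}} ->
        # Request the next batch before processing this one, so the
        # fetch overlaps with processing
        next_ref = AsyncBatchServer.request_batch(pid, next)
        Enum.each(items, process_fun)
        await(pid, next_ref, process_fun, timeout)

      {^ref, {:error, reason}} ->
        {:error, reason}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

This sketch doesn’t notice if the server crashes (it only times out), and a reply arriving after the timeout stays in the mailbox; monitoring the server with Process.monitor/1 would be one way to tighten that.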
It feels like this may be the best alternative, but historically all of my dumb ideas started out looking like good ones ¯\_(ツ)_/¯ so I’d rather get opinions from more experienced OTP wranglers before heading down the wrong path.
Opinions?
I’m certain this has all been done before in OTP systems, but I couldn’t find any info on the various approaches, pros/cons etc. If anybody can point me in the direction of blog posts, source code, theory, etc. that would be great.
And of course, please criticize the above approaches and suggest any others you think may be more suitable.