Batch processing w/ retry: use GenServer with spawn_monitor instead of Task.Supervisor?

Hi,

I’m writing some data-processing code in a project, and I suspect I’m headed in the wrong direction with my design: I’m hoping the community can point me in the right direction and right my design wrongs before it’s too late :blush:

Essentially, I have a group of elements, and I want to process them in batches. For the sake of an example, let’s say I have lots of companies, with each company having a collection of employee records. The processing for each company is triggered (by an Oban job, but that shouldn’t be relevant) and all employee records for that company should be processed.

My idea is to process the records in batches with a fixed size. The reason for this is that the number of employees per company can vary “significantly”: some may have only a few hundred, while others will have tens of thousands. By loading the employees in batches, the resource consumption should be more linear, and I should be able to increase concurrency by increasing the number of companies processed simultaneously (please correct me if my reasoning is wrong). Without a batching approach, increasing concurrency in this manner would be hit or miss: loading a company with many employees could suddenly require too many resources.

To achieve this, I was planning on having a GenServer manage the processing of all employee records for a given company: it would be started with a company id, and would then start its own Task.Supervisor to farm out the processing of each employee record via async_nolink/3. As the tasks can fail (e.g. due to DB-related reasons), the GenServer would match on messages from the Task.Supervisor (as suggested here) and keep track of which employee records were processed successfully and which failed. The failed ones would be retried (if the error was transient), or logged and dropped. Then the next batch of employee records would be fetched and go through the same logic. Once all employee records for the company had been processed, the GenServer would stop: the calling process would be waiting for the GenServer’s result in a receive block and would respond appropriately (mark the job as a success/failure).
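To make it concrete, here’s a minimal sketch of what I had in mind (module/function names, the batch size and the state layout are made up, and retry logic is omitted):

```elixir
defmodule CompanyProcessor do
  # A minimal sketch of the approach described above: one GenServer per company,
  # farming each employee record out via Task.Supervisor.async_nolink/3 and
  # collecting the results in handle_info/2. Retrying failed records is omitted.
  use GenServer

  @batch_size 50

  def start_link(company_id), do: GenServer.start_link(__MODULE__, company_id)

  @impl true
  def init(company_id) do
    {:ok, task_sup} = Task.Supervisor.start_link()
    state = %{company_id: company_id, task_sup: task_sup, pending: %{}, failed: []}
    {:ok, state, {:continue, :next_batch}}
  end

  @impl true
  def handle_continue(:next_batch, state) do
    case fetch_batch(state.company_id, @batch_size) do
      [] ->
        {:stop, :normal, state}

      employees ->
        pending =
          for employee <- employees, into: %{} do
            task = Task.Supervisor.async_nolink(state.task_sup, fn -> process_employee(employee) end)
            {task.ref, employee}
          end

        {:noreply, %{state | pending: pending}}
    end
  end

  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    # The task finished successfully; flush the :DOWN message that follows.
    Process.demonitor(ref, [:flush])
    maybe_next_batch(update_in(state.pending, &Map.delete(&1, ref)))
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # The task crashed; remember the employee record so it can be retried or logged.
    case Map.pop(state.pending, ref) do
      {nil, _pending} ->
        {:noreply, state}

      {employee, pending} ->
        maybe_next_batch(%{state | pending: pending, failed: [{employee, reason} | state.failed]})
    end
  end

  defp maybe_next_batch(%{pending: pending} = state) when map_size(pending) == 0,
    do: {:noreply, state, {:continue, :next_batch}}

  defp maybe_next_batch(state), do: {:noreply, state}

  # Placeholders for the real data access and processing:
  defp fetch_batch(_company_id, _batch_size), do: []
  defp process_employee(_employee), do: :ok
end
```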

What makes me suspect that this isn’t the proper approach is that Task.Supervisor cannot be stopped (i.e. it has no documented stop function, as opposed to e.g. GenServer). Presumably, this is because instances of Task.Supervisor are meant to be started in the supervision tree and just keep going for as long as the OTP app is alive (like other “utility” processes such as Registry).

How then should I rethink my approach? There appears to be no equivalent to Task.Supervisor’s async_nolink/3 within the Task module. Should I have my GenServer trap exits and use Task.async/1 to reproduce the above approach (i.e. match task results in the GenServer’s handle_info)? Should I use spawn_monitor/1? Given that I plan to have the GenServer handle results in its handle_info callback, would there be any advantage to using a Task at all over spawn_monitor? Is my approach entirely wrong?

I guess Task.Supervisor is the wrong tool for what I’m trying to accomplish: although I want failed tasks to be restarted, I need more control than Task.Supervisor affords me… So let me know how best to approach this: I’m currently leaning towards a spawn_monitor approach, but I’d appreciate any insight you may offer.

It sounds like your scenario might be a good fit for GenStage or Flow, but I don’t have enough (actually, any) experience with them to comment in detail, so hopefully someone else will chime in.

The GenServer/Task dance you’re describing in the second part could be implemented with my Parent.GenServer. Basically, Parent.GenServer is a GenServer with some supervision capabilities, allowing you to start direct children and react to their termination in the handle_child_terminated/5 callback. You’d have to implement the restarting/giving-up logic manually, but it shouldn’t be particularly hard.

With that in place, you could then stop the parent GenServer via GenServer.stop (or by asking the parent supervisor to shut the child down), and it would perform proper synchronous termination of child processes, stopping only after all the children have been taken down.
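As a very rough sketch, assuming the older Parent.GenServer API with Parent.GenServer.start_child/1 and the handle_child_terminated/5 callback mentioned above (names, signatures and options vary between Parent versions, so check the docs for the version you’re on), it could look something like this:

```elixir
defmodule CompanyBatchProcessor do
  # Rough sketch only: assumes the older Parent.GenServer API
  # (Parent.GenServer.start_child/1 + handle_child_terminated/5).
  use Parent.GenServer

  def start_link(company_id),
    do: Parent.GenServer.start_link(__MODULE__, company_id)

  @impl GenServer
  def init(company_id) do
    # Start one child task per employee record in the current batch.
    for employee <- fetch_batch(company_id) do
      Parent.GenServer.start_child(%{
        id: {:employee, employee.id},
        start: {Task, :start_link, [fn -> process_employee(employee) end]}
      })
    end

    {:ok, %{company_id: company_id, failed: []}}
  end

  @impl Parent.GenServer
  def handle_child_terminated({:employee, id}, _meta, _pid, reason, state) do
    # The restart-or-give-up logic would go here; this sketch just records failures.
    state = if reason == :normal, do: state, else: %{state | failed: [id | state.failed]}
    {:noreply, state}
  end

  # Placeholders for the real data access and processing:
  defp fetch_batch(_company_id), do: []
  defp process_employee(_employee), do: :ok
end
```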

Have you considered using a library that already solves your problem, like Broadway?

If you want to implement it yourself, I would consider the following approach:

Implement a module for employee processing, such as CalculateSalaries with a function like call(company_id).

This function will fetch all the data necessary for the calculation and then process each record. The idea is based on the assumption that we know which records need to be processed.

The next step depends on the amount of computation we need to do and whether we need to make external calls. If the time required to process each record is relatively high, then I would acquire a lock on the table, take the next 20 records, and mark them as “in progress” in the database with a timestamp.

After processing each record we’ll reset the in_progress_at field and write the result of the computation. We’ll process only that small batch of records and that’s it. On the next call, if there is nothing left to process, we’ll return {:ok, :empty}, for example.

With this approach we can parallelise the processing as much as we want.
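As a rough illustration of the claim-a-batch step, assuming Ecto with Postgres (the EmployeeRecord schema, MyApp.Repo and the field names are all invented for the example):

```elixir
defmodule CalculateSalaries do
  # Illustration only: claim a small batch of unprocessed records, mark them as
  # "in progress", process them, and report {:ok, :empty} when nothing is left.
  import Ecto.Query

  @batch_size 20

  def call(company_id) do
    case claim_batch(company_id) do
      [] ->
        {:ok, :empty}

      records ->
        Enum.each(records, &process_record/1)
        {:ok, length(records)}
    end
  end

  defp claim_batch(company_id) do
    {:ok, records} =
      MyApp.Repo.transaction(fn ->
        records =
          from(r in EmployeeRecord,
            where: r.company_id == ^company_id and is_nil(r.result) and is_nil(r.in_progress_at),
            limit: @batch_size,
            lock: "FOR UPDATE SKIP LOCKED"
          )
          |> MyApp.Repo.all()

        ids = Enum.map(records, & &1.id)
        now = DateTime.truncate(DateTime.utc_now(), :second)

        from(r in EmployeeRecord, where: r.id in ^ids)
        |> MyApp.Repo.update_all(set: [in_progress_at: now])

        records
      end)

    records
  end

  defp process_record(record) do
    # Write the computed result and clear the in_progress_at marker again.
    record
    |> Ecto.Changeset.change(result: compute(record), in_progress_at: nil)
    |> MyApp.Repo.update!()
  end

  # Placeholder for the real calculation.
  defp compute(_record), do: 0
end
```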

The next step is to define job workers which can process the queued companies. Each worker is a GenServer that sends itself a message at a predefined interval.

When the message arrives, the GenServer asks an Agent for a new job. After processing the job, it removes the job from the Agent if the job responds with {:ok, :empty}.

Here is just a basic template based on the description you’ve provided. It abstracts the processing from the business logic, i.e. it doesn’t know anything about companies and just executes jobs from the queue. You can use persistent state instead of Agents if you want your system to survive restarts.
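Something roughly like this, untested, with the module names, the {module, args} job shape and the 5-second interval picked arbitrarily for the sketch:

```elixir
defmodule JobQueue do
  # Agent-backed queue of pending jobs; a job is just {module, args}.
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> [] end, name: __MODULE__)

  def push(job), do: Agent.update(__MODULE__, &(&1 ++ [job]))
  def peek, do: Agent.get(__MODULE__, &List.first/1)
  def remove(job), do: Agent.update(__MODULE__, &List.delete(&1, job))
end

defmodule JobWorker do
  # Polling worker: wakes up on an interval, runs the next job, and removes it
  # from the queue once the job reports {:ok, :empty}.
  use GenServer

  @interval :timer.seconds(5)

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(_opts) do
    schedule_tick()
    {:ok, %{}}
  end

  @impl true
  def handle_info(:tick, state) do
    case JobQueue.peek() do
      nil ->
        :ok

      {module, args} = job ->
        # e.g. {CalculateSalaries, company_id}: the worker stays ignorant of the domain.
        case module.call(args) do
          {:ok, :empty} -> JobQueue.remove(job)
          _ -> :ok
        end
    end

    schedule_tick()
    {:noreply, state}
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @interval)
end
```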

It all really depends on the problem you have, and with such scarce information about the domain it’s hard to tell which approach is the most relevant for your issue.

With the approach you’ve described above, I see an issue with having a separate GenServer for each company being processed. You said you want to control how many jobs are executed in parallel, but in that case, if we want to process a thousand companies, we need to implement another layer to keep track of all the companies we want to process and how many are being processed right now. That’s why I’ve separated the queue-processing implementation and kept it holding as little persistent state as possible.

If you have any questions or ideas I’ll be happy to answer.


Thanks for helping me crystallize my thinking via your answers. I’ve thought about this more these past few days, and I think what it fundamentally boils down to is this:

How can I manage the concurrency of data-processing tasks so they won’t slow down “more important” processes?

I know the BEAM will keep everything reactive, but that could still mean that an important activity is being slowed down by a lesser activity because the lesser one has a concurrency level that is too high (and therefore too many system resources are spent on it).

As I’m still getting the hang of Elixir, I’m not quite sure how to determine and set the “right” level of concurrency in various parts of the system (if you have any resources to learn more about this, please share!). And even if I were to have the “perfect” level of concurrency, things change and that level may have to be adapted.

I think that’s why I shied away from GenStage-based approaches, as changing their level of concurrency isn’t very straightforward as far as I can tell (you essentially have to add/remove subscribers to/from various stages, as well as handle draining).

This is what brought me to the conclusion of “if I have a GenServer processing each element in a task, I can easily change the level of concurrency by sending a message and adapting the number of concurrent tasks the GenServer will allow to be spawned.”
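Something like this minimal sketch is what I’m picturing (module and field names invented, the per-record work stubbed out):

```elixir
defmodule TunableProcessor do
  # Sketch of the "change the concurrency level by sending a message" idea,
  # using spawn_monitor/1. Both normal and abnormal exits arrive as :DOWN
  # messages; retry logic is omitted.
  use GenServer

  def start_link(records), do: GenServer.start_link(__MODULE__, records)

  def set_max_concurrency(pid, n), do: GenServer.call(pid, {:set_max_concurrency, n})

  @impl true
  def init(records) do
    {:ok, fill(%{queue: records, running: 0, max_concurrency: 10})}
  end

  @impl true
  def handle_call({:set_max_concurrency, n}, _from, state) do
    # Tasks already running finish as they are; only new spawns respect the new limit.
    {:reply, :ok, fill(%{state | max_concurrency: n})}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # A worker finished (or crashed); free up a slot and possibly spawn the next one.
    {:noreply, fill(%{state | running: state.running - 1})}
  end

  defp fill(%{running: r, max_concurrency: max} = state) when r >= max, do: state
  defp fill(%{queue: []} = state), do: state

  defp fill(%{queue: [record | rest]} = state) do
    spawn_monitor(fn -> process(record) end)
    fill(%{state | queue: rest, running: state.running + 1})
  end

  # Placeholder for the real per-record work.
  defp process(_record), do: :ok
end
```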

@sasajuric Thanks for bringing Parent.GenServer back to my attention: I was aware of it, but in my mind, its intended use was mainly to ease the management of worker pools. But you’re right: if I go down the GenServer/Task route, Parent.GenServer will be really handy to reduce boilerplate.


Here’s a better description of my problem context, transposed to a generic example (I’m keeping the domain-specific details to a minimum to avoid muddying the waters):

I’m working on an ETL project (or more appropriately an ELT project) and let’s say I want to know what various cars are worth on the second-hand market. I have a GenStage pipeline (mainly for back pressure) gathering data from various sources (ebay, garage web sites, etc.) and storing it in a database in a raw form.

Once this data has been retrieved, information has to be extracted from the description so it’s structured and actually usable (e.g. “F-150 raptor”, “F 150 truck”, “Ford F150” all refer to the same vehicle model). But in addition to the first pass where information is extracted from the given listing, later extraction passes may also be triggered manually (e.g. I’ve added code to extract the vehicle’s color, so now I want to run the updated extraction code on all records). This is what I’m stuck on.

My initial idea is very similar to what you’re suggesting, @achempion. The data is initially ingested in discrete “groups” (e.g. a single auction with many car sales, a single car dealer’s website, etc.). Once a group of raw records has been obtained, I persist an Oban job for the information extraction step. The concurrency of this step can easily be changed via https://hexdocs.pm/oban/Oban.html#scale_queue/3. Then, when processing each “group” of records, I was planning on having a GenServer spawn, say, 50 tasks to process the first 50 records in the group, and so on until all of the records for the given group have been processed.


@achempion I don’t think I want to use Broadway, as I don’t believe I need the robust job acknowledgement that is (I think?) intended for consuming data from external sources (i.e. don’t remove an item from the SQS queue until Broadway has successfully ack’ed it).

@sasajuric You may be right about using GenStage/Flow: I could have a GenStage producer emitting batches of unprocessed records (i.e. ones that have just been ingested), and then use Flow to do the extraction processing in parallel.
This approach would also allow reusing the code for the “bulk extraction” execution (where I want to run new extraction code on all records). In addition, this may mean that Oban jobs may not be necessary.
However, I don’t see a way to adapt concurrency in Flow (or GenStage, aside from adding/removing consumers as alluded to above) after it has been configured. I wouldn’t want the info extraction processes to slow down the data ingestion because they’ve been allotted too many resources.
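For concreteness, this is roughly the shape I’m imagining (the function names are placeholders for my own code); as far as I can tell, the :stages/:max_demand options are fixed when the flow is built, which is exactly the limitation I’m worried about:

```elixir
# Process the unprocessed raw listings with a fixed level of parallelism;
# RawListings.unprocessed_stream/0 and ExtractInfo.call/1 are placeholders.
RawListings.unprocessed_stream()
|> Flow.from_enumerable(stages: 8, max_demand: 50)
|> Flow.map(&ExtractInfo.call/1)
|> Flow.run()
```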

Any thoughts on the (very long wall of text, I’m sorry!) above? Is my worrying about dynamically managing the concurrency levels of different parts of the system misplaced? If not, what are your thoughts on the various approaches described above?


This sounds very similar to Task.async_stream/3 and /5. Have you looked at those (or the similar functions in Task.Supervisor)? Although this only allows you to control the concurrency within one “job”, if you want to bound the number of jobs as well, you might want to use a worker pool library like poolboy. With a worker pool you know how many resources (i.e. how much concurrency) are dedicated to the specific kind of work that pool serves.
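For example (the numbers and the processing function are arbitrary), processing one batch with at most 10 concurrent tasks and collecting failures could look like:

```elixir
# process_record/1 is a placeholder for your own per-record work.
records
|> Task.async_stream(&process_record/1,
  max_concurrency: 10,
  timeout: 30_000,
  on_timeout: :kill_task
)
|> Enum.reduce({0, []}, fn
  {:ok, _result}, {ok, failed} -> {ok + 1, failed}
  # Timed-out tasks (and, if the caller traps exits, crashed ones) come back as {:exit, reason}.
  {:exit, reason}, {ok, failed} -> {ok, [reason | failed]}
end)
```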


Parent’s intended use covers various scenarios. Check out the rationale for details. If you want to dynamically manage child processes, Parent will likely be helpful.

My misinformed intuitive take is that you are overthinking it for one simple reason:

Data gathering is going to be network I/O bound and the data ingestion is going to be CPU-bound (and sometimes disk I/O bound).

Even if you hadn’t gone with Elixir, the chances of having so much stuff running that the pieces impede each other are very slim.

As @axelson pointed out, I’d bet most of your workflow can be served just fine with several groups of Task.async_stream calls. If things seem unstable, go with Sasa’s library.