What tool to choose for concurrent and tracked tasks?

Hi, I am trying to choose the right tool for my situation:
I have a preconfigured collection of URLs (about 1000) that should be parsed (the data from each URL should be downloaded, processed and sent to an external API).

I have 2 features in my app:

  1. Process one specific url
  2. Process the whole collection at once

I am thinking about a tool for the 2nd feature.
Because of the 1st feature I need to track the parsing process (to forbid a repeated launch and to inform the frontend about the progress of the parsing). That’s why I can’t use the Task module for the 2nd feature: it doesn’t let me check whether a task is still running.
For the first feature I use a Supervisor + GenServers that are registered via Registry to allow easy tracking.

Starting all 1000 URLs concurrently (i.e. starting 1000 of the existing GenServers) is not an option, because there are a lot of 3rd-party services that could fail the parsing because of rate limiting etc. I could refactor the parsing logic to fix all the bottlenecks, but I would like to find an easier and faster solution.
One option would be to limit concurrency, e.g. only 10 GenServers working at the same time.
But I don’t know how to implement that. It looks like a job for GenStage, but that seems like overkill for my situation since I have a finite collection of URLs.

I will be glad for any help :slight_smile:

Have you checked the Flow lib? At work, we are using Flow to provide CSV processing that notifies connected LiveViews about what’s going on under the hood.

Yes, but I didn’t find a way to track whether a URL is currently being processed. It seems that in your case the processes send some info by themselves, but in my case I need to ask my app (or the processes) whether a process is still running.

I’m not sure I follow the problem you are having. If you just need to parse data from a URL and prevent users from re-submitting the same URL, you could control that with a database column that records the state each URL has gone through.

Flow and GenStage will help you on the concurrency side, but it seems you also need to validate whether you can (re)process the data. What we do for our case is:

  1. User starts import process
  2. We stream the CSV file into lines
  3. Each line goes into a “Flow process”
  4. Each “Flow process” inserts the processed line into the database
  5. Each “Flow process” broadcasts the inserted line to connected liveviews

If I understand what you are trying to achieve, you could save already-processed URLs in the database to prevent users from processing them again (and update LiveView states using PubSub).
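
For illustration, the Flow steps above might be sketched roughly like this. It is a minimal sketch, not our actual code: the module name UrlFlow, the process/1 stand-in and the use of plain send/2 in place of a PubSub broadcast are all assumptions made for the example, and the database insert is left out.

```elixir
# Standalone script; assumes network access so Mix.install can fetch Flow.
Mix.install([{:flow, "~> 1.2"}])

defmodule UrlFlow do
  # Hypothetical stand-in for "download, process, send to the external API".
  def process(url), do: url

  # Runs `process/1` for every URL with at most 10 stages working at once.
  # `notify_pid` stands in for a PubSub broadcast to connected LiveViews.
  def run(urls, notify_pid) do
    urls
    # max_demand: 1 hands each stage one URL at a time, which suits IO-bound work.
    |> Flow.from_enumerable(stages: 10, max_demand: 1)
    |> Flow.map(fn url ->
      result = process(url)
      send(notify_pid, {:processed, result})
      result
    end)
    |> Flow.run()
  end
end
```

Calling UrlFlow.run(urls, self()) blocks until every URL has gone through the flow, and the caller receives one {:processed, url} message per URL along the way.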

You could write a bunch of GenServers and rate limiting and retries and logging and database persistence and and and…

Or you could:

  • install Oban, and set a sensible concurrency for the default queue
  • create a worker that takes a single URL and processes it
  • write a tiny piece of code to enqueue the job for each desired URL
  • there is no step 4, other than possibly adjusting settings for queue retention

The Oban jobs in the database make it easy to answer “what’s running”, as well as automatically handle retrying with configurable “backoff” times for APIs that hit rate limits.
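
A rough sketch of those steps, meant to live inside an existing Phoenix/Ecto project rather than run on its own. The names :my_app, MyApp.Repo, MyApp.ParseUrlWorker, MyApp.UrlJobs and MyApp.Parser.process/1 are placeholders invented for the example, and the numbers (10 concurrent jobs, 5 attempts, a 300-second uniqueness window) are arbitrary assumptions.

```elixir
# config/config.exs (fragment): run at most 10 jobs from the default queue at once.
#
#   config :my_app, Oban,
#     repo: MyApp.Repo,
#     queues: [default: 10]

defmodule MyApp.ParseUrlWorker do
  # `unique` rejects a duplicate job for the same args within the period.
  use Oban.Worker, queue: :default, max_attempts: 5, unique: [period: 300]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"url" => url}}) do
    # Hypothetical parser; returning {:error, reason} makes Oban retry with backoff.
    MyApp.Parser.process(url)
  end
end

defmodule MyApp.UrlJobs do
  import Ecto.Query

  # Enqueue one job per URL.
  def enqueue_all(urls) do
    urls
    |> Enum.map(&MyApp.ParseUrlWorker.new(%{url: &1}))
    |> Oban.insert_all()
  end

  # "What's running": is there a pending or executing job for this URL?
  def running?(url) do
    Oban.Job
    |> where([j], j.worker == "MyApp.ParseUrlWorker")
    |> where([j], j.state in ["available", "executing", "retryable"])
    |> where([j], fragment("?->>'url' = ?", j.args, ^url))
    |> MyApp.Repo.exists?()
  end
end
```

The single-URL feature can enqueue through the same worker, so both features share one source of truth about what is in progress.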

@al2o3cr @thiagomajesk thanks for your suggestions :slightly_smiling_face:

I ended up using

Task.Supervisor.async_nolink(
  Supervisor,
  fn -> start_function(id, ...) end,
  opts
)

where start_function is

def start_function(id, ...) do
  Registry.register(MyRegistry, id, nil)
  ... some work ...
end

I just didn’t know that, although the Task functions don’t have a name option, a task can nevertheless be registered explicitly from inside its own function with Registry.register/3.
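
For anyone landing here later, a fuller sketch of this approach using only the standard library could look like the following. The module name UrlRunner, its function names and the `work` callback are made up for the example, and the limit of 10 is just the number mentioned earlier in the thread. It uses Task.Supervisor.async_stream_nolink/4 with :max_concurrency for the "whole collection" case, which is one possible way to cap concurrency rather than the only one.

```elixir
defmodule UrlRunner do
  @registry UrlRunner.Registry
  @task_sup UrlRunner.TaskSupervisor

  # Starts a unique-key Registry and a Task.Supervisor (linked to the caller).
  def start do
    {:ok, _} = Registry.start_link(keys: :unique, name: @registry)
    {:ok, _} = Task.Supervisor.start_link(name: @task_sup)
    :ok
  end

  # A task is "running" while its process is registered under `id`.
  # The entry disappears on its own when the task process exits.
  def running?(id), do: Registry.lookup(@registry, id) != []

  # Feature 1: process a single id, refusing a repeated launch.
  def process_one(id, work) do
    if running?(id) do
      {:error, :already_running}
    else
      {:ok, Task.Supervisor.async_nolink(@task_sup, fn -> run(id, work) end)}
    end
  end

  # Feature 2: process the whole collection, at most `max_concurrency` at a time.
  def process_all(ids, work, max_concurrency \\ 10) do
    @task_sup
    |> Task.Supervisor.async_stream_nolink(ids, fn id -> run(id, work) end,
      max_concurrency: max_concurrency,
      timeout: :infinity
    )
    |> Enum.to_list()
  end

  # Registers the task process itself; registration is atomic, so it also
  # closes the race between the running?/1 check and the task start.
  defp run(id, work) do
    case Registry.register(@registry, id, nil) do
      {:ok, _owner} -> {:ok, work.(id)}
      {:error, {:already_registered, _pid}} -> {:error, :already_running}
    end
  end
end
```

process_all/3 returns one entry per id, in input order: {:ok, result} wrapping whatever the task function returned, or {:exit, reason} if that task crashed. Because both features register under the same key, a URL that is already being parsed as part of the bulk run cannot be started a second time through the single-URL path.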