Fetching tens of thousands of files regularly

Long time lurker here :wave:

Some background: I’ve been learning and playing with Elixir for about a year now. I did a few simpler projects with Phoenix, but now I wish to try something a bit harder.

As the title suggests one of the parts of the system I’m trying to build is going to fetch thousands of files regularly, perform some form of processing and then save any relevant information.

I think I have most of the parts figured out. There are plenty of good resources on doing APIs + DB and making that handle lots of requests from the outside.

What is the best way to handle this, though, when the application itself generates that amount of work? For doing the processing I’m looking at a Rust NIF, which looks promising & I’m curious to play around with it. My question is more about how to handle the logic around a large (and let’s imagine increasing) volume of work. I’ve looked at GenStage, but I’m not sure I understand enough to imagine how it would work. What would be the tradeoffs in Elixir between getting the work done fast and making sure the app doesn’t crash itself?

Another thing I’m trying to wrap my head around is: How to write this without limiting the application in the way that it’s written. If a worker pool is used that might work, but what if the app is hosted on GCP or AWS and the machine can be scaled vertically, the app wouldn’t scale without changing the code?

How can I exploit the concurrency here optimally? How does one determine the boundaries in such cases and then make sure they are respected?

I’m not sure I’m explaining well enough here, that’s probably due to my level of understanding =)

A few questions: How regularly are they fetched? How large are the files? Where are they fetched from? How long does it take to process one of these files? :slight_smile:

If we know more about the problem it’ll be easier to make recommendations

take a look at https://github.com/dashbitco/broadway

Broadway takes the burden of defining concurrent GenStage topologies and provides a simple configuration API that automatically defines concurrent producers, concurrent processing, batch handling, and more, leading to both time and cost efficient ingestion and processing of data.

that would be the OTP/Beam route where you get your hands a little dirty and learn/understand a lot…

you can also take a look at https://github.com/sorentwo/oban that would be an easy and fast solution - though you won’t get your hands dirty as it “just works” and most if not all things are solved for you out of the box…

personally I would go the Oban route - but it all depends - if you need massive scale I suppose Broadway might be better/more efficient… and also if you want to learn how to build it out and understand all aspects…

but as @lpil said - give us some more info pls…

General advice: stay away from NIFs for as long as humanly possible.

Even if your processing code ends up being slow and inefficient, stick with it for a while. Don’t introduce much more complexity until you’ve shaped a working solution.

I’d recommend GenStage and Broadway generally but in order to stick to simplicity I’d much more readily recommend you just look at the docs of Task.async_stream (3-args and 5-args).

I highly second everything in @dimitarvp’s reply. I would look at your data pipeline needs from simplest to most complex: Task -> GenStage -> Broadway -> Queue backed (ExQ, Oban).

One thing I’d generally recommend is having control over your concurrency so that you don’t overwhelm the system. For example, processing 25 files at once rather than letting 1000 go as fast as possible. With reading files (assuming from disk), then you could probably do a fair bit at the same time.
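
A minimal sketch of that "25 at once" idea using `Task.async_stream`, in case it helps. The module name `FetchSketch`, the function `fetch_and_process/1`, and the example URLs are made up for illustration; the sleep stands in for a real HTTP request plus processing.

```elixir
defmodule FetchSketch do
  # Hypothetical stand-in for "download the file and process it".
  # A real version would make an HTTP request here instead of sleeping.
  def fetch_and_process(url) do
    Process.sleep(10)
    {url, byte_size(url)}
  end

  # Work through all URLs, but never run more than `limit` tasks at once.
  def run(urls, limit \\ 25) do
    urls
    |> Task.async_stream(&fetch_and_process/1,
      max_concurrency: limit,
      timeout: 30_000,
      # Kill a task that exceeds the timeout instead of crashing the caller.
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, reason}
    end)
  end
end

# Assumed example input: 100 placeholder URLs.
urls = for n <- 1..100, do: "https://example.com/files/#{n}.txt"
results = FetchSketch.run(urls)
IO.puts("processed #{length(results)} files")
```

The limit is just a function argument here, so it could come from config or an environment variable and be changed without touching the code.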

Very good point! I was thinking of adding the details in the initial post, but was wondering if it would be too much :slight_smile:

For this idea I’m thinking the following (at least as a first stab at it):

  • text files
  • 0-10MB (I can limit the size and just not fetch if bigger than X)
  • fetched hourly
  • fetched from an HTTP server
  • processing: Thinking of diffing the files against previous fetches, I expect time in ms, growing linearly with file size. It’s why I was thinking Rust.

I looked at Broadway a while back but was left with the impression it’s used with external Queues. I probably got confused because of the context of the blog posts that I saw floating about. I’ll take a better look, would definitely go for the learning route.

I agree here with the point on simplicity and the approach to the problem. From my Googling efforts, I was left with the impression as you say to stay away from NIFs. Given that my goal is to process the largest number of files in that 1-hour window and there is CPU processing on the files that could be substantial - should I be doing it in Elixir at all? Am I using the right tool for the job?

I agree that I should probably do it all in Elixir first, throw 100…10k files at it and see how it does (for a start).

Might add some other form of processing on the files once this initial pass is done. I’d love to do it in Elixir, but I’m not sure if it’s the right tool for the job. I could always increase the processing power, but that’s nowhere near as fun as doing more with less :wink:

I was thinking of something similar regarding limiting the concurrency. How does one come up with the magic number - n*cpu_count ?

It’s worth mentioning that I came up with all the rules for this (1 hour, 0-10MB, etc.) =) I’m interested to learn what the best way to go about this would be. Given those limitations, how can I make this process the largest number of files - and learn more Elixir along the way!

You are likely to need to resort to Rust NIFs eventually, sure. My point was: do it 100% in Elixir to arrive at an actual working prototype first. Adding many more moving parts (namely a NIF) is going to increase your initial time to an MVP. I can’t possibly argue for/against NIFs because I seriously have no idea what exactly your processing is going to do. :slight_smile:

If it is any comfort: nobody has figured that out yet. :003:

However, with the BEAM VM (Erlang/Elixir) you have the luxury not to think about it unless you really have to (or, like you and me, want to get the best possible performance per watt). You can just spawn 50_000 separate tasks and you can be very sure that the runtime will fairly parallelise them with the minimum possible lag.

That being said, using Task.async_stream without specifying the concurrency defaults to spawning tasks no more than the number of the CPU threads (which are usually 2x the CPU cores) – it assumes the loads are CPU-bound. You do have control though. I’ve successfully increased concurrency all the way to 1000+ tasks for a workload where each task was reading small files from a very fast SSD and then doing a network request to another service (controlled by the same company and a very powerful service at that which didn’t flinch at 1000+ requests/sec). I/O bound workloads can tolerate a looooot of concurrency.
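
To make that concrete, here is a rough sketch comparing the default limit (`System.schedulers_online()`) with an explicitly raised `max_concurrency`. The `io_job` function is a made-up stand-in for an I/O-bound task (it only sleeps), and the numbers 200 and 50 ms are arbitrary.

```elixir
# Without options, Task.async_stream runs at most this many tasks at once.
default_limit = System.schedulers_online()

# Hypothetical I/O-bound job: mostly waiting, barely any CPU work.
io_job = fn n ->
  Process.sleep(50)
  n * 2
end

# Run 200 jobs with the given options and return {elapsed_ms, results}.
time_ms = fn opts ->
  {micros, results} =
    :timer.tc(fn ->
      1..200
      |> Task.async_stream(io_job, opts)
      |> Enum.map(fn {:ok, value} -> value end)
    end)

  {div(micros, 1000), results}
end

{default_ms, default_results} = time_ms.([])
{wide_ms, wide_results} = time_ms.(max_concurrency: 200)

IO.puts("default (#{default_limit} at a time): #{default_ms} ms")
IO.puts("max_concurrency: 200: #{wide_ms} ms")
```

For a job that mostly waits, the second run should usually finish much sooner; for a CPU-bound job, raising the limit past the scheduler count would not be expected to help.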

That sounds like not much data in an hour! Perhaps try the simplest and easiest implementation and see how that works. Best not to over engineer when it might not be required.

Yep, sounds like the reasonable way to go about it.

First I want to try my hand at diffing the files with the previously saved versions. So text operations - up to 10MB of text. I remember reading that Elixir isn’t the most efficient when handling strings (happy to be wrong), thus my line of thinking. Also, I’ve not yet found a diffing library in Elixir, so I’ll need to learn to write some very performant Elixir :grimacing:

That was my line of reasoning initially - Elixir to make the requests and fetch the data and Elixir or something else to do the processing.

There probably needs to be a limit based on available memory, as there can be only so many files processed at one time before going OOM :smiley:

That sounds great!

My initial concern was that I might be trying to solve a problem with Elixir that I shouldn’t be. Thanks to the answers here in the thread I see that there are lots of gains to be had before needing anything else.

Quite happy to try and keep the complexity at bay.

Can you give us an idea of the types of diffs you are looking for (e.g. just new lines in the newer files? Are they all going to be at the end, or randomly scattered throughout?) and what you will be doing with them? We may be able to help you with an efficient approach.

+1 to the recommendation for Oban.

Re: the NIF angle, also consider if ports would be a good fit. You could write a separate Rust program that talks over a port and does the crunching. There’s some overhead - since data is being transferred between operating-system processes - but OTOH if the program on the other end of the port fails it doesn’t affect the BEAM. The NIF approach isn’t free of performance issues related to serialization either.
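
A tiny sketch of what talking over a port looks like. It assumes a Unix-like system and uses `cat` purely as a stand-in for the separate Rust program, since `cat` simply echoes back whatever it receives on stdin; a real worker would need some framing protocol on top of this.

```elixir
# `cat` is only a placeholder for the hypothetical external Rust program.
executable = System.find_executable("cat")

# Start the external OS process; its stdin/stdout are wired to the port.
port = Port.open({:spawn_executable, executable}, [:binary, :exit_status])

# Send a chunk of data to the external program's stdin.
Port.command(port, "line to process\n")

# Whatever the program writes to stdout arrives as a message
# in the process that opened the port.
reply =
  receive do
    {^port, {:data, data}} -> data
  after
    5_000 -> :timeout
  end

Port.close(port)
IO.inspect(reply)
```

If the external program crashes, the owning process gets an `{port, {:exit_status, status}}` message (because of the `:exit_status` option) rather than the whole VM going down.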

File changes can be in any part of the file.

In terms of diffing additions & removals. Found this https://stackoverflow.com/questions/805626/diff-algorithm.

What do you need to know? What has changed, or rather whether there was any change at all?

Hash the whole file: if hash hasn’t changed no need to diff.
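
As a rough sketch of that check: store a digest after each fetch and only diff when the new digest differs. `ChangeCheck` and its functions are names invented for this example, and SHA-256 is just one possible choice of hash.

```elixir
defmodule ChangeCheck do
  # Hex-encoded SHA-256 of the file body.
  def digest(content) do
    :crypto.hash(:sha256, content) |> Base.encode16(case: :lower)
  end

  # `previous_digest` is whatever was stored after the last fetch
  # (nil on the very first fetch, which then counts as "changed").
  def changed?(content, previous_digest) do
    digest(content) != previous_digest
  end
end

old = "alpha\nbeta\n"
stored = ChangeCheck.digest(old)

IO.inspect(ChangeCheck.changed?(old, stored))              # false
IO.inspect(ChangeCheck.changed?("alpha\ngamma\n", stored)) # true
```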

Show any changes if there are any. Found this as mentioned above: https://stackoverflow.com/questions/805626/diff-algorithm.
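
For what it’s worth, Elixir’s standard library ships a Myers diff as `List.myers_difference/2` (and `String.myers_difference/2`), so a first pure-Elixir pass at line-level additions and removals could look like the sketch below. The sample file contents are made up, and how this performs on 10MB files is something to measure rather than assume.

```elixir
# Two made-up versions of the same file, split into lines.
old_lines = String.split("alpha\nbeta\ngamma", "\n")
new_lines = String.split("alpha\nbeta2\ngamma\ndelta", "\n")

# Returns a keyword list of {:eq | :ins | :del, lines} segments.
diff = List.myers_difference(old_lines, new_lines)

# Collect only the lines that were added or removed.
added = for {:ins, lines} <- diff, line <- lines, do: line
removed = for {:del, lines} <- diff, line <- lines, do: line

IO.inspect(added, label: "added")     # added: ["beta2", "delta"]
IO.inspect(removed, label: "removed") # removed: ["beta"]
```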

There are good built-in and external crypto libraries that already have NIFs covering much more than hashing, so at least on that front you’re covered very well.

Could you point me in the direction of some of those? I’m curious to see what the different options look like.

Erlang’s crypto package is a very good start.

Summoning @voltone for additional options, if he doesn’t mind.

Calculating a digest over a file’s content, without reading the whole file into memory at once, is easy enough with :crypto, though it is not going to be as fast as a native C/Rust implementation with a tight loop over a writable buffer.

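hash_algorithm = :sha   # For example (:sha is SHA-1; :sha256 also works)

digest =
  # Read the file in 2048-byte chunks instead of loading it all at once.
  # On Elixir 1.16+ the preferred argument order is File.stream!(filename, 2048).
  File.stream!(filename, [], 2048)
  # Feed each chunk into the running hash state (&1 = chunk, &2 = state).
  |> Enum.reduce(:crypto.hash_init(hash_algorithm), &:crypto.hash_update(&2, &1))
  |> :crypto.hash_final()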

Do we have performance benchmarks of sha2 vs sha1? IIRC command line sha2 is faster than sha1 on all platforms, and even faster than md5 on all platforms except Intel Broadwell and AMD pre-Opteron. I think that should be true for the BEAM too, depending on which OpenSSL library is used.

Edit: I couldn’t find the benchmarks I thought I remembered. Point being, profile first before picking a hash.
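
A quick-and-dirty way to profile this on your own machine might look like the following. It hashes 10MB of random bytes (an arbitrary stand-in for one of the larger files) once per algorithm; a single run is noisy, so treat the output as a rough indication only and use a proper benchmarking tool for anything serious.

```elixir
# 10 MB of random bytes as a stand-in for a large fetched file.
data = :crypto.strong_rand_bytes(10 * 1024 * 1024)

# Time one hash of the whole buffer for each algorithm this
# OpenSSL build supports.
timings =
  for algorithm <- [:md5, :sha, :sha256, :sha512],
      algorithm in :crypto.supports(:hashs) do
    {micros, _digest} = :timer.tc(fn -> :crypto.hash(algorithm, data) end)
    {algorithm, micros}
  end

# Print fastest first.
for {algorithm, micros} <- Enum.sort_by(timings, &elem(&1, 1)) do
  IO.puts("#{algorithm}: #{Float.round(micros / 1000, 2)} ms")
end
```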