I have a stream where I construct the args to be sent to a bulk_create call that will create these resources in my DB in bulk.
The part where I construct the args runs in parallel, so I would like to push as much work into that part as possible.
Because of that, I looked into the assume_casted? option that bulk_create accepts, which means I could cast the args in my parallel code before passing them to bulk_create, possibly saving time.
What I don't get is how to cast these values; I can't find any documentation or code examples doing that.
I assumed casting meant creating the changeset myself with an Ash.Changeset.for_create call, or even calling Ash.Changeset.apply_attributes, but neither works, so those are probably not it.
I also tried just passing the arguments with the assume_casted? flag on, but then it failed on my union field with no function clause matching in Ash.Type.Union.dump_to_native/2.
So, how does one manually cast the args in this case?
Hmm…yeah, it's an interesting point. The idea was to eventually support taking a stream of changesets instead of a stream of inputs, but that didn't quite pan out. What you'd want to do is cast the inputs to their appropriate types using Ash.Type.cast_input. But we should probably create a utility for doing this.
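For example, roughly (MyApp.Record and :title are placeholders; this isn't an existing Ash utility, just a sketch of the idea):

```elixir
# Look up the attribute's type and constraints on the resource, then cast the
# raw value with Ash.Type.cast_input/3 so it matches what the data layer expects.
attr = Ash.Resource.Info.attribute(MyApp.Record, :title)
{:ok, casted} = Ash.Type.cast_input(attr.type, "some title", attr.constraints)
```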
Oh interesting, having a stream of changesets would be great in my case since I would be able to do a bunch of the “heavy lifting” validations on my end instead of during bulk_create.
Is there a technical reason this doesn't exist? Or is it just that other things are a higher priority?
The main reason is that there are some things that must be the same across all changesets, which we enforce when we build the changesets ourselves. I think it could be done, but it would be a change with likely a fair bit of back and forth, not something I have time for, but I'm happy to guide its creation.
Is there something else that bulk_create does that can create some slowdown?
I was testing the assume_casted? option: with it off, my call (around 10k rows) takes around 10 seconds; with it on, it goes down to around 3.5~4 seconds, cutting more than half the time.
I thought the overhead was in creating the changeset, so I moved that part into my code that runs before bulk_create, but that was not it.
At the end of the day, I added this code to make sure my resource values are correct so I can call bulk_create with assume_casted?: true:
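Something along these lines (simplified here; MyApp.Record, its :create action, and the cast_input helper stand in for my real resource and pipeline):

```elixir
defmodule MyApp.Import do
  # Cast every value of an input map with Ash.Type.cast_input/3, using the
  # attribute definitions from the resource, so bulk_create can trust the data.
  defp cast_input(input) do
    Map.new(input, fn {name, value} ->
      attr = Ash.Resource.Info.attribute(MyApp.Record, name)
      {:ok, casted} = Ash.Type.cast_input(attr.type, value, attr.constraints)
      {name, casted}
    end)
  end

  def insert_all(inputs) do
    inputs
    # the casting runs in my parallel code, not inside bulk_create
    |> Task.async_stream(&cast_input/1, ordered: false)
    |> Stream.map(fn {:ok, casted} -> casted end)
    |> Ash.bulk_create(MyApp.Record, :create, assume_casted?: true)
  end
end
```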
Likely it's the type casting for many records taking a lot of time. We need to add a mechanism to provide inputs that are the same for all rows, which only get cast once, and also likely update the code to cast inputs inside the tasks we spawn for concurrent batches.
Yeah, I tried to debug it a little bit to see where the issue is, and it seems to be in an Ash.Changeset.handle_params call inside bulk.ex.
I'm wondering, would it be possible to "split" the bulk_* APIs into two parts?
For example, in a normal call, you can generate a changeset, hold it, and then just call Ash.create to insert that changeset.
I wonder if we could do something similar, where there is an "intermediary" call that generates a struct holding the already-cast inputs (say, BulkChangeset or whatever), and then you can call Ash.bulk_create with it, and the only job Ash does at that point is issuing the insert commands to the DB.
That way I could, for example, run the expensive handle_params call inside my parallel code to make it faster.
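Hypothetically, something with this shape (Ash.BulkChangeset does not exist, this is only to illustrate the idea):

```elixir
# Hypothetical two-step API, nothing like this exists in Ash today.
bulk_changesets =
  inputs
  |> Task.async_stream(fn input ->
    # the expensive casting / handle_params work would happen here,
    # inside my own parallel code
    Ash.BulkChangeset.for_create(MyApp.Record, :create, input)
  end)
  |> Stream.map(fn {:ok, changeset} -> changeset end)

# the only job left for bulk_create would be issuing the inserts to the DB
Ash.bulk_create(bulk_changesets, MyApp.Record, :create)
```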
Technically the user could still send "trash" that would break the constraints you talked about before, but in that case I would argue it's on them; using the public Ash API would not result in that.
Just as an example of what I tried: at line 839 of the ash/actions/create/bulk.ex file there is a Stream.map(stream, callback) call; I added the parallel_stream dep to Ash and replaced that code with ParallelStream.map(stream, callback).
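Standalone, the idea of the swap looks like this (the sleep is just a stand-in for the per-record work that callback does in bulk.ex):

```elixir
Mix.install([{:parallel_stream, "~> 1.0"}])

# stand-in for the per-record work done by the callback in bulk.ex
callback = fn input ->
  Process.sleep(1)
  Map.put(input, :processed?, true)
end

inputs = Enum.map(1..1_000, &%{id: &1})

# what bulk.ex does today:  Stream.map(stream, callback)
# the experiment: same mapping, but spread across worker processes
inputs
|> ParallelStream.map(callback)
|> Enum.to_list()
```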
Now the time went from ~4.5 seconds to ~1.4 seconds.
Of course, I don't think it makes sense to add that code to the main Ash codebase, it was just an experiment, but at least it proves that, if that part were "split" from the main bulk_create call, I would be able to make it way faster in my case. That would make the bulk calls more interesting, since right now they lack a little in the performance department compared to solutions in other languages.
In general they are slower because they are doing more than bulk calls in other frameworks. We can definitely push towards better performance, but we wouldn't want to opt in to parallel_stream like that; we'd just want to include the initial type casting in the already built-in parallelization logic that we have.
In production I basically have a system that fetches new data from a source, processes it, and inserts it into the DB; I just need to make it as fast as possible.
Hmm…so you can definitely make it faster in general via the assume_casted? option; you just have to do the legwork of casting each input.
Do you have config :ash, disable_async?: true in production, perchance? Because, like I said, it parallelizes internally automatically. You could also tweak parameters like max_concurrency and batch_size.
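For example (the numbers are arbitrary, tune them for your data; MyApp.Record is a placeholder):

```elixir
# disable_async?: true in config would turn the internal parallelization off,
# so make sure it isn't set in prod if you want these knobs to matter.
Ash.bulk_create(inputs, MyApp.Record, :create,
  assume_casted?: true,
  batch_size: 500,
  max_concurrency: System.schedulers_online()
)
```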
Just to be clear, assume_casted? already gave me a performance boost; I'm using it right now.
I just found that it is not enough; it still takes a long time during the bulk insertion part (mostly from that Stream.map call I mentioned before, AFAICT).
I haven't tried changing those two parameters you mentioned; I will look into them.
The Stream.map there is mapping over a stream that already does work asynchronously. So it's being done "doubly async," which may just mean that increasing max_concurrency will help you. We are using Task.async_stream further up in that stream.