How to split data into equal parts properly to process them evenly?

Let’s say:

  • I have 1 million records in a DB and I want to process them daily via something like Oban.
  • there’s an Oban job which is executed hourly
  • an Oban job is able to process around 70k-200k records per hour
  • it can’t be known in advance how many it’ll process each time

The key here is to process the records evenly. Simply running one job every 24 hours that processes all 1 million records at once, one by one, won’t work for me.

How would I split the records into parts and also keep track of how many have been processed so far each day, in a simple way?

I could create a DB table with a counter → id, date_time, counter
But that seems like overkill.

Isn’t there a better way?

Why not use something similar but in memory, like an :ets table?

Great question! The basic principle we’re gonna use here is remainder math. At a basic level, you could spawn two jobs, one of which handled records that had even numbered ids, and another job that handled odd numbered ids. If you want to do more than just two jobs you can extend this concept by using the modulus operator %.

If you want to do this on a cron, my recommendation is to basically do a two phase job enqueuing process.

Phase 1: A regular Oban job that is configured to run hourly. That job spawns, say, 10 jobs to do the actual work, and it gives each job a number between 0 and 9 in the args (matching the possible values of id % 10), as well as the total number (for easier config).

# partitions run from 0 to total - 1, the possible remainders of id % total
for i <- 0..(total - 1) do
  PartitionJob.new(%{partition: i, total: total}) |> Oban.insert!()
end

Phase 2: The partition job runs, and it takes the arg given to it and queries your table of records.

SomeSchema
|> where([t], fragment("? % ? = ?", t.id, ^args["total"], ^args["partition"]))
|> Repo.all

And there you go! If a job is given, say, partition 7, it will get all rows where schema.id % 10 = 7, which will be roughly one tenth of the rows.
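To check that every row ends up in exactly one partition, here is a minimal pure-Elixir sketch with no Oban or Ecto involved. The module `PartitionSketch` and its functions are made up for illustration, and `rem/2` stands in for the SQL `%` on positive integer ids.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper: the partition numbers an enqueuing job would hand out.
  # Remainders of `id % total` fall in 0..total-1, so the partitions do too.
  def partitions(total) when total > 0, do: Enum.to_list(0..(total - 1))

  # Which partition a positive integer id belongs to.
  def partition_of(id, total), do: rem(id, total)

  # The ids a given partition job would select, mimicking `id % total = partition`.
  def ids_for(ids, partition, total) do
    Enum.filter(ids, fn id -> partition_of(id, total) == partition end)
  end
end

ids = 1..100
total = 10

covered =
  PartitionSketch.partitions(total)
  |> Enum.flat_map(fn p -> PartitionSketch.ids_for(ids, p, total) end)
  |> Enum.sort()

# Every id is picked up by exactly one partition, so this prints true.
IO.inspect(covered == Enum.to_list(ids))
```

If the partition numbers were taken from 1..total instead, the rows whose remainder is 0 would never be selected by any job.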

The main upside to this approach is that you have a ton of control over the transactional characteristics because all of the relevant info ends up in the DB. It also handles node failure or cluster size changes reliably due to Oban unique jobs.

Is it a good practice? I have no idea about Oban, but in the worst case one partition could end up with 100% of the rows, as long as it is possible to call Repo.delete/1 on SomeSchema and 90% of the rows get deleted. Of course nobody expects 90% of rows to be deleted in a real case; I wrote that just to show the problem with an exaggerated example.

What may be worth mentioning is that all rows matching schema.id % 10 = 7 could, in the worst case, be all the first or all the last rows, so sorting with such code may behave unexpectedly.

If possible, I would advise using the ROW_NUMBER() function, which should be a better solution for a PostgreSQL database. Regardless of how your data looks, how much is deleted, and how you sort it, it would always work exactly the same way. For more information please take a look at:

  1. Window Functions documentation
  2. PostgreSQL ROW_NUMBER Function tutorial

I’d probably go for a single Oban job that fetches records via Repo.stream and then multiplex the record batches to other Oban jobs or just use good old Task.async_stream but I admit I am not aware of the possible complexities or gotchas of using sub-workers and such.
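A rough sketch of the Task.async_stream variant, under a few assumptions: `BatchSketch`, `process/1`, the batch size and the concurrency are all made up, and a plain range stands in for the records. With Ecto, the enumerable would be whatever Repo.stream returns, which with the Postgres adapter has to be consumed inside a transaction.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for the real per-record work.
  def process(record), do: record * 2

  # Splits the records into batches and processes the batches concurrently.
  # `records` can be any enumerable.
  def run(records, batch_size, concurrency) do
    records
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(
      fn batch -> Enum.map(batch, &process/1) end,
      max_concurrency: concurrency,
      timeout: :infinity
    )
    |> Enum.flat_map(fn {:ok, results} -> results end)
  end
end

IO.inspect(BatchSketch.run(1..10, 3, 2))
```

Unlike separate Oban jobs, the tasks here live and die with the calling process, so a crash or node restart loses whatever was in flight.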

ROW_NUMBER() isn’t stable across different queries though. If a single row is deleted between process A doing the query and process B doing the query then the whole count is off and it won’t partition properly. The value of the modulus approach is that you’re using a value intrinsic to the row, which ensures that all parties can agree on which partition it belongs to just by looking at it knowing the partition counts.

I’m not really sure I follow your argument with respect to the modulus logic. It doesn’t really have an issue with deleted rows, unless for some reason all rows with a specific modulus value were specifically deleted, which seems hard to imagine in a regular use case. If you’re using auto incrementing IDs, then the probability distribution of modulus values is equal. So unless there is a bias in deletes towards specific modulus results I don’t really see what you’re saying.

Concrete example:

iex(11)> ids = 1..1_000_000
1..1000000
iex(12)> ids |> Enum.map(fn i -> i |> rem(10) end) |> Enum.frequencies
%{
  0 => 100000,
  1 => 100000,
  2 => 100000,
  3 => 100000,
  4 => 100000,
  5 => 100000,
  6 => 100000,
  7 => 100000,
  8 => 100000,
  9 => 100000
}

We generate a million auto incrementing keys, partition them modulo 10, and we see how many go to each partition. As expected it’s perfectly even. Let’s delete the first 90%:

iex(13)> ids |> Enum.map(fn i -> i |> rem(10) end) |> Enum.drop(900_000) |> Enum.frequencies
%{
  0 => 10000,
  1 => 10000,
  2 => 10000,
  3 => 10000,
  4 => 10000,
  5 => 10000,
  6 => 10000,
  7 => 10000,
  8 => 10000,
  9 => 10000
}

Still even, as expected. Let’s delete a random 90%:

iex(15)> ids |> Enum.map(fn i -> i |> rem(10) end) |> Enum.filter(fn _ -> :rand.uniform(10) == 1 end) |> Enum.frequencies
%{
  0 => 10007,
  1 => 9958,
  2 => 9975,
  3 => 9888,
  4 => 9937,
  5 => 10032,
  6 => 10153,
  7 => 10072,
  8 => 10029,
  9 => 9854
}

Still basically even. To get a non even result you have to have some sort of delete pattern that is more common in rows with ids ending with 9 than rows ending with 7 and that just seems very rare to me, and is in any case easy to check in your own dataset to see if it’s happening.

Oh, did I miss that information somewhere? Could you please share more about it? It would be a good resource to learn from. :+1:

From the link you provided:

The ROW_NUMBER() function is a window function that assigns a sequential integer to each row in a result set.

The key words there are result set. It’s the row_number within the set of things returned by a given query. Suppose there are 5 total rows with ids [1,2,3,4,5]. If Pid 1 queries all rows, sorts by id, and returns both the ID and row number it’s basically like doing:

iex(17)> [1,2,3,4,5] |> Enum.with_index(1)
[{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}]

If ID number 2 is deleted between when the first pid runs this query, and the second pid runs this query, pid 2 will get a totally different row number for ids 3,4,5

iex(18)> [1,2,3,4,5] |> Enum.reject(fn i -> i == 2 end) |> Enum.with_index(1)
[{1, 1}, {3, 2}, {4, 3}, {5, 4}]

Ah, right - my bad. I was focused on the count of rows in each group and therefore suggested a solution where that count never differs by more than one from the count in the other groups. That part was fine on its own; however, I completely forgot that in this case we cannot accept group membership changing when one or more rows are deleted, because some rows that changed groups may not be processed at all. :sweat_smile:
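The difference can be made concrete with a small sketch. `StabilitySketch` and its functions are hypothetical names: the first function assigns partitions by position in the result set (what ROW_NUMBER() would give), the second by the id itself, and `moved/2` lists the ids whose partition differs between two snapshots.

```elixir
defmodule StabilitySketch do
  # Partition by position in the result set (what ROW_NUMBER() would give).
  def by_row_number(ids, total) do
    ids
    |> Enum.with_index(1)
    |> Map.new(fn {id, row} -> {id, rem(row, total)} end)
  end

  # Partition by a value intrinsic to the row (its id).
  def by_id(ids, total), do: Map.new(ids, fn id -> {id, rem(id, total)} end)

  # Ids present in the later snapshot whose partition differs from before.
  def moved(before, later) do
    for {id, p} <- later, Map.fetch!(before, id) != p, do: id
  end
end

before_ids = [1, 2, 3, 4, 5]
after_ids = List.delete(before_ids, 2)

moved_by_row =
  StabilitySketch.moved(
    StabilitySketch.by_row_number(before_ids, 2),
    StabilitySketch.by_row_number(after_ids, 2)
  )
  |> Enum.sort()

moved_by_id =
  StabilitySketch.moved(
    StabilitySketch.by_id(before_ids, 2),
    StabilitySketch.by_id(after_ids, 2)
  )

IO.inspect(moved_by_row) # [3, 4, 5]
IO.inspect(moved_by_id)  # []
```

With row numbers, deleting id 2 moves ids 3, 4 and 5 to a different partition, so two jobs querying at different times can disagree about who owns them. With the id-based split nothing moves.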

Yes, it’s one of the solutions.

Ok. That would work.

However, what if

  • a) ID-s were not more or less sequential? So much so that the odd/even ratio was 60%…40%? Or even 70%…30%?

  • b) ID-s weren’t integers but GUID-s?

b) ID-s weren’t integers but GUID-s?

You can try uuid_hash if you’ve got UUIDs on Postgres:

select mod(mod(uuid_hash(gen_random_uuid()), 10) + 10, 10);

Note the awkward mod(mod(thing, x) + x, x) form that took. That’s because Postgres’s native mod functionality, annoyingly, can return negative values: sql - Real, mathematical "modulo" operation in Postgres? - Stack Overflow.
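The same sign issue shows up on the Elixir side, in case the hashing is ever done in application code: `rem/2` keeps the sign of the dividend, while `Integer.mod/2` always lands in 0..n-1 for a positive n. The helper `non_negative_mod/2` below is only an illustration of the double-mod trick, not something you would need in practice.

```elixir
defmodule ModSketch do
  # Same trick as mod(mod(x, n) + n, n) in SQL: shift a possibly negative
  # remainder into 0..n-1.
  def non_negative_mod(x, n) when n > 0, do: rem(rem(x, n) + n, n)
end

IO.inspect(rem(-7, 10))                        # -7
IO.inspect(ModSketch.non_negative_mod(-7, 10)) # 3
IO.inspect(Integer.mod(-7, 10))                # 3
```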

If you’re using auto incrementing keys then it is more or less sequential. If you’re splitting into N partitions, you would need a delete pattern that favored one specific remainder (id % N) more than the others. As I said in the earlier post, it’s trivial to just go query your database to see if that is happening in your case: select id % 10, count(id) from table group by 1.

@hhquark is dead on with the uuid answer. Basically what we’re using % here for is a kind of “consistent hash” which reliably turns a given input value into one of a smaller set of output values. % is a very efficient way of doing this for integers, but there are other consistent hashing functions that work on other values too.
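As one illustration of hashing non-integer keys on the application side, Erlang's `:erlang.phash2/2` maps any term to an integer in 0..range-1. This is only a sketch: `HashPartitionSketch` and the "record-N" strings are made up, and the resulting partition will not match what Postgres's uuid_hash produces, so it can't be mixed with the SQL filter above.

```elixir
defmodule HashPartitionSketch do
  # :erlang.phash2(term, range) returns an integer in 0..range-1.
  def partition_of(id, total), do: :erlang.phash2(id, total)
end

# Made-up string ids standing in for UUIDs.
ids = for i <- 1..10_000, do: "record-#{i}"

# How many ids land in each of the 10 partitions.
counts = Enum.frequencies_by(ids, &HashPartitionSketch.partition_of(&1, 10))
IO.inspect(counts)
```

The same id always hashes to the same partition, which is the property the modulus approach relies on.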