Hello,
I need to write an Elixir function that evenly distributes an unbalanced load of weighted tasks over a set of nodes. Let's say I have the following input:
number_nodes = 3
weighted_tasks = [10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1]
node_capacity = 20 # as sum of all tasks/number_nodes
and I need to get this output (one of several possible combinations):
[[10, 10], [10, 7, 2, 1], [6, 5, 5, 2, 1, 1]]
which means that the load per node is 20 for all 3 nodes, i.e. the load is balanced.
I am trying to use the Enum.chunk_while/4 function but I am not able to implement it correctly. Here is my attempt:
# Builds the chunk function handed to Enum.chunk_while/4 for a given node capacity.
#
# The returned function takes the current task weight and the chunk collected so far:
#   * while the chunk's total plus the task stays strictly below `node_capacity`,
#     the task is prepended to the chunk and nothing is emitted;
#   * otherwise the chunk is emitted (wrapped in one extra list) and a fresh empty
#     chunk is started; the task that did not fit is not carried over.
def chuck_f(node_capacity) do
  fn task, chunk ->
    if Enum.sum(chunk) + task >= node_capacity do
      # Open question: the chunk should only be emitted when no input is left;
      # if input remains, the current task should merely be skipped without
      # emitting anything -- but how can that be expressed here?
      {:cont, [chunk], []}
    else
      # Open question: the chunk is not emitted yet, but the task should also be
      # removed from the input list because it now belongs to the chunk -- how?
      {:cont, [task | chunk]}
    end
  end
end
# Builds the "after" function for Enum.chunk_while/4.
#
# It receives whatever chunk is still pending once the input is exhausted:
# an empty chunk emits nothing, anything else is emitted as the final chunk.
def after_iteration_fun() do
  fn
    [] -> {:cont, []}
    leftover -> {:cont, leftover, []}
  end
end
# Runs the chunking attempt on a fixed sample: twelve task weights in
# descending order and a per-node capacity of 20.
def distribute_load do
  node_capacity = 20
  weighted_tasks = [10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1]

  Enum.chunk_while(weighted_tasks, [], chuck_f(node_capacity), after_iteration_fun())
end
which produces this result:
iex(109)> distribute_load
[[[10]], [[7, 10]], [1, 1, 1, 2, 2, 5, 5]]
which is not what I need.
I have this algorithm implemented in Java; here is the pseudocode:
# partitions with their load, sorted by load in descending order (this is already done in Elixir)
partition_list = compute_list_of_partition_with_load_sort_desc
cluster_load = compute_total_cluster_load
one_node_load = cluster_load / num_of_nodes
# for all nodes in the cluster
for node in node_list {
  # initialise the node with no partitions yet and full capacity
  node.partition_list = []
  node.capacity = one_node_load
  # first fit: take every remaining partition that still fits into this node
  for partition in partition_list {
    if partition.load <= node.capacity {
      node.partition_list.add(partition)
      recompute_node_capacity(node, partition)
      # an assigned partition must not be offered to the following nodes
      partition_list.remove(partition)
    }
  }
}
# Reduces the node's remaining capacity by the load of the partition just assigned to it.
def recompute_node_capacity(node, partition) do
  new_capacity = node.capacity - partition.load
  node.capacity = new_capacity # ugly node's state mutation
end
and this is the actual draft Java implementation, where I can easily mutate the map/list in place, which lets me achieve the goal:
/**
 * Assigns a fixed list of task weights to three nodes: each node in turn scans
 * the tasks that are still unassigned and takes every task that fits into its
 * remaining capacity (limit 20). Prints the node map before and after.
 */
@Test
public void balanceLoad() {
    // Task weights, listed in descending order.
    List<Integer> tasks = new ArrayList<>();
    for (int weight : new int[] {10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1}) {
        tasks.add(weight);
    }

    Map<Integer, List<Integer>> nodes = new HashMap<>();
    for (int nodeId = 1; nodeId <= 3; nodeId++) {
        nodes.put(nodeId, new ArrayList<>());
    }
    System.out.println("Before: " + nodes);

    for (List<Integer> assigned : nodes.values()) {
        // Scan a snapshot so the shared task list can shrink during the scan.
        for (Integer task : new ArrayList<>(tasks)) {
            if (task <= nodeCapacity(assigned, 20)) {
                assigned.add(task);
                // Integer argument selects remove(Object): removes by value, not by index.
                tasks.remove(task);
            }
        }
    }
    System.out.println("After: " + nodes);
}
/**
 * Returns the capacity a node has left.
 *
 * @param nodeTasks       weights of the tasks already assigned to the node
 * @param nodeCapCapacity the node's capacity limit
 * @return the limit minus the sum of the assigned weights
 */
private int nodeCapacity(List<Integer> nodeTasks, int nodeCapCapacity) {
    int used = 0;
    for (int weight : nodeTasks) {
        used += weight;
    }
    return nodeCapCapacity - used;
}
giving me this result:
Before: {1=[], 2=[], 3=[]}
After: {1=[10, 10], 2=[10, 7, 2, 1], 3=[6, 5, 5, 2, 1, 1]}
Can anybody help me achieve something similar in Elixir? Thank you for your time.