Even load distribution function, how to implement in Elixir?

Hello,

I need to write an Elixir function which will evenly distribute unbalanced load over set of tasks (nodes). Lets say I have following input:

number_nodes = 3
weighted_tasks = [10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1]
node_capacity = 20 # as sum of all tasks/number_nodes

and I need to get this output (one of other possible combinations)

[[10, 10], [10, 7, 2, 1], [6, 5, 5, 2, 1, 1]]

which mean that load weight per node is 20 for all 3 nodes and balanced.

I am trying Enum.chunck_by() function but not able implement it correctly, here is my attempt:

  def chuck_f(node_capacity) do
    chunk_fun = fn element, acc ->
      acc_capacity = Enum.sum(acc)
      pending_capacity = acc_capacity + element

      if pending_capacity < node_capacity do
        # do not yet emit given chuck, but remove this element from the input list as it
        # is added to the chuck, but how to do that?
        {:cont, [element | acc]}
      else
        # if no more element in input list present emit chunck
        # if there are some element just skip current one and do not emit chunck
        # but how to get it?
        {:cont, [acc], []}
      end
    end
    chunk_fun
  end


  def after_iteration_fun() do
    after_fun = fn acc ->
        case acc do
          [] -> {:cont, []}
          acc -> {:cont, acc, []}
        end
      end
    after_fun
  end

  def distribute_load do
    input = [10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1]
    node_capacity = 20
    input |> Enum.chunk_while([], chuck_f(node_capacity), after_iteration_fun())
  end

which producing result:

iex(109)> distribute_load
[[[10]], [[7, 10]], [1, 1, 1, 2, 2, 5, 5]]

which is not what I need.

I have this algorithm implement in Java with this pseudo code:

partitions_list = compute_list_of_partiton_with_load_sort_desc # this is already done in Elixir
cluster_load = compute_total_cluster_load
one_node_load = cluster_load/num_of_nodes


#for all nodes in cluster
for node in node_list {
    
    # initiate node with no partitions yet and full capacity
    node.partion_list = []
    node.capacity = one_node_load

    for partion in partition_list {

        if partion.load <= node.capacity {	
            node.partion_list.add(partition)
            recompute_node_capacity(node, partition)
            partition_list.remove(partition)

        } 
    }
}


def recompute_node_capacity(node, partitions) do
    new_capacit = node.capacity - partition.load
    
    node.capacity = new_capacity # ugly node's state mutation
end

and this actual draft Java impl where I can easily mutate map/list just in place which give me ability to achieve the goal:

@Test
    public void balanceLoad() {
        
        List<Integer> tasks = new ArrayList<>(); 
        tasks.add(10);
        tasks.add(10);
        tasks.add(10);
        tasks.add(7);
        tasks.add(6);
        tasks.add(5);
        tasks.add(5);
        tasks.add(2);
        tasks.add(2);
        tasks.add(1);
        tasks.add(1);
        tasks.add(1);        
        
        Map<Integer, List<Integer>> nodes = new HashMap<>();
        nodes.put(1, new ArrayList<>());
        nodes.put(2, new ArrayList<>());
        nodes.put(3, new ArrayList<>());
        
        
        System.out.println("Before: " + nodes);
                
        for (int nodeKey : nodes.keySet()) {
            List<Integer> nodeList = new ArrayList<>(tasks);
            List<Integer> nodeTasks = nodes.get(nodeKey);
            
            for (Iterator<Integer> iterator = nodeList.iterator(); iterator.hasNext();) {
                Integer task = iterator.next();
         
                if(task <= nodeCapacity(nodeTasks, 20)) {
                    nodeTasks.add(task);
                    iterator.remove();
                    tasks.remove(task);
                }
            }
        }
        System.out.println("After: " + nodes);
    }
    
    
    private int nodeCapacity(List<Integer> nodeTasks, int nodeCapCapacity) {
        int current = nodeTasks.stream().reduce(0, Integer::sum);
        return nodeCapCapacity - current;
    }

giving me this result:

Before: {1=[], 2=[], 3=[]}
After: {1=[10, 10], 2=[10, 7, 2, 1], 3=[6, 5, 5, 2, 1, 1]}

Can anybody assist to achieve similar in Elixir? Thank you for your time.

1 Like

Hello,
here is my rewriting your pseudo code

number_nodes = 3
weighted_tasks = [10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1]
node_capacity = trunc(Enum.sum(weighted_tasks) / number_nodes)

1..number_nodes
|> Enum.map(fn index -> {index, %{capacity: node_capacity, tasks: []}} end)
|> Enum.map_reduce(
  weighted_tasks,
  fn {node_index, node}, tasks ->
    {node, tasks} =
      Enum.reduce(tasks, {node, []}, fn task, {node, new_tasks} ->
        if task <= node.capacity do
          {%{capacity: node.capacity - task, tasks: [task | node.tasks]}, new_tasks}
        else
          {node, [task | new_tasks]}
        end
      end)

    {{node_index, node}, Enum.reverse(tasks)}
  end
)
|> IO.inspect(charlists: :as_lists)
# output:
{[
   {1, %{capacity: 0, tasks: [10, 10]}},
   {2, %{capacity: 0, tasks: [1, 2, 7, 10]}},
   {3, %{capacity: 0, tasks: [1, 1, 2, 5, 5, 6]}}
 ], []}

3 Likes

Hello Artur and many thanks to you, this is solution I was seeking for.

Hi, I need exercism so I gave it a try with another approach. Hope you like it too :slight_smile:

defmodule M do
  def run() do
    number_nodes = 3
    weighted_tasks = [10, 10, 10, 7, 6, 5, 5, 2, 2, 1, 1, 1]
    node_capacity = trunc(Enum.sum(weighted_tasks) / number_nodes)

    nodes = List.duplicate(%{capacity: node_capacity, tasks: []}, number_nodes)

    nodes =
      Enum.reduce(weighted_tasks, nodes, fn
        task, [%{capacity: n} = node | nodes] when n >= task ->
          insert(%{node | capacity: n - task, tasks: [task | node.tasks]}, nodes)

        task, nodes when is_integer(task) and is_list(nodes) ->
          nodes
      end)

    # If you want the indexes as in @fuelen solution
    Enum.zip(1..number_nodes, nodes)
  end

  defp insert(%{capacity: cap} = lesser, [%{capacity: n} = candidate | others]) when cap < n,
    do: [candidate | insert(lesser, others)]

  defp insert(node, nodes),
    do: [node | nodes]
end

Thank you very much for taking a look, i’ll try this solution as well later as I already incorporated previous solution and just move on with testing other aspects. Thank you.

No worries, it was just an exercise to me, the other solution is good to keep :slight_smile: