Stupid pet tricks: functions as state

Had a little task today, and the quick solution felt a bit like a stupid pet trick … thought I’d share it here as something a little different from the continual stream of questions :wink:

The task was this: we needed all the pair-wise combinations from a set of entries, and these need to (for performance reasons) be batched up. So we have data like this:

[1..10_000]

… which gets turned into batches like this using Enum.chunk_every/2:

[[1..500], [501..1000], [1001..1500], .. etc]

We then want to do some computation with the next value (e.g. 1) against the rest of the values in its batch (e.g. [2…500]) and then subsequently against each further batch (e.g. [501…1000], then [1001…1500], etc). Pair-wise combinations. Fun.
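To make the shape of the work concrete, here is a tiny sketch with ten entries and a batch size of four. Both numbers and the variable names are made up for illustration; the real workload is far larger.

```elixir
batches = Enum.chunk_every(1..10, 4)
# batches is [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

[[first | rest_of_batch] | later_batches] = batches

# Jobs for the first entry: once against the rest of its own batch,
# then once against each later batch.
jobs_for_first = [{first, rest_of_batch} | Enum.map(later_batches, &{first, &1})]
# jobs_for_first is [{1, [2, 3, 4]}, {1, [5, 6, 7, 8]}, {1, [9, 10]}]
```

The same expansion is then repeated for 2, 3, and so on, each time skipping the entries that have already been handled.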

The computation is done async and the batches are generated on request by a GenServer for consumption by workers. We need to keep track of where we are in the batching, so we need to keep state. Yuck! State! amiright?

Instead of keeping the chunked data around in a state term, I opted to keep a function there which captures that data:

batches = Enum.chunk_every(sequence, batch_size)
state = fn -> next_batch(batches) end

It is then used like this from a message handler in the GenServer:

def next_job(state) do
  case state.() do
    {:done, _} = done -> done
    {{subject, batch}, next} -> {create_job(subject, batch), next}
  end
end

What is that next_batch call in the initial state term, you ask? (Ok, you probably didn’t … but, then again, maybe you did since you have read this far!)

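defp next_batch([[current | rest] | batches]) do
  # Emit the first element against the rest of its own batch before moving on,
  # the same way the later clauses do for every subsequent element.
  {{current, rest}, fn -> next_batch(current, rest, batches, []) end}
end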

defp next_batch(_current, [], [], []) do
  done_tuple()
end

defp next_batch(_current, [], [], [[next | rest] | batches]) do
  {{next, rest}, fn -> next_batch(next, rest, batches, []) end}
end

defp next_batch(_current, [next|rest], [], acc) do
  {{next, rest}, fn -> next_batch(next, rest, acc, []) end}
end

defp next_batch(current, rest, [next_batch | batches], acc) do
  {{current, next_batch},
    fn -> next_batch(current, rest, batches, [next_batch | acc]) end}
end

defp done_tuple(), do: {:done, fn -> done_tuple() end}

Generators! Ignoring the moderately ugly function heads, the useful bit is that each function returns a tuple containing the result of the calculation (the next batch) as well as an anonymous function that wraps the next call to next_batch/4 and can be used to get the next job … this allows the “detail” of how next_batch/4 works to be entirely opaque to the calling code.

The GenServer calls the function (which is its state!) until it gets a :done tuple. It doesn’t matter how many times it is called, as the :done tuple carries a function which, when called, returns the same :done tuple again. As this is used in a distributed application where we cannot know the order or number of calls in advance, that’s a necessary attribute to have, and the above manages that elegantly. :slight_smile:
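Stripped of the batching details, the pattern can be sketched over a plain list. GenSketch and from_list/1 are hypothetical names used only for this illustration, not part of the code above.

```elixir
defmodule GenSketch do
  # Wraps a list in a zero-arity "state function".
  def from_list(list), do: fn -> step(list) end

  defp step([]), do: done()
  defp step([head | tail]), do: {head, fn -> step(tail) end}

  # The :done tuple carries a function that returns the same tuple again,
  # so extra calls after exhaustion are harmless.
  defp done, do: {:done, fn -> done() end}
end

gen = GenSketch.from_list([:a, :b])
{:a, gen} = gen.()
{:b, gen} = gen.()
{:done, gen} = gen.()
# Calling past the end keeps answering :done instead of crashing.
{:done, _gen} = gen.()
```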

It was just a nice way to encapsulate the actual iteration through the batches so it could be “hidden” from the GenServer using it without cluttering up its own message handlers. Performance was just fine, so the code cleanliness this approach offered was considered to offset the overhead.
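A minimal sketch of what the owning process might look like, assuming a GenServer whose entire state is the generator function. JobQueue, Countdown, and their functions are hypothetical stand-ins; the real handlers are not shown in this post.

```elixir
defmodule Countdown do
  # Toy generator standing in for the batch generator.
  def from(n), do: fn -> step(n) end

  defp step(0), do: done()
  defp step(n), do: {n, fn -> step(n - 1) end}
  defp done, do: {:done, fn -> done() end}
end

defmodule JobQueue do
  use GenServer

  def start_link(generator), do: GenServer.start_link(__MODULE__, generator)
  def next(pid), do: GenServer.call(pid, :next)

  @impl true
  def init(generator) when is_function(generator, 0), do: {:ok, generator}

  @impl true
  def handle_call(:next, _from, generator) do
    # The handler never inspects the state; it calls it and stores the
    # function that comes back as the new state.
    {value, next_generator} = generator.()
    {:reply, value, next_generator}
  end
end

{:ok, pid} = JobQueue.start_link(Countdown.from(2))
2 = JobQueue.next(pid)
1 = JobQueue.next(pid)
:done = JobQueue.next(pid)
:done = JobQueue.next(pid)
```

Swapping in a different generator requires no change to JobQueue, which is the encapsulation being described.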

During code review it came up as an out-of-the-ordinary approach (though certainly not novel), so thought I’d share it here. Yay, stupid pet tricks! :slight_smile:


It seems the real culprit for containing state isn’t mentioned: closure.

Returning state via a function closure, for example, is at the core of implementing trampolines in JavaScript in order to implement stackless recursion.

And then there is this (2003):

  • Objects are merely a poor man’s closures
  • Closures are a poor man’s object

It’s not entirely true in Elixir because of one thing - you can’t mutate data in the closure. It’s entirely true in runtimes with mutable data.


There’s one disadvantage of the described pattern: the state is opaque. This becomes problematic in some debugging situations, because you can’t “look into” the function with default logging to figure out what’s going on (unless you use tricks like :erlang.fun_info(fun, :env) to get the data bound in the closure).
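For reference, the :erlang.fun_info/2 trick looks roughly like this. Peek is a hypothetical module, and the exact contents of the environment list are an implementation detail of the runtime, so treat the output as a debugging aid rather than a stable format.

```elixir
defmodule Peek do
  # The returned fun closes over `list`.
  def from_list(list), do: fn -> list end
end

fun = Peek.from_list([1, 2, 3])

# Ask the runtime for the values captured by the closure.
{:env, env} = :erlang.fun_info(fun, :env)
IO.inspect(env, label: "captured environment")
```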

I’d propose a slightly different, but I believe similarly convenient mechanism of a “continuation token”:

def next_job(token) do
  case Batcher.next(token) do # instead of state.()
    {:done, token} -> token
    {{subject, batch}, token} -> {create_job(subject, batch), token}
  end
end

def next({next, rest, batches, acc}) do
  next_batch(next, rest, batches, acc)
end

defp next_batch(_current, [], [], []) do
  done_tuple()
end

defp next_batch(_current, [], [], [[next | rest] | batches]) do
  {{next, rest}, {next, rest, batches, []}} # instead of fn -> next_batch(next, rest, batches, []) end
end

# and similar

This provides similar benefits of encapsulating the continuation state, but makes it more debuggable. A similar approach is used in things like :ets.select/1 for batched traversal of an ETS table.
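As a sketch of the ETS analogy: :ets.select/3 returns a batch plus a continuation, and :ets.select/1 resumes from that continuation until it answers :"$end_of_table". EtsBatches and the table contents are made up for this example.

```elixir
defmodule EtsBatches do
  # Collects every batch by repeatedly feeding the continuation back in.
  def all(table, match_spec, limit) do
    table |> :ets.select(match_spec, limit) |> collect([])
  end

  defp collect(:"$end_of_table", acc), do: Enum.reverse(acc)

  defp collect({batch, continuation}, acc) do
    collect(:ets.select(continuation), [batch | acc])
  end
end

table = :ets.new(:squares, [:set])
:ets.insert(table, Enum.map(1..5, fn n -> {n, n * n} end))

# Match every {key, value} row and return only the value.
match_spec = [{{:"$1", :"$2"}, [], [:"$2"]}]

batches = EtsBatches.all(table, match_spec, 2)
values = batches |> List.flatten() |> Enum.sort()
# values is [1, 4, 9, 16, 25]; the order within the batches is unspecified for a :set table
```

The continuation here is plain data that the caller passes back, which is the property the token approach is after.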


Yes, that also works and indeed has advantages in terms of transparency.

In this particular case: the generator functions are unit tested and pure, so the only things needed in tracing are the created jobs (which are built from the return values of the state function, but which are not themselves the return values of the state function) and the initial state, which is known and visible when the state is initialized.

So while it is opaque, in this case it is in an entirely benign way. A benefit is that one does not need to know what function is being called at all: it could be calling any function in any module, and the generator itself could live elsewhere as well … and if what it calls (and in what order) changes, this is also opaque to the caller. Having to know that the token “belongs with” Batcher.next/1 is not quite as cute, and it makes it harder to move the closure to an entirely different handler at runtime, since each handler needs to know what function that token “goes with”.

Still, there are indeed times that is not useful / worth it / possible without contortion, and then it is much better to pass the data about.

  1. In Elixir you can’t mutate any of the data structures that serve as object surrogates either.
  2. Parables aren’t that literal.

The point is that a closure can serve as a container of related data, a role in which objects are typically used and that to some degree one can be used to emulate the other - which you promptly exploit by proposing to replace the closure with a token.

Java for the longest time was using inner classes as surrogate closures and the command pattern emulates a closure.


:slight_smile:

indeed … with generators being one possible specialization of closures: when they are used to control iteration. (Not to suggest all generators are closures, as we know from e.g. comprehensions …)


I actually think that we can do some Y-combinator-like trickery here to create a closure that, when called, returns new versions of itself that include updated data.
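A rough sketch of that idea, using only anonymous functions: since an anonymous function cannot refer to itself by name, it receives itself as an argument. The names make_counter and step are illustrative only.

```elixir
make_counter = fn ->
  # `step` takes itself as its first argument so it can build the next closure.
  step = fn self, n -> {n, fn -> self.(self, n + 1) end} end
  fn -> step.(step, 0) end
end

counter = make_counter.()
{0, counter} = counter.()
{1, counter} = counter.()
{2, _counter} = counter.()
```

Each call returns the current value together with a fresh closure that holds the updated count.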

Not that that would be extremely useful, probably, since it would indeed mostly make introspection difficult. :sweat_smile:


@aseigo Thanks for sharing this technique though! I am currently wondering if it is possible to use some higher-level stream combinators instead of writing out the recursion of next_batch manually; this might improve readability. I’ll give it a try.

The category is “stupid pet tricks” … they don’t need to be useful, only fun :wink:

Should be possible … would be interested in seeing what you come up with if you toy with it a bit!


Please tell me if this code does not meet the requirements for how the data is mapped, but here it is:

defmodule PairwiseComputation do

  def create_all_jobs(data) do
    data
    |> Enum.chunk_every(3) # We now have a list of batches
    |> Enum.reverse
    |> Enum.scan([], fn batch, later_batches -> [batch] ++ later_batches end) # now every item contains a list with all later batches (including current one) in there.
    |> Enum.reverse # reversing again is only necessary if you rely on order of results.
    |> Enum.map(&create_jobs/1)

  end


  defp create_jobs([]), do: []
  defp create_jobs([first_batch | batches]) do
    case first_batch do
      [] -> {:error, "unexpected format of first batch."}
      [pivot | first_batch] ->
        [first_batch | batches]
        |> Enum.map(&create_job(pivot, &1))
    end
  end

  defp create_job(pivot, elem), do: IO.puts "creating job for #{inspect pivot} <=> #{inspect elem}"
end

PairwiseComputation.create_all_jobs(1000..1100)

The output is a list of lists of :oks, which you might improve upon by gathering errors that might have occurred in the creation tasks and combining them, if required.


While that probably works (didn’t run it to confirm, but I trust you :wink: ), it has a serious shortcoming given that this is a combinatorial algorithm: it scales super poorly. An average request from our workload generates ~100,000,000 pairs, which when batched up create some 200k batch jobs (the chunking size itself being a trade-off between space and compute time).

We don’t want them all created up front as it would mean holding all the jobs for every request in memory at the same time and means we can’t start any jobs until they are all created in the first place. That ends up being non-trivial, especially once we start adding parallel requests into the mix.

But in the realm of “can it be done”, you’ve shown it can :slight_smile:

Almost everywhere in my code snippet it is possible to use Stream instead of Enum, which means you end up with a stream of to-be-created jobs, meaning that the next one is only constructed once there is a worker that wants to consume it.
If you’d like to do this, you probably want to change the Enum.map(&create_jobs/1) to a Stream.flat_map, as well as changing the Enum.map(&create_job(pivot, &1)) to a Stream.flat_map.

It is only difficult to reverse a Stream since we don’t yet know how many elements there will be, but this is something that the code you showed in the first post also does not address.
(And on that note, it is definitely possible to create a version that works with streams and starts from the beginning, but memory usage will keep increasing since the first elements of old batches all have to be retained to compare to newer ones; this is why I presumed that your set of input elements has some well-defined maximum size after which old ones do not have to be compared to newer ones anymore).

This is a fun exercise :smiley:

Yes, we do not know how many elements there will be before being called (though it obviously does not change once started) … but no, no particular maximums other than our own sanity :wink:

The calculation is commutative so we do not care about ordering of the pairs. So once a given element is processed with respect to all other elements we can safely forget about that element.

It also so happens that we do not care about the order of results, just that they all get processed eventually, so order preservation is thankfully not a requirement either.

But the real annoyance here is that Stream by itself does not provide a nice way to start moving through a stream, stop at a given point, and return. The best way that I know of is to use Stream.transform/4 and then save the accumulator returned from the after function. Which means we get precisely zero benefit from the abstraction of Stream here. AFAIK, Stream is not intended to be used asynchronously, and that is apparent from its API. (Someone please correct me on that point if I am wrong!)

This is the sort of thing that GenStage does enable, but in this case we’d only really need/want one stage and it would internally implement exactly such an iterator and so … we’re back to square one :wink:

It is the need for an asynchronous stepping through a single set of data, or a resumable stream, that led to the “hand-rolled” iteration function.
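For completeness, the lower-level Enumerable protocol does accept a {:suspend, acc} instruction from the reducer, which can be used to step through an enumerable one element at a time. The sketch below is only an illustration of that mechanism; Stepper is a hypothetical module, and whether this is any nicer than a hand-rolled iterator is a judgment call.

```elixir
defmodule Stepper do
  # Returns a continuation that has not consumed anything yet.
  def start(enum) do
    # Suspend after every element, handing the element back as the accumulator.
    reducer = fn element, _acc -> {:suspend, element} end
    fn command -> Enumerable.reduce(enum, command, reducer) end
  end

  # Resumes the reduction until the next element or the end of the input.
  def next(continuation) do
    case continuation.({:cont, nil}) do
      {:suspended, element, next_continuation} -> {element, next_continuation}
      {:done, _acc} -> :done
      {:halted, _acc} -> :done
    end
  end
end

cont = Stepper.start(Stream.map(1..3, &(&1 * 10)))
{10, cont} = Stepper.next(cont)
{20, cont} = Stepper.next(cont)
{30, cont} = Stepper.next(cont)
:done = Stepper.next(cont)
```

Note that the continuation is again an opaque function, so it shares the introspection trade-off discussed earlier, and a suspended stream backed by a resource such as a file keeps that resource open while it waits.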


Indeed; it is not possible with the built-in Enum and Stream to only consume a couple of elements at a time: You can either consume an Enumerable fully, or not at all.
The reason for this has been performance, IIRC from conversations I had about this with @josevalim in IRC about a year ago. (And on top of this, some things we like to be streams like files, require the closing of file handles when we’re done with them; keeping open a file handle during a suspended iteration for a long time is a bad idea)

At some point I came across a similar problem where this limitation made Enum and Stream a bad abstraction for me, and therefore I wrote the Extractable library (as well as the related Insertable).


That said, using streams in this way is definitely possible; however, you end up calling Stream.take and Stream.drop more frequently than you’d like.

Here’s an example using Streams:


defmodule PairwiseComputation do

  def create_all_jobs(data) do

    data
    |> Stream.chunk_every(3)
    |> tails
    |> Stream.map(&create_jobs/1)
    |> Stream.concat

  end

  defp tails(stream) do
    [{}]
    |> Stream.cycle()
    |> Stream.transform(stream, fn
      _, elems ->
        case Enum.take(elems, 1) do
          [] -> {:halt, []}
          _ ->
            {[elems], Stream.drop(elems, 1)}
        end
    end)
  end

  defp create_jobs(stream) do
    first_batch = Stream.take(stream, 1)
    case Enum.take(first_batch, 1) do
      [] -> []
      [pivot] ->
        Stream.concat(Stream.drop(first_batch, 1), Stream.drop(stream, 1))
        |> Stream.map(&create_job(pivot, &1))
      other -> IO.inspect(other)
    end
  end

  defp create_job(pivot, elem), do: IO.puts "creating job for #{inspect pivot} <=> #{inspect elem}"
end

# Example: 
PairwiseComputation.create_all_jobs(1000..1100) |> Enum.take(20)

Yeah, but then you aren’t really using streams IMO… you’re just using them as an odd abstraction over normal collections and you may as well just write the recursion by hand (again, imo)

Oh, neat. I will have to check these out. Thanks!

This right here is why I like tuple calls. They were basically function closures but with the ‘internal state’ entirely visible and accessible!

Processes, those are the Elixir version of OOP objects, and they do ‘mutate’ themselves by updating their stack state. ^.^

That’s what chainable tuple-calls did: each call just returns the updated state. Monadic tuple calls were awesome; now you can’t do that anymore without introducing new operators or the like (for which there are plenty of libraries).

That’s just monads with mini-effects, make a monad/effect library for that purpose. :wink: