Why does Stream.resource require next_fun to return tuples with enums as the first element?

Stream.unfold does not require enumerables as the first element; however, the non-halting tuples of Stream.resource’s next_fun must be enums.

Is there any specific reason for this?

I was trying to get familiar with Stream.resource and for a couple of days I couldn’t understand why my streams were not being accepted as an argument into Enum.take/2.

Also can I access non enum values from Stream.resource?

Several different questions here, can you explain your use case? That might make it easier to respond.

The next_fun returns one or more elements. These elements are used to form the stream. But it’s quite reasonable (and common) to return a single element on each invocation of next_fun. Here’s a simple example of a stream that returns a monotonically increasing integer:

stream = Stream.resource(
  fn ->
    0
  end,
  fn acc ->
    if acc == 10 do
      {:halt, acc}
    else
      {[acc + 1], acc + 1}
    end
  end,
  fn _ ->
    :ok
  end
)

Here you can see that next_fun returns a list of a single element. We can check how that behaves with Enum.take/2 and Enum.to_list/1:

iex> Enum.take(stream, 1)
[1]
iex> Enum.take(stream, 2)
[1, 2]
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

If next_fun returns more than one element, each element is inserted into the stream, one by one. For example:

stream = Stream.resource(
  fn ->
    0
  end,
  fn acc ->
    if acc == 10 do
      {:halt, acc}
    else
      {[acc + 1, acc + 2], acc + 1}
    end
  end,
  fn _ ->
    :ok
  end
)

Here the stream next_fun is returning multiple elements, but they are consumed one by one:

iex> Enum.take(stream, 1)
[1]
iex> Enum.take(stream, 2)
[1, 2]
iex> Enum.take(stream, 3)
[1, 2, 2]
iex> Enum.take(stream, 4)
[1, 2, 2, 3]
iex> Enum.take(stream, 5)
[1, 2, 2, 3, 3]
4 Likes

The second part of your question was “why return a list, not an element”.

If you think about the use case for streams, then there are circumstances where next_fun might be quite expensive. For example, perhaps we are streaming records from a database, or content across a network. Each request in this case will have connection, latency and transmission costs. It is more efficient to return a batch from the network service in these cases.

By allowing the return of multiple elements at one time (even though they are streamed one-by-one to the client of Stream.resource/3) we can be more efficient in building the stream and amortise some of these costs across more than one stream element.

5 Likes

Thanks, so my confusion is why is there a disparity between unfold and resource, where unfold does not require it (an enumerable as fast element of the next_fun return tuple) but resource does?

I am not very familiar with streams, so I do not really have a usecase,

I just figured that since the generation of non enum variables by stream was possible it would be possible to get said variable out if doing so through an enum was prohibited.

It’s just to learn more about how everything in Streams works.

Of course use of the enum itself is what activates the stream so I guess technically the error raised at that juncture indicates that something had indeed gone wrong.

resource is more “basic”/fundamental and closer to Stream.transform, which is what you would use to construct some of the more advanced features.

Stream.resource is what you would use when you have a “resource” that as mentioned before, might be expensive to “iterate” over (network calls) or where each iteration represents multiple “things” (e.g. a CSV row for a product quote that represents an order quantity, but the orders will need to be calculated and assigned to be pulled from the shelves individually).

Stream.unfold is closer to what you’d use as one of the first steps in a chain of Stream functions. If you notice the type spec, it takes an accumulator, which can be anything, and expects your function to emit elements, and give it the next accumulator, and then the whole thing, when run, returns an Enumerables/Stream. Whereas other Stream functions both take and return Enumerables/Streams.

If it helps, you can think of Stream.unfold as like the opposite of a reduce. With reduce you are reducing (like cooking, or chemistry, making water leave, dehydrating) a bunch of stuff into one new thing. With unfold you are taking one thing and stretching it out piece by piece into many things.

2 Likes

Suffice it to say that if you can get away with using just unfold then that’s fine. For context, I very rarely need to use resource as my first choice. There’s usually something else like flat_map or transform or something else that I usually consider and try first unless I’m dealing with something very heavy or that I would consider a resource like a file or a separate process where the cost of iterating over it too many times is very high or just doesn’t make sense

Considering the below example implementations from the documentation:

Example

Stream.resource(
  fn -> File.open!("sample") end,
  fn file ->
    case IO.read(file, :line) do
      data when is_binary(data) -> {[data], file}
      _ -> {:halt, file}
    end
  end,
  fn file -> File.close(file) end
)

Example

To create a stream that counts down and stops before zero:

iex> Stream.unfold(5, fn
...>   0 -> nil
...>   n -> {n, n - 1}
...> end) |> Enum.to_list()
[5, 4, 3, 2, 1]

See in .unfold next_fun returns

{n, n - 1}

on its iterations while in .resource next_fun returns

{[data], file}

If I try to return {data, file} instead of {[data], file} in Stream.resource I would get an error when I activate the stream, yet this is not the case with Stream.Unfold. Why is this?

Stream.resource is the “top” of a complexity hierarchy for “functions that produce a stream given a single initial value”. Not every member of that hierarchy is explicitly named in Stream:

  • Stream.repeatedly: the stream’s contents are each return value of the supplied function
  • Stream.iterate: the stream’s contents are each return value of the supplied function, that value also becomes the next argument
  • Stream.unfold: the stream’s contents are the first element of each return value of the supplied function, while the second element becomes the next argument
  • Stream.resource: as unfold, but with flat_map semantics and lazy setup / teardown

All of the simpler functions can be written in terms of the more-complicated ones, for instance:

# Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)
Stream.resource(
  fn -> :rand.seed(:exsss, {1, 2, 3}) end,
  fn _ -> {[:rand.uniform()], :ok} end,
  fn _ -> :ok end
) |> Enum.take(3)

# Stream.iterate(0, &(&1 + 1)) |> Enum.take(5)
Stream.resource(
  fn -> 0 end,
  fn acc -> {[acc], acc+1} end,
  fn _ -> :ok end
) |> Enum.take(5)

# Stream.unfold(0, fn
#   n -> {n, n + 1}
# end) |> Enum.take(10)
Stream.resource(
  fn -> 0 end,
  fn acc -> {[acc], acc + 1} end,
  fn _ -> :ok end
) |> Enum.take(10)

(note that the last two examples turn into the same resource calls)

A hypothetical Stream.resource_but_only_scalars would sit just below the actual Stream.resource in that hierarchy, but it doesn’t seem as useful as the full version. You could always make one if you wanted:

def resource_but_only_scalars(start_fun, next_fun, end_fun) do
  Stream.resource(
    start_fun,
    fn acc ->
      case next_fun.(acc) do
        {:halt, next_acc} -> {:halt, next_acc}
        {v, next_acc} -> {[v], next_acc}
      end
    end,
    end_fun
  )
end
5 Likes

I can’t explain the exact reasons for the questions in the original post, however, I tend to prefer Stream.unfold too because it offers more control. At a prior job I used it to write a module called Unpager that not only handled paginated third party APIs, but also used the accumulator to manage a Task that proactively fetched one page ahead of what was currently being consumed. I don’t believe that can be done with Stream.resource.

3 Likes

Thanks, been down with a cold, so I’ll probably have to reread this a few times, but it looks like it answers the first and titular question.

A hypothetical Stream.resource_but_only_scalars would sit just below the actual Stream.resource in that hierarchy, but it doesn’t seem as useful as the full version. You could always make one if you wanted:

def resource_but_only_scalars(start_fun, next_fun, end_fun) do
  Stream.resource(
    start_fun,
    fn acc ->
      case next_fun.(acc) do
        {:halt, next_acc} -> {:halt, next_acc}
        {v, next_acc} -> {[v], next_acc}
      end
    end,
    end_fun
  )
end

It is unclear to me how the usefulness diminishes. Looking at the way you map from the scalar enabled one (it seems it is not restricted to scalar but allows for scalar as well as vector values) to the normal Stream.resource using the wrapper, it seems to actually just provide syntactic sugar.

I wouldn’t do it because it seems like overkill just to remove the need to include brackets, but I’m not sure what functionality is lost by doing what you did?

What is stream resource trying to prevent, or facilitate, by requiring that you pass a list? It seems like it may be a design choice, maybe to force explicitness or maybe to force unpacking of lists’ elements into the returned list.

Thanks for your help and taking the time to break down each of the Streams and showing how Stream.resource is the more powerful of the illustrated.

I’ve not seen Stream transform implementations so I’ll be looking forward to that.

I’ll try provide an additional angle
To me, Stream.resource/3 is a merge between the bracket pattern applied to streams, and unfold along flat_map.

What the bracket pattern entails, is a way to manage resource:

  • how to acquire it
  • how to release it

It’s really simple, but it gives you this guarantee: If the resource acquisition is successful, then it will be released no matter what happen within the computation. (:kill signals tend to be external in origin :eyes: )
And as a basic function, it would be just defined as:

def bracket(acquire, release, compute) do
 resource = acquire.()
  try do
    compute.(resource)
  after
    release.(resource)
  end
end

Where would you want to use this pattern ?
We can look at Python’s with statement as a first example. It is commonly used to handle files.

acquire = fn -> File.open!("some.log") end
release = &File.close/1
bracket(acquire, release, fn handle ->
  # Do your thing
end)

We can do the same for using a temporary ets table:

acquire = fn -> :ets.new(nil, []) end
release = &:ets.delete/1
bracket(acquire, release, fn handle ->
  # Do your other thing
end)

Etc…

But what if we wanted to do particular manipulation with our resource handle and return a stream as a result of that ?

stream = bracket(acquire, release, fn handle ->
  complex_logic_that_returns_a_stream(handle)
end)

manipulate_stream(stream)

This won’t work, because the resource will be released before we use it. We could move the rest of the code inside the compute fun. But what if we want to reuse the stream for a different set of manipulation ? We need to redo the bracket. And that’s a departure from the what a stream can usually do.

To restore that, we need to define a bracket function that’s a bit more complex:

def bracket_stream(acquire, release, compute) do
  next_fun = fn
    {:stop, handle} -> {:halt, handle}
    handle -> {complex_logic_that_returns_a_stream(handle), {:stop, handle}}
  end
  Stream.resource(acquire, next_fun, release) 
end

Now our stream will work and we only need to care about the resource management in one place.


So what about Stream.resource/3 itself ? We could have a Stream.bracket/3 defined by itself in the standard library that wouldn’t reuse Stream.resource/3. And within we could use Stream.unfold/2 with the resource handle, and if our domain logic forces chunks upon us we can simply compose the stream with Stream.concat/1.
I think this was probably the most common use case, and only Stream.resource/3 was implemented to represent that case.


If we return to the bracket pattern, It’s kind of tedious to always reuse the same code for opening and closing files, or creating and destroying ets tables, or whatever else. It can be abstracted further to only provide the config used by the acquire and release code, along the computation fun. For example, for files we can imagine the following function:

def with_handle(path, compute) do
  acquire = fn -> File.open!(path) end
  release = &File.close/1
  bracket(acquire, release, compute)
end

Then we could just do

File.with_handle("some.log", &analyse_logs/1)

And that something we already have in the standard library :slight_smile:

File.open("some.log", &analyse_logs/1)

About Stream.transform/3,4,5, at work we mainly use it as a substitute for Task.async_stream/2,3. At first because async_stream didn’t exist yet, then because we noticed it was way better in term of performance in some cases. But rereading the code, it’s just behaving the same as a convoluted Stream.resource/3. (By that I mean that if we had Stream.bracket/3 the code would be simpler.)
We do use the non resource variant to do BSON stream decoding. This is doing a stateful map on a file stream.

And that’s pretty much how I see Stream.transform/3,4,5: modifying a stream with a stateful flat_map, possibly bringing along resource management.