How to check if an argument is a Stream or Stream-like function?

I have a function that can operate on a single struct of a specific type, a list of those structs, or a stream of those structs:

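```elixir
# A single struct: update it directly.
def update_data(%MyStruct{} = my_struct),
  do: %MyStruct{my_struct | data: "Here's the new data!"}

# A %Stream{} struct: map over it lazily.
def update_data(%Stream{} = stream),
  do: Stream.map(stream, &update_data/1)

# A list: map over it eagerly.
def update_data(structs) when is_list(structs),
  do: Enum.map(structs, &update_data/1)

# An {:ok, value} tuple: update the wrapped value.
def update_data({:ok, value}),
  do: {:ok, update_data(value)}

# Anything else: log it and return it unchanged (IO.inspect/2 returns its argument).
def update_data(value),
  do: IO.inspect(value, label: "update_data fell through")
```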

This works in almost every case. The one exception is when the last step in the pipeline before `update_data/1` is `Stream.flat_map/2`. It turns out the `%Stream{} = stream` pattern match fails there, because `Stream.flat_map/2` doesn't return a `%Stream{}` struct:

```elixir
Interactive Elixir (1.13.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [1, 2, 3] |> Stream.map(& &1)
#Stream<[enum: [1, 2, 3], funs: [#Function<47.58486609/1 in Stream.map/2>]]>
iex(2)> [1, 2, 3] |> Stream.each(&IO.puts/1)
#Stream<[enum: [1, 2, 3], funs: [#Function<38.58486609/1 in Stream.each/2>]]>
iex(3)> [1, 2, 3] |> Stream.flat_map(& &1)
#Function<59.58486609/2 in Stream.transform/3>
```

I guess that means `Stream.transform/3` doesn't return a `%Stream{}` either; it returns a function too.
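A quick sketch to confirm that the returned function is still usable as an enumerable even though it isn't a `%Stream{}` struct. It uses a mapper that returns lists, because `Stream.flat_map/2` expects each call to return an enumerable; `& &1` over integers, as in the session above, would fail with a protocol error once enumerated. The exact return type may differ between Elixir versions, so treat the `is_struct/2` line as something to check on your own version.

```elixir
# Same kind of pipeline as in the iex session, but with a mapper that returns lists.
flat = Stream.flat_map([1, 2, 3], fn x -> [x, x] end)

# On Elixir 1.13 this is a plain 2-arity function, not a %Stream{} struct.
IO.inspect(is_function(flat, 2), label: "2-arity function?")
IO.inspect(is_struct(flat, Stream), label: "%Stream{} struct?")

# Enum and Stream still accept it, because Enumerable is implemented for functions.
IO.inspect(Enum.to_list(flat), label: "elements")
# elements: [1, 1, 2, 2, 3, 3]
```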

I know I can work around this in a couple of different ways, but none of them seems ideal. I don't want to make the caller responsible for making sure the thing being piped to `update_data/1` is an actual `%Stream{}`. I also don't want to assume that any function that shows up can be piped into `Stream.map/2`.

What’s the best way to get update_data/1 to behave the way I want it to?

I lack your context, so this might be a super dumb question, but since you know you're working with a stream in a certain module, why not just call `Stream.map(stream, &update_data/1)` directly there, on the spot, and be done with it?

Your code feels like you're trying to emulate OOP polymorphism, and I'm not sure that's the best way to achieve your desired result in an FP language like Elixir.

Why not write functions that work on one piece of data at a time, and pass them as closures to the various `Stream` functions? Shouldn't that be enough?
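A minimal sketch of that suggestion: `update_data/1` only handles a single struct, and each caller picks `Enum.map/2` or `Stream.map/2` itself. The `MyStruct` definition, the `Updater` module name and the sample inputs are stand-ins, not the poster's real schema or data sources.

```elixir
defmodule MyStruct do
  # Stand-in for the real Ecto schema struct.
  defstruct data: nil
end

defmodule Updater do
  # Only knows how to update one struct; iteration is the caller's job.
  def update_data(%MyStruct{} = my_struct),
    do: %MyStruct{my_struct | data: "Here's the new data!"}
end

# Eager caller: a plain list, like the one Repo.all/1 would return.
eager = Enum.map([%MyStruct{}, %MyStruct{}], &Updater.update_data/1)

# Lazy caller: any enumerable, including the function returned by Stream.flat_map/2.
lazy =
  [[%MyStruct{}], [%MyStruct{}]]
  |> Stream.flat_map(& &1)
  |> Stream.map(&Updater.update_data/1)

IO.inspect(eager)
IO.inspect(Enum.to_list(lazy))
```

The trade-off is that every call site has to know whether it holds one struct or many, which is exactly what the original question was trying to avoid.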

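The details escape me (I suspect it was a performance optimization/tradeoff), but any function of arity 2 is accepted as a stream. The reason lies deeper, in the `Enumerable` protocol, which is implemented for functions: `Enumerable.reduce/3` simply calls the function with an `acc()` tuple and a `reducer()`, so the function has to behave like `reduce/3` itself and return a proper `result()` tuple (see the `Enumerable` docs, Elixir v1.13.2). You can use a guard of `is_function(provided_fun, 2)`, but it is imperfect: it only checks the arity, not whether the function actually follows that contract.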
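A sketch of the guard-based approach, with a `MyStruct` stand-in and a hypothetical `Updater` module. It adds an `is_function(fun, 2)` clause next to the existing `%Stream{}` clause, then shows why the guard is imperfect: a function that follows the `Enumerable.reduce/3` contract enumerates fine, while an unrelated 2-arity function also passes the guard and only fails later, when the lazy result is consumed.

```elixir
defmodule MyStruct do
  defstruct data: nil
end

defmodule Updater do
  def update_data(%MyStruct{} = my_struct),
    do: %MyStruct{my_struct | data: "Here's the new data!"}

  def update_data(%Stream{} = stream),
    do: Stream.map(stream, &update_data/1)

  # Catches the arity-2 functions returned by Stream.flat_map/2 and Stream.transform/3.
  # Caveat: the guard only checks arity, so an unrelated 2-arity function lands here too.
  def update_data(fun) when is_function(fun, 2),
    do: Stream.map(fun, &update_data/1)

  def update_data(structs) when is_list(structs),
    do: Enum.map(structs, &update_data/1)

  def update_data({:ok, value}),
    do: {:ok, update_data(value)}

  def update_data(value),
    do: IO.inspect(value, label: "update_data fell through")
end

# The Stream.flat_map/2 case from the question now takes the lazy path.
flat_result =
  [[%MyStruct{}], [%MyStruct{}]]
  |> Stream.flat_map(& &1)
  |> Updater.update_data()

IO.inspect(Enum.to_list(flat_result))

# A hand-rolled function enumerable that honours the reduce contract
# by delegating to Enumerable.reduce/3 on a list.
well_behaved = fn acc, reducer -> Enumerable.reduce([%MyStruct{}], acc, reducer) end
IO.inspect(well_behaved |> Updater.update_data() |> Enum.to_list())

# Also passes the guard but ignores the contract. Building the stream succeeds;
# enumerating it raises, because the first argument is an acc() tuple, not a number.
not_enumerable = fn a, b -> a + b end
_lazy_but_broken = Updater.update_data(not_enumerable)
```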


Not a dumb question. This is a function that updates a virtual field in an Ecto schema struct, and I want it to work whether the upstream function is `Repo.stream/1`, `Repo.all/1`, `Repo.one/1`, or anything else that produces or emits one or more of the structs in question. I could push the iteration upstream as well, but this seemed more elegant somehow.

The `Stream` API accepts any `Enumerable` as input. It is simply an API that operates on Enumerables lazily, while `Enum` operates on them eagerly. So there isn't really a single stream data type in Elixir: `%Stream{}` is just one lazy Enumerable among several (the function returned by `Stream.flat_map/2` is another). Any struct can implement the `Enumerable` protocol and thereby technically become "a stream", in the sense of a valid input to the `Stream` API.
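To illustrate that last point, here is a sketch of a hypothetical `Countdown` struct (not from the thread) that implements `Enumerable`, so both `Enum` and `Stream` accept it even though it is neither a `%Stream{}` nor a function. Only `reduce/3` is implemented for real; the other callbacks return `{:error, __MODULE__}`, which tells Elixir to fall back to its default, reduce-based implementations. This assumes the code runs outside a Mix project with protocol consolidation enabled (for example via `elixir script.exs`).

```elixir
defmodule Countdown do
  # Hypothetical struct: enumerates from `from` down to 1.
  defstruct from: 0
end

defimpl Enumerable, for: Countdown do
  # {:error, __MODULE__} makes Enum fall back to the reduce/3-based defaults.
  def count(_countdown), do: {:error, __MODULE__}
  def member?(_countdown, _element), do: {:error, __MODULE__}
  def slice(_countdown), do: {:error, __MODULE__}

  # The reduce/3 contract: honour the :halt, :suspend and :cont instructions.
  def reduce(_countdown, {:halt, acc}, _fun), do: {:halted, acc}

  def reduce(countdown, {:suspend, acc}, fun),
    do: {:suspended, acc, &reduce(countdown, &1, fun)}

  def reduce(%Countdown{from: n}, {:cont, acc}, _fun) when n <= 0, do: {:done, acc}

  def reduce(%Countdown{from: n}, {:cont, acc}, fun),
    do: reduce(%Countdown{from: n - 1}, fun.(n, acc), fun)
end

# Enum (eager) and Stream (lazy) both accept it.
IO.inspect(Enum.to_list(%Countdown{from: 3}))
IO.inspect(%Countdown{from: 3} |> Stream.map(&(&1 * 10)) |> Enum.to_list())
IO.inspect(Enum.take(%Countdown{from: 5}, 2))
```

A `%Countdown{}` would not match the `%Stream{}` clause or an `is_function/2` guard, which is why dispatching on the input's shape cannot cover every possible enumerable.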

Since nothing distinguishes one kind of Enumerable input from another, you'll need some way for the caller to tell you whether they want the Enumerable processed lazily or eagerly. That could be separate functions or an additional parameter on a single function.
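One way to sketch the "additional parameter" variant, assuming a `MyStruct` stand-in and a hypothetical `:lazy` option (neither is an existing API). The function no longer guesses from the input's shape; any non-struct, non-`nil`, non-`{:ok, _}` value is treated as an Enumerable, and the caller decides how it is processed.

```elixir
defmodule MyStruct do
  # Stand-in for the real Ecto schema struct.
  defstruct data: nil
end

defmodule Updater do
  # Hypothetical API: the caller picks eager (default) or lazy processing
  # with the :lazy option instead of update_data/2 inferring it.
  def update_data(value, opts \\ [])

  def update_data(%MyStruct{} = my_struct, _opts),
    do: %MyStruct{my_struct | data: "Here's the new data!"}

  # For example, Repo.one/1 finding no row.
  def update_data(nil, _opts), do: nil

  def update_data({:ok, value}, opts), do: {:ok, update_data(value, opts)}

  # Anything else is treated as an Enumerable of structs.
  def update_data(enumerable, opts) do
    if Keyword.get(opts, :lazy, false) do
      Stream.map(enumerable, &update_data(&1, opts))
    else
      Enum.map(enumerable, &update_data(&1, opts))
    end
  end
end

eager = Updater.update_data([%MyStruct{}])
lazy = Updater.update_data(Stream.flat_map([[%MyStruct{}]], & &1), lazy: true)

IO.inspect(eager)
IO.inspect(Enum.to_list(lazy))
```

Separate `update_data_eager/1` and `update_data_lazy/1` functions (also hypothetical names) would express the same choice without an options list.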
