Stream.zip question

Here’s an example:

IO.stream(io, :line)
|> Stream.map(&String.trim_trailing/1)
|> Stream.map(&String.split(&1, ",") )
|> Stream.zip([1, 2, 3])
|> Enum.to_list

That code pipes a line to the first Stream.map, which in turn pipes a line to the second Stream.map, which pipes a list to Stream.zip, say ["a", "b", "c"]. My expectation was that Stream.zip would produce the same result as Enum.zip(["a", "b", "c"], [1, 2, 3]), but in this case Stream.zip operates on the entire result, as if I had written:

IO.stream(io, :line)
|> Stream.map(&String.trim_trailing/1)
|> Stream.map(&String.split(&1, ",") )
|> Enum.to_list   #<---HERE
|> Stream.zip([1, 2, 3])
|> Enum.to_list

which produces something like:

[ 
    {["a", "b", "c"], 1},
    {["d", "e", "f"], 2},
    {["g", "h", "i"], 3}
]

That makes no sense to me. I want this:

[ 
    [{"a", 1}, {"b", 2}, {"c", 3}],
    [{"d", 1}, {"e", 2}, {"f", 3}]
    ....
]

What purpose does Stream.zip serve versus Enum.zip?

The error is using Stream.map(&String.split(&1, ",")). This will turn each line into a list of words. The subsequent zip pairs these lists with the numbers. You probably want to use Stream.flat_map/2 instead.
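To make the pairing concrete, here is a small sketch that uses plain lists instead of an io device (the variable names are just for illustration). Stream.zip/2 and Enum.zip/2 pair elements the same way; what differs in the pipeline is what the left-hand enumerable is: the stream of rows, not a single row.

```elixir
# A single row zipped with [1, 2, 3]: Enum.zip/2 and Stream.zip/2 agree.
row = ["a", "b", "c"]
enum_pairs = Enum.zip(row, [1, 2, 3])
stream_pairs = row |> Stream.zip([1, 2, 3]) |> Enum.to_list()

# In the pipeline, the enumerable on the left is the collection of rows,
# so each whole row becomes one element of a pair.
rows = [["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i"]]
rows_zipped = rows |> Stream.zip([1, 2, 3]) |> Enum.to_list()

IO.inspect(enum_pairs)   # [{"a", 1}, {"b", 2}, {"c", 3}]
IO.inspect(stream_pairs) # [{"a", 1}, {"b", 2}, {"c", 3}]
IO.inspect(rows_zipped)  # [{["a", "b", "c"], 1}, {["d", "e", "f"], 2}, {["g", "h", "i"], 3}]
```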


Okay, Stream.flat_map works! (Edit: not exactly. I get one flat list for the final result instead of nested lists.)

But…how is someone supposed to reason about piped Streams if the behavior is inconsistent? Sometimes the next Stream in the pipeline will receive one item, and sometimes the Stream will receive all the items. If a Stream has to wait for all the items, how is it even a Stream?

The behaviour is not inconsistent, but you need to consider what the input items are and how they are being transformed at each step. Let’s see this with your example:

IO.stream(io, :line)
|> Stream.map(&String.trim_trailing/1)
|> Stream.map(&String.split(&1, ","))
|> Stream.zip([1, 2, 3])
|> Enum.to_list

Let’s assume that io contains the following content:

first,line
second
third,line
and,this,is,the,fourth

We’re streaming the input line by line, so the first item is "first,line\n".

  1. Stream.map(&String.trim_trailing/1) takes this first line as a binary and passes on the binary with the trailing newline removed: "first,line"
  2. Stream.map(&String.split(&1, ",")) takes this binary and passes on a list of binaries by splitting the input on commas: ["first", "line"]
  3. Stream.zip([1, 2, 3]) takes the list, pairs it with the first item from the enumerable in its argument and passes on {["first", "line"], 1}

For the second line and the rest there is a similar progression, except that the fourth line is going to be ignored as Stream.zip/2 reaches the end of the enumerable in its argument.
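If you want to try the steps above without a file, this sketch feeds the same four lines through the pipeline using StringIO as a stand-in for io (that substitution is my assumption, the original reads from a real device):

```elixir
# StringIO gives us an in-memory io device holding the sample content.
{:ok, io} =
  StringIO.open("first,line\nsecond\nthird,line\nand,this,is,the,fourth\n")

result =
  IO.stream(io, :line)
  |> Stream.map(&String.trim_trailing/1)
  |> Stream.map(&String.split(&1, ","))
  |> Stream.zip([1, 2, 3])
  |> Enum.to_list()

# Each split line is one element, so it is paired whole with one number.
# The fourth line has no partner in [1, 2, 3] and is dropped.
IO.inspect(result)
# [{["first", "line"], 1}, {["second"], 2}, {["third", "line"], 3}]
```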

Now let’s look at the version with Stream.flat_map/2 used when splitting:

IO.stream(io, :line)
|> Stream.map(&String.trim_trailing/1)
|> Stream.flat_map(&String.split(&1, ","))
|> Stream.zip([1, 2, 3])
|> Enum.to_list

  1. Stream.map(&String.trim_trailing/1) takes this first line as a binary and passes on the binary with the trailing newline removed: "first,line"
  2. Stream.flat_map(&String.split(&1, ",")) takes this binary, splits it on commas, and passes on each item in the resulting list as a separate output: "first" and "line"
  3. a. Stream.zip([1, 2, 3]) takes "first", pairs it with the first item from the enumerable in its argument and passes on {"first", 1}
    b. it then takes "line", pairs it with the second item from the enumerable in its argument and passes on {"line", 2}

In the end it stops after the first word of the second line, because Stream.zip/2 runs out of items in its argument; the last item it outputs is {"second", 3}.
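The same experiment for the Stream.flat_map/2 version, again with StringIO standing in for io (an assumption for the sake of a self-contained snippet):

```elixir
{:ok, io} =
  StringIO.open("first,line\nsecond\nthird,line\nand,this,is,the,fourth\n")

result =
  IO.stream(io, :line)
  |> Stream.map(&String.trim_trailing/1)
  # flat_map emits every word as its own stream element.
  |> Stream.flat_map(&String.split(&1, ","))
  |> Stream.zip([1, 2, 3])
  |> Enum.to_list()

# Only three words get a partner, regardless of which line they came from.
IO.inspect(result)
# [{"first", 1}, {"line", 2}, {"second", 3}]
```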


I see, I think I started on my first response before you added this snippet to the question. So in this case you want to run the zip operation for each input item and not for the entirety of the stream. Disregard my notes on using Stream.flat_map/2 since you want to keep items in each line together.

I think this will work:

IO.stream(io, :line)
|> Stream.map(&String.trim_trailing/1)
|> Stream.map(&String.split(&1, ",") )
|> Stream.map(&Enum.zip(&1, [1, 2, 3]))
|> Enum.to_list

Oh, nice! And…it lets me flip the arguments to Enum.zip around.
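For reference, a quick sketch of both argument orders on in-memory rows (the rows list is made up for illustration). Because the zip now runs inside Stream.map, it is applied once per row, and the capture lets you put &1 on either side:

```elixir
rows = [["a", "b", "c"], ["d", "e", "f"]]

# Row values first, numbers second.
value_first =
  rows
  |> Stream.map(&Enum.zip(&1, [1, 2, 3]))
  |> Enum.to_list()

# Numbers first, row values second.
number_first =
  rows
  |> Stream.map(&Enum.zip([1, 2, 3], &1))
  |> Enum.to_list()

IO.inspect(value_first)
# [[{"a", 1}, {"b", 2}, {"c", 3}], [{"d", 1}, {"e", 2}, {"f", 3}]]
IO.inspect(number_first)
# [[{1, "a"}, {2, "b"}, {3, "c"}], [{1, "d"}, {2, "e"}, {3, "f"}]]
```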

Stream.zip([1, 2, 3]) takes the list, pairs it with the first item from the enumerable in its argument and passes on {["first", "line"], 1}

Okay, I see how Stream.zip is actually streaming something now, but the zipping is counterintuitive compared to how Enum.zip works. Stream.zip could still stream a result if it acted like Enum.zip, but I guess you can still get the Enum.zip result with your solution, and you can get the actual Stream.zip result as well.
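One place where the lazy version earns its keep, as far as I can tell, is when both sides are unbounded. The sketch below (example data is made up) zips two infinite streams and only takes what it needs; Enum.zip/2 on the same inputs would never return, because it tries to build the whole list.

```elixir
# Both enumerables are infinite: a repeating cycle and a counter.
labels = Stream.cycle(["a", "b", "c"])
counter = Stream.iterate(1, &(&1 + 1))

# Stream.zip/2 only describes the pairing; Enum.take/2 pulls four pairs.
first_four =
  labels
  |> Stream.zip(counter)
  |> Enum.take(4)

IO.inspect(first_four)
# [{"a", 1}, {"b", 2}, {"c", 3}, {"a", 4}]
```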

Here’s my actual code, which reads a csv file. Any tips (besides using a csv library)?

example csv file:

id,ship_to,net_amount
123,:NC,100.00
124,:OK,50.00
125,:TX,30.00
126,:CA,100.00

defmodule Parse do
  def orders(path) do
    io = File.open!(path, [:utf8, :read])
    io
    |> get_headers 
    |> Enum.map(&String.to_atom/1)
    |> Stream.cycle
    |> get_rows(io)
  end

  def get_headers(io) do
    io
    |> IO.read(:line)
    |> String.trim_trailing
    |> String.split(",")
  end

  def get_rows(header_cycle, io) do 
    IO.stream(io, :line)
    |> Stream.map(&String.trim_trailing/1)
    |> Stream.map(&String.split(&1, ",") )
    |> Stream.map(&_convert_values/1)
    |> Stream.map(&Enum.zip(header_cycle, &1))
    |> Enum.to_list
  end

  defp _convert_values([id, state, net]) do
    id = String.to_integer(id)
    state = String.trim_leading(state, ":")
            |> String.to_atom
    net = String.to_float(net)
    [id, state, net]
  end

end

I find this particular bit awkward:

  def orders(path) do
    io = File.open!(path, [:utf8, :read])
    io
    |> get_headers 
    |> Enum.map(&String.to_atom/1)
    |> Stream.cycle
    |> get_rows(io)
  end

It feels tortured to have to create the io variable so that I can pass io to get_rows() as well as get_headers().

Also, is there a better way to convert a string like ":NC" to an atom?
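One possible approach, sketched under the assumption that the set of valid codes is known up front: match the leading colon in the function head and only convert codes from a whitelist. The module name StateParser and the list of states are hypothetical, not from the original code.

```elixir
defmodule StateParser do
  # Assumption: these are the only codes we expect to see.
  @known_states ~w(NC OK TX CA)

  # Binary pattern matching strips exactly one leading ":";
  # the guard rejects anything outside the whitelist.
  def parse(":" <> code) when code in @known_states do
    {:ok, String.to_atom(code)}
  end

  def parse(_other), do: :error
end

IO.inspect(StateParser.parse(":NC")) # {:ok, :NC}
IO.inspect(StateParser.parse(":ZZ")) # :error
IO.inspect(StateParser.parse("NC"))  # :error
```

Restricting the conversion to known codes also avoids creating atoms from arbitrary file content, since atoms are not garbage collected.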

Edit: I like this version better, although orders() still feels a little funky:

defmodule Parse do
  def orders(path) do
    io = File.open!(path, [:utf8, :read])
    io
    |> get_headers
    |> get_rows(io)

    # io
    # |> get_headers 
    # |> Enum.map(&String.to_atom/1)
    # |> Stream.cycle
    # |> get_rows(io)
  end

  def get_headers(io) do
    io
    |> IO.read(:line)
    |> String.trim_trailing
    |> String.split(",")
    |> Enum.map(&String.to_atom/1)
    |> Stream.cycle
  end

  def get_rows(header_cycle, io) do 
    IO.stream(io, :line)
    |> Stream.map(&String.trim_trailing/1)
    |> Stream.map(&String.split(&1, ",") )
    |> Stream.map(&_convert_values/1)
    |> Stream.map(&Enum.zip(header_cycle, &1))
    |> Enum.to_list
  end

  defp _convert_values([id, state, net]) do
    id = String.to_integer(id)
    state = String.trim_leading(state, ":")
            |> String.to_atom
    net = String.to_float(net)
    [id, state, net]
  end

end

Alternative for orders():

  def orders(path) do
    io = File.open!(path, [:utf8, :read])
    headers = get_headers(io)
    get_rows(headers, io)
  end
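Another option might be File.open!/3 with a function argument, which hands you the io device inside the function and closes the file afterwards. A rough sketch, with a hypothetical module name (ParseSketch), a temp file created just for the demo, and the value conversion left out for brevity:

```elixir
defmodule ParseSketch do
  def orders(path) do
    # File.open!/3 passes the device to the function and closes it afterwards,
    # so the rows must be built eagerly (Enum.map) before the function returns.
    File.open!(path, [:utf8, :read], fn io ->
      headers = read_headers(io)

      io
      |> IO.stream(:line)
      |> Enum.map(fn line ->
        values = line |> String.trim_trailing() |> String.split(",")
        Enum.zip(headers, values)
      end)
    end)
  end

  defp read_headers(io) do
    io
    |> IO.read(:line)
    |> String.trim_trailing()
    |> String.split(",")
    |> Enum.map(&String.to_atom/1)
  end
end

# Demo with a throwaway file in the system temp directory.
path = Path.join(System.tmp_dir!(), "orders_sketch.csv")
File.write!(path, "id,ship_to,net_amount\n123,:NC,100.00\n124,:OK,50.00\n")
rows = ParseSketch.orders(path)
File.rm!(path)

IO.inspect(rows)
# [[id: "123", ship_to: ":NC", net_amount: "100.00"],
#  [id: "124", ship_to: ":OK", net_amount: "50.00"]]
```

Since Enum.zip/2 stops at the shorter enumerable, this sketch zips against the plain header list rather than a Stream.cycle of it.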

What’s the fastest way to get all the lines from an io device in a list?
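I haven't benchmarked anything, so I can't say what is fastest, but one straightforward way is to enumerate IO.stream/2 eagerly. A sketch using StringIO as a stand-in device (for a file on disk, File.stream!/1 piped into Enum.to_list/1 would be the analogous call):

```elixir
# In-memory device with three lines, just for illustration.
{:ok, io} = StringIO.open("one\ntwo\nthree\n")

# Enum.map/2 forces the stream and collects every line into a list.
lines =
  io
  |> IO.stream(:line)
  |> Enum.map(&String.trim_trailing/1)

IO.inspect(lines)
# ["one", "two", "three"]
```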

I really have to ask: why are you doing this? You implied you do not want a CSV library and I do not understand why, especially given that your code does not handle double quotes at all. What happens if one of the rows in the file is this?

125,:TX,"30,00"

Your code will parse 4 values instead of 3 and things will fall apart. What is your issue with using a dedicated CSV library like NimbleCSV which handles these cases (and others) excellently?
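You can see the problem with nothing more than String.split/2 on that row:

```elixir
# The quoted field contains a comma, but a naive split knows nothing about quotes.
fields = String.split(~s(125,:TX,"30,00"), ",")

IO.inspect(fields)
# ["125", ":TX", "\"30", "00\""]
IO.inspect(length(fields))
# 4
```

A four-element list will not match the three-element pattern in _convert_values/1, so the call raises instead of returning a row.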
