Prepending an atom to an Ecto stream

I am fetching results from the database using a stream because the query will return millions of rows. I am trying to export the results to CSV with a header. My idea was to prepend an atom to the stream so I can pattern match on it and use the Postgrex result's columns field to write the header, but I am having no luck. Any suggestions on how to write out the header from the stream? I don't want to run the query twice because it is expensive.

Here is my code:

    file = File.open! Path.expand(options[:output]), [:write, :delayed_write, :utf8]
    Indie.Repo.transaction(fn ->
      Stream.concat(:begin, query)
      |> Indie.Repo.stream
      |> Stream.chunk_every(1, 1, :discard)
      |> Enum.each( fn [last, current] ->
        _ = case last do
          :begin -> (
            result = current
            line = Enum.join(result.columns, ";")
            IO.binwrite file, line <> "\n"
            )
          end
          
        result = current
        Enum.each(result.rows, fn row ->
          line = Enum.join(row, ";")
          IO.binwrite file, line <> "\n"
        end)
      end)
    end)
    File.close file

Here is my error:

** (Protocol.UndefinedError) protocol Ecto.Queryable not implemented for #Function<62.51129937/2 in Stream.transform/3>. This protocol is implemented for: Atom, BitString, Ecto.Query, Ecto.SubQuery, Tuple
    (ecto) deps/ecto/lib/ecto/queryable.ex:1: Ecto.Queryable.impl_for!/1
    (ecto) deps/ecto/lib/ecto/queryable.ex:9: Ecto.Queryable.to_query/1
    (ecto) lib/ecto/repo/queryable.ex:24: Ecto.Repo.Queryable.stream/3
    (api) lib/api.ex:583: anonymous fn/2 in Api.unifiedtxns_history/1
    (ecto_sql) lib/ecto/adapters/sql.ex:798: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection) lib/db_connection.ex:1349: DBConnection.run_transaction/4
    (api) lib/api.ex:581: Api.unifiedtxns_history/1
    (elixir) lib/kernel/cli.ex:105: anonymous fn/3 in Kernel.CLI.exec_fun/2

I haven’t actually done this, but my first thought is to take advantage of the fact that the struct returned from File.stream! is Collectable. You could take the first result from the stream, write its column names and then its rows to the file stream, and then use Stream.into to send the rest of the source stream into the same file stream.
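A minimal sketch of that idea, under a few assumptions: the source stream yields chunks shaped like `%{columns: [...], rows: [[...], ...]}` (similar to a Postgrex result), and the module name, function names, and sample data are made up for illustration. `Stream.transform/3` carries a small piece of state so the header line is emitted only before the first chunk, and `Stream.into/2` writes every line into the `File.stream!` collectable.

```elixir
defmodule CsvExportSketch do
  # `chunks` is a hypothetical stand-in for the database stream. Each chunk is
  # assumed to be a map (or struct) with :columns and :rows keys.
  def export(chunks, path) do
    chunks
    |> Stream.transform(:header_pending, fn chunk, state ->
      rows = Enum.map(chunk.rows, &to_line/1)

      case state do
        # First chunk: emit the header line, then that chunk's rows.
        :header_pending -> {[to_line(chunk.columns) | rows], :header_written}
        # Later chunks: emit rows only.
        :header_written -> {rows, :header_written}
      end
    end)
    |> Stream.into(File.stream!(path))
    |> Stream.run()
  end

  # Joins values with ";" and terminates the line.
  defp to_line(values), do: Enum.join(values, ";") <> "\n"
end

# Usage with in-memory sample data instead of a real query.
sample_chunks = [
  %{columns: ["id", "amount"], rows: [[1, 10]]},
  %{columns: ["id", "amount"], rows: [[2, 20]]}
]

sample_path = Path.join(System.tmp_dir!(), "csv_export_sketch.csv")
CsvExportSketch.export(sample_chunks, sample_path)
```

Because everything stays lazy until `Stream.run/1`, the source is only traversed once, so the query would not need to run a second time just to get the header.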


I haven't had a chance to get back into this yet, but I was thinking about it a bit more. I'm new to Elixir, and this is the first project that's actually going to production using it. I have quite a few fragment functions in my select expression, which uses a map. Is there a way to programmatically access the map keys of that select expression before executing the query? That could solve my problem as well.

I was able to solve this by getting the atoms from my select expression instead and writing those to the file first as the header.

    {_, _, select} = query.select.expr
    columns =
      select
      |> Keyword.keys()
      |> Enum.map(&Atom.to_string/1)

That’ll work, but it also relies on an implementation detail of the query struct, which might break in future Ecto releases.
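One way to avoid reaching into the query struct, sketched here as an assumption rather than something from the thread: keep an explicit, ordered list of column atoms next to the query and use it for both the header and the row output. The module name and column names below are hypothetical. Since rows selected into a map have no guaranteed key order, the values are picked in the order of that list.

```elixir
defmodule HeaderFromColumns do
  # Hypothetical column list; it would need to match the keys of the select map.
  @columns [:id, :amount, :currency]

  # Builds the header line from the column atoms.
  def header, do: Enum.map_join(@columns, ";", &Atom.to_string/1)

  # Builds one CSV line from a row map, taking values in @columns order.
  # Map.fetch!/2 raises if the row is missing a column, which surfaces
  # a mismatch between the list and the select map early.
  def line(row), do: Enum.map_join(@columns, ";", &Map.fetch!(row, &1))
end

# Usage with a sample row map instead of a real query result.
sample_header = HeaderFromColumns.header()
sample_line = HeaderFromColumns.line(%{currency: "USD", id: 1, amount: 10})
```

The trade-off is that the column list has to be kept in sync with the select expression by hand.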