Streaming from Postgrex and calculating the checksum

I am streaming a very large table from Postgres with Postgrex.stream, converting it to CSV, and sending it to the browser over a socket, where the user downloads it as a file.

I have two issues:

  • If I do this with Enum, I can calculate the checksum AND send the data. With a stream, I seem to be able to do only one of those: I can calculate or send the data, but not both, without creating something intermediate (like a file). Is there a way to do both in a single pass? Or could I split this into two processes consuming the same stream, one calculating and storing the checksum while the other sends the data to the browser?
  • I need to put CSV headers at the front of the output.

My code:

  def stream(channel, type) do
    Repo.transaction(fn ->
      Sql.get_all_by_timestamp()
      |> Stream.map(&transform_rows/1)
      |> Enum.chunk_every(500)
      |> Enum.concat()
      |> Broadcaster.Download.send_chunk(channel, type)
    end)
  end

My Elixir skills are light, my apologies if this is an obvious question.

I don’t have the answer yet, but this article (shared by a user on Reddit) looks promising: https://ananthakumaran.in/2017/11/28/stream.html
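From a quick skim, the key idea seems to be that streams are lazy: a side effect attached with Stream.each only fires when the stream is finally consumed, so a single pass could both emit data and feed an accumulator. A tiny toy example of that (mine, not the article’s):

  1..3
  |> Stream.each(&IO.puts/1)  # side effect attached; nothing runs yet
  |> Enum.sum()               # consuming the stream fires the prints
  # prints 1, 2, 3 and returns 6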

This taught me a lot about Elixir. Here is the solution I came up with after talking with others:

  def stream(channel, type) do
    {:ok, {file_size, raw_checksum}} =
      Repo.transaction(fn ->
        Sql.get_all_by_timestamp()
        |> Stream.map(&transform_rows/1)
        # Side effect per chunk: push it down the socket as the stream is consumed.
        |> Stream.each(fn chunk -> Broadcaster.Download.send_chunk([chunk], channel, type) end)
        # Consuming the stream here drives the sends above, so a single pass
        # both sends the data and accumulates the size and checksum.
        |> Enum.reduce({0, :crypto.hash_init(:sha256)}, fn chunk, {size_acc, checksum_acc} ->
          # byte_size/1, not String.length/1: file size is bytes, not graphemes.
          {byte_size(chunk) + size_acc, :crypto.hash_update(checksum_acc, chunk)}
        end)
      end)

    checksum_hash =
      raw_checksum
      |> :crypto.hash_final()
      |> Base.encode16()

    {:ok, {file_size, checksum_hash}}
  end

This might not be a concern for your requirements, but the functions in Enum are eager: each call materializes a full intermediate list while it runs. Depending on the size and number of chunks, that could use a lot of memory.
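For example (toy numbers, just to show the shape of the difference):

  # Eager: Enum.map builds the entire doubled list in memory before summing.
  1..1_000_000 |> Enum.map(&(&1 * 2)) |> Enum.sum()

  # Lazy: Stream.map doubles one element at a time as Enum.sum consumes it.
  1..1_000_000 |> Stream.map(&(&1 * 2)) |> Enum.sum()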

Switching to Stream.transform for the checksum calculation might help, since all of the Stream functions are lazy.
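In case it helps, here is a minimal sketch of what Stream.transform/4 gives you: a start function that builds the accumulator, a reducer that emits elements and updates it, and an after function that fires once at the end (my own toy example):

  1..3
  |> Stream.transform(
    fn -> 0 end,                                     # start_fun: initial accumulator
    fn x, sum -> {[x * 2], sum + x} end,             # reducer: emit and accumulate
    fn sum -> IO.puts("inputs totalled #{sum}") end  # after_fun: runs once at the end
  )
  |> Enum.to_list()
  # => [2, 4, 6], after printing "inputs totalled 6"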

Your comment inspired me to rewrite it. Thanks.

  def stream(channel, type, filename, user_id) do
    Repo.transaction(fn ->
      Sql.stream_all()
      |> Stream.map(&transform_rows/1)
      |> Stream.transform(
        # start_fun: send the CSV header first and seed the size and hash with it.
        fn ->
          csv_header = Enum.join(@headers, ",") <> "\r\n"
          Broadcaster.Download.send_chunk([csv_header], channel, type)

          {byte_size(csv_header),
           :crypto.hash_update(:crypto.hash_init(:sha256), csv_header)}
        end,
        # reducer: send each chunk as it streams by and fold it into the totals.
        fn chunk, {size_acc, checksum_acc} ->
          Broadcaster.Download.send_chunk([chunk], channel, type)
          {[chunk], {byte_size(chunk) + size_acc, :crypto.hash_update(checksum_acc, chunk)}}
        end,
        # after_fun: signal completion and record the final size and digest.
        fn {size_acc, checksum_acc} ->
          Broadcaster.Download.send_complete(channel, type)

          checksum_hash =
            checksum_acc
            |> :crypto.hash_final()
            |> Base.encode16()

          details = %{
            filesize: size_acc,
            checksum: checksum_hash,
            filename: filename
          }

          AuditLoggers.Downloads.download_all(user_id, user_id, details)
        end
      )
      |> Stream.run()
    end)
  end
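A nice side effect of recording the digest is that the receiving end can verify the download independently. Something like this (the file path is a placeholder):

  # Illustrative check: hash the downloaded file and compare against the
  # checksum recorded in the audit log. "download.csv" is a stand-in path.
  {:ok, data} = File.read("download.csv")
  local_checksum = :crypto.hash(:sha256, data) |> Base.encode16()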