How to send chunks of a zipped set of CSV files using a Stream

Good afternoon!

I am working on a project where users can export data from different PostgreSQL tables as CSV.
Inspired by a great article (https://joaquimadraz.com/csv-export-with-postgres-ecto-and-phoenix), I managed to set up a Stream that queries the data and sends it to the user as chunked CSV, where it is made available as a download.

This works great for one table at a time, but today I attempted to extend it so that the user can request multiple tables and receive the data in CSV format, zipped (compressed) into one file. However, I cannot get my head around how to achieve this.
Zipping data in a lazy manner seems counterintuitive to me; is this even possible in Elixir/Erlang? It appears I need a structure that lazily reads rows from a PostgreSQL COPY command while zipping them, and that also keeps track of where one file stops and another starts.

How can I compress the data from an Ecto.Adapters.SQL stream and send it to the user as one package, in chunks?

I hope I phrased the question clearly enough; please let me know if more explanation is needed.

1 Like

It would be great if you could give a step-by-step example with a few table names, the input parameters, and the expected output, because I can’t quite follow.

I’ve done streaming and in-place CSV reports in Elixir before, and it can be confusing the first time you do it, but rest assured: it’s very doable.

3 Likes

Thank you for your response! Right now the flow is as follows:

The data model up for export so far consists of a users table (name, email, etc.) and a table of achievements within the application (title, expiration date, description).
A user requests to download their data, which results in a GET request to the server.
The input for now is only the user id, and the result is queried from one table into one CSV file using a COPY statement, i.e.:
query = """
COPY (
  query here
) TO STDOUT WITH CSV DELIMITER ',';
"""
This query is used in:

defp stream(query, header, batch_size \\ 500) do
  # Assumes `alias Ecto.Adapters.SQL`; `header` is an enumerable
  # containing the CSV header line.
  Repo
  |> SQL.stream(query, [], max_rows: batch_size)
  |> Stream.map(& &1.rows)
  |> (fn stream -> Stream.concat(header, stream) end).()
end

In a controller, I reduce this stream and send the data back to the user in chunks using Plug.Conn.chunk/2.
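For reference, a minimal sketch of that controller action (copy_query/1 and csv_header/0 are illustrative names, not from the article; it assumes a Phoenix controller, where the Plug.Conn functions are imported, and that the stream must run inside a Repo.transaction):

def export(conn, %{"user_id" => user_id}) do
  conn =
    conn
    |> put_resp_content_type("text/csv")
    |> put_resp_header("content-disposition", ~s(attachment; filename="export.csv"))
    |> send_chunked(200)

  {:ok, conn} =
    Repo.transaction(fn ->
      copy_query(user_id)
      |> stream(csv_header())
      |> Enum.reduce_while(conn, fn rows, conn ->
        # Each element is iodata; stop early if the client disconnects.
        case chunk(conn, rows) do
          {:ok, conn} -> {:cont, conn}
          {:error, :closed} -> {:halt, conn}
        end
      end)
    end, timeout: :infinity)

  conn
end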

The desired situation would be:

  1. User requests export
  2. The data from the user table is queried and copied into a CSV file, while the data from the achievements table is copied into a separate CSV file.
  3. The two files are zipped together and returned to the user.
  4. Preferably the entire chain happens in a stream, with chunked responses

I hope this makes things clearer, if not please let me know!

Off the top of my head:

defp produce_report(query, converter_function, header, file_path)
     when is_function(converter_function, 3) do
  # Ecto streams must run inside a transaction. The converter function is
  # expected to write the rows to a file, not return them as a value.
  {:ok, _} =
    Repo.transaction(fn ->
      query
      |> Repo.stream()
      |> converter_function.(header, file_path)
      |> Stream.run()
    end, timeout: :infinity)
end

defp produce_report_users() do
  produce_report(query_users(), &YourReportModule.convert_user_to_csv_and_append_to_file/3, users_header(), users_file_path())
end

defp produce_report_achievements() do
  produce_report(query_achievements(), &YourReportModule.convert_achievement_to_csv_and_append_to_file/3, achievements_header(), achievements_file_path())
end

def produce_reports_users_and_achievements() do
  # Start both reports asynchronously in the background.
  users_task = Task.async(&produce_report_users/0)
  achievements_task = Task.async(&produce_report_achievements/0)
  # Wait until all tasks are done.
  results = Task.yield_many([users_task, achievements_task], :infinity)
  # ...<your code verifying the list of tuples that is the "results" variable goes here>...
  # ...
  # Create the resulting ZIP file. Note that :zip.create/2 returns
  # {:ok, file_name} on success, not the bare file name.
  {:ok, zip_file} = :zip.create(result_path(), [users_file_path(), achievements_file_path()])
  # ...<your code to handle the created ZIP file goes here>...
end
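
The converter functions aren’t shown above; here is a minimal sketch of what one of them might look like, assuming the stream yields user structs with name and email fields (illustrative; use a proper CSV library such as NimbleCSV if your fields can contain commas or quotes):

defmodule YourReportModule do
  # Returns a lazy stream; nothing is written until Stream.run/1 forces it.
  def convert_user_to_csv_and_append_to_file(stream, header, file_path) do
    stream
    |> Stream.map(fn user -> Enum.join([user.name, user.email], ",") <> "\n" end)
    |> (fn lines -> Stream.concat([header <> "\n"], lines) end).()
    |> Stream.into(File.stream!(file_path))
  end
end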

For more options on processing the results of many asynchronous tasks (as used in this very crude example), please see the Task.yield_many docs.
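
For instance, the verification step could be as simple as this sketch, assuming a failed report should abort the whole export:

# Each entry is {task, result}; with an :infinity timeout the result is
# {:ok, value} for a normal return or {:exit, reason} for a crash.
Enum.each(results, fn
  {_task, {:ok, {:ok, _}}} -> :ok
  {task, other} -> raise "report task #{inspect(task.ref)} failed: #{inspect(other)}"
end)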

Also check Erlang’s zip module.
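
If you would rather not leave the archive on disk, :zip.create/3 also accepts the :memory option and returns the archive as a binary. A sketch (the paths are illustrative; note that Erlang expects charlists):

# Reads the two CSV files from disk and builds the ZIP entirely in memory.
{:ok, {_name, zip_binary}} =
  :zip.create('export.zip', ['users.csv', 'achievements.csv'],
    [:memory, cwd: '/tmp/reports'])

You could then send zip_binary to the user directly, e.g. with Plug.Conn.send_resp/3, instead of writing the archive to disk first.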

Again, this is very rough, but it should give you a general idea. I opted to spawn each report in a separate process, which should be beneficial on multi-core machines; however, if each report is quite heavy and needs a lot of CPU, you might want to just iterate over each report in place (without spawning processes). That depends on your priorities.

6 Likes

This looks very promising; I will try it out today!
Thank you very much for your effort. I will report back on whether I managed to solve this.

This worked out perfectly. Thank you very much for the assist @dimitarvp, this topic can be closed!

FYI, you can mark mine as the accepted answer, and the topic is then considered closed.

Glad I could help! Hope this made you understand Elixir better. 🙂

1 Like