Streaming xml file over ftp fails if I don't convert it into a list first

I’m trying to stream an XML file over FTP with Stream.resource/3 and SweetXml.stream_tags!/3, but I’m running into a strange issue: sweet_xml raises an error before any chunk arrives, unless I call Enum.to_list/1 first.

# Function definitions

def stream_file!(client) do
  Stream.resource(fn -> start_file_stream(client) end, &next_file_chunk/1, &stop_file_stream/1)
end

defp start_file_stream(client) do
  Logger.debug("Requesting to stream file")

  with :ok <- :ftp.recv_chunk_start(client.conn, to_charlist(client.credentials.file_path)) do
    client
  end
end

defp next_file_chunk(%{conn: conn} = client) do
  case :ftp.recv_chunk(conn) do
    {:ok, binary} ->
      Logger.debug("Stream file chunk (#{byte_size(binary)} bytes)")
      {[binary], client}

    :ok ->
      Logger.debug("Transfer completed successfully")
      {:halt, client}

    error ->
      Logger.error(
        credentials_id: client.credentials.id,
        message: "Unable to stream file",
        reason: error
      )

      {:halt, error}
  end
end

defp next_file_chunk(error), do: {:halt, error}
defp stop_file_stream(_), do: []

# Code that is executed

# conn is the pid from :ftp.open/2
# Credentials holds infos about the file and the credentials for the ftp server
client = %{conn: conn, credentials: credentials}

client
|> stream_file!()
# For debugging
|> Stream.each(&IO.inspect(&1, label: :chunk))
# |> Enum.to_list()
|> SweetXml.stream_tags!(:Mitglied, discard: [:Mitglied])
|> Stream.map(fn {:Mitglied, elem} -> 
  # parse data here
end)
|> Stream.run()

Calling this code produces the following output. If I uncomment |> Enum.to_list() it will work fine and print all the chunks as they come. Any idea what might cause this?

[debug] Requesting to stream file
[error] 3917- fatal: :expected_element_start_tag

** (SweetXml.XmerlFatal) :expected_element_start_tag
    (sweet_xml 0.7.3) lib/sweet_xml.ex:539: anonymous fn/1 in SweetXml.stream!/2
    (elixir 1.14.4) lib/stream.ex:1619: Stream.do_resource/5
    (elixir 1.14.4) lib/stream.ex:1813: Enumerable.Stream.do_each/4
    (elixir 1.14.4) lib/stream.ex:689: Stream.run/1

For reference, I have a separate implementation for streaming files over sftp with sftp_client, which works perfectly. I’ve looked into their implementation, but couldn’t find any meaningful difference.
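
One detail in the log above may be worth ruling out: only "Requesting to stream file" is printed before the parser fails, and the catch-all next_file_chunk(error) clause halts without logging anything. If start_file_stream/1 returned an error when the stream is enumerated lazily, the parser would simply receive an empty stream. The following is a minimal sketch of that behaviour using a stand-in resource. SilentHaltSketch and its functions are hypothetical names, and nothing here talks to a real FTP server.

```elixir
defmodule SilentHaltSketch do
  # Hypothetical stand-in for stream_file!/1: `start_result` plays the role of
  # whatever start_file_stream/1 returns (chunks to emit, or an error tuple).
  def stream(start_result) do
    Stream.resource(fn -> start_result end, &next/1, fn _ -> :ok end)
  end

  # Same shape, but a failed start raises instead of producing an empty stream.
  def stream!(start_result) do
    Stream.resource(
      fn ->
        case start_result do
          {:ok, _chunks} = ok -> ok
          error -> raise "unable to start stream: #{inspect(error)}"
        end
      end,
      &next/1,
      fn _ -> :ok end
    )
  end

  defp next({:ok, [chunk | rest]}), do: {[chunk], {:ok, rest}}
  defp next({:ok, []}), do: {:halt, :done}
  # Mirrors the catch-all clause in the question: halt without emitting or logging.
  defp next(error), do: {:halt, error}
end

# A failed start silently yields an empty stream.
IO.inspect(Enum.to_list(SilentHaltSketch.stream({:error, :some_reason})))
# A successful start yields the chunks.
IO.inspect(Enum.to_list(SilentHaltSketch.stream({:ok, ["<a>", "</a>"]})))
```

Raising (or at least logging) in the start function, as in stream!/1 above, would make the underlying reason visible instead of surfacing later as an XML parse error.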

Does it work if you comment out the Stream.each part?

Unfortunately not, it has no effect at all.

Are you using a local FTP server? Implementations of the FTP standard sometimes diverge, especially on old servers.

It’s a remote server of a company we collaborate with, so I don’t know any further specifics about it. The same server has been used for a related project that has been running since 2013. Still, I wonder how that could influence the stream in such a way.

It’s about quirks in the protocol. You can query the server to see which software it runs on. At one of my jobs we had some nasty old FTP servers running on Windows, and the Erlang client had problems with them from time to time.

I am aware you likely don’t want to complicate your setup but if I was in your place I’d just periodically download those files via rclone and then process them locally with Elixir. (Or you can download them on-demand as well, Erlang/Elixir can interface with the rest of the CLI tools in the system via ports pretty well.)
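
A rough sketch of the on-demand variant, assuming rclone is installed and a remote has already been configured. The module name, the remote name "partner", and the paths are hypothetical. copyto is rclone’s subcommand for copying a single file to a given destination path, and System.cmd/3 runs the executable through a port.

```elixir
defmodule RcloneSketch do
  # Builds the argument list for `rclone copyto <remote>:<path> <local_path>`.
  def copyto_args(remote, remote_path, local_path) do
    ["copyto", "#{remote}:#{remote_path}", local_path]
  end

  # Downloads one file; returns {:ok, local_path} or {:error, {exit_status, output}}.
  # System.cmd/3 raises if the rclone executable cannot be found.
  def download(remote, remote_path, local_path) do
    args = copyto_args(remote, remote_path, local_path)

    case System.cmd("rclone", args, stderr_to_stdout: true) do
      {_output, 0} -> {:ok, local_path}
      {output, status} -> {:error, {status, output}}
    end
  end
end

# Usage (hypothetical remote and paths):
# {:ok, path} = RcloneSketch.download("partner", "exports/members.xml", "/tmp/members.xml")
# path |> File.stream!() |> SweetXml.stream_tags!(:Mitglied, discard: [:Mitglied])
```

Once the file is on local disk, the parsing side no longer depends on the FTP connection at all.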

I don’t think that’s necessary or worth it. The whole file can fit easily in memory and it’s probably never going to be too big for that. Being able to properly stream the file would have been a welcome optimization though.

I get what you are saying but diving into a deep rabbit hole might burn you an order of magnitude more time. FTP is old, and not all servers strictly adhere to all parts of the spec. Chasing this down might cost you dearly.

Hence I’d eliminate any potential cause for things spiraling out of control from the get go.

Then I’ll take this advice and admit defeat. Enum.to_list may stay here for now.
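
A possible explanation for why the Enum.to_list/1 workaround helps: a lazy stream runs its resource functions in whichever process enumerates it, while Enum.to_list/1 forces them to run in the calling process. If the XML parser consumes its input from a separate process, and if the FTP connection only accepts commands from the process that opened it, the lazy version would fail where the eager one succeeds. Both conditions are assumptions to check against the sweet_xml source and the :ftp documentation. The sketch below only demonstrates the process behaviour of Stream.resource/3; WhoRunsSketch is a hypothetical name.

```elixir
defmodule WhoRunsSketch do
  # Emits a single element: the pid of the process that enumerates the stream.
  def pid_stream do
    Stream.resource(
      fn -> :start end,
      fn
        :start -> {[self()], :done}
        :done -> {:halt, :done}
      end,
      fn _ -> :ok end
    )
  end

  # Enumerates eagerly in the caller (like Enum.to_list/1 in the question).
  def eager do
    pid_stream() |> Enum.to_list() |> hd()
  end

  # Hands the lazy stream to another process, which then enumerates it.
  def lazy_elsewhere do
    stream = pid_stream()
    task = Task.async(fn -> stream |> Enum.to_list() |> hd() end)
    Task.await(task)
  end
end

IO.inspect(WhoRunsSketch.eager() == self(), label: :eager_runs_in_caller)
IO.inspect(WhoRunsSketch.lazy_elsewhere() == self(), label: :lazy_runs_in_caller)
```

If this turns out to be the cause, logging the value returned by :ftp.recv_chunk_start/2 in the failing case should show it directly.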

The funny thing is that I had exactly this issue at some point, and I can’t remember how I solved it, only that it ended up working. Maybe try searching for something about chunk size, though I might be confusing this with sftp.

The Erlang ftp module does not allow you to set the chunk size (or at least I couldn’t find it in the docs), so I assume you’re thinking of sftp.

Just out of curiosity… I’m building a feature where a Phoenix application is creating files that need to end up on a remote filesystem with low latency, through sFTP. I want to retain flexibility, and avoid coupling the application to the specifics of the transfer protocol used. rclone seems like a really nice tool for that job, offloading the responsibility to copy files to whatever destination. I’m not sure how you’d use rclone in conjunction with a regular Phoenix application. Do you run it in response to some event that triggers the file generation? Or does it run in the background, managed by an elixir process and supervisor, or maybe even outside the Phoenix application? One of my concerns would be what happens when multiple application nodes are pointing to the same destination (but maybe there is no problem whatsoever).

Thanks for referencing rclone as an alternative!

There is no clear-cut answer, but I would very likely opt for having N background rclone processes waiting (a pool of Oban workers, for example) and then enforce uniqueness via Oban’s mechanism so that, e.g., you can’t write to the same file from 2 or more workers. That’s trivially achievable.

The tricky part is of course if somebody wants the file upload to be synchronous but IIRC Oban offers that as well? (I think you can send a job and wait on it?)

BTW, when it comes to SFTP in particular I think Erlang is slightly better equipped, but don’t quote me on this: I only used a library for it once, and that was a few years ago. I’d still try to work with a normal library and only reach for an external tool if things don’t work stably, as happened with OP.

But you also said you don’t want to tie your application’s logic to the transport protocol. Hm, in that case I’d probably choose ex_aws_s3, because you can use it not only for Amazon S3 but also for S3-compatible services like Backblaze’s B2 or Cloudflare’s R2, and for self-hosted software like MinIO.

Yeah, the Erlang sftp client works pretty much flawlessly. The standard is sound and almost all servers use pretty much the same version (v3 sftp, v2 ssh if I remember well), so I would recommend using that instead of calling an external process, because in a long-running system there is always the risk that you will have some kind of process leak at some point.

Alright, thanks @dimitarvp and @D4no0 for the pointers!