Stream uses all available memory and cannot run with limited memory

I am working on a small ETL script and using Elixir instead of Python for the task. One pretty interesting observation is that Elixir uses all available memory when I use a Stream-based function.

defmodule Transform do
  import SweetXml

  def xml_to_csv(xml_file, xml_tag) do
    IO.inspect("Processing #{xml_file}")
    csv_file_name = String.replace(xml_file, ".xml", ".csv")

    xml_file
    |> File.stream!()
    |> SweetXml.stream_tags!(xml_tag, discard: [xml_tag])
    |> Stream.map(fn {xml_tag, doc} ->
      doc
      |> SweetXml.xpath(~x"//#{xml_tag}"e, id: ~x"./id/text()"s, name: ~x"./name/text()"s)
    end)
    |> Stream.map(fn x -> [x.id, x.name] end)
    |> CSV.encode(separator: ?\t)
    |> Stream.into(File.stream!(csv_file_name))
    |> Stream.run()
  end
end

Once I invoke the function as follows, it runs out of memory and crashes:

Transform.xml_to_csv("data/discogs_20231001_releases.xml", :release)
"Processing data/discogs_20231001_releases.xml"
eheap_alloc: Cannot allocate 762886488 bytes of memory (of type "old_heap").

Crash dump is being written to: erl_crash.dump...done

The data is pretty big:

❯ ls -larth data/discogs_20231001_releases.xml
-rw-r--r--@ 1 l1x  staff    72G Oct 28 22:06 data/discogs_20231001_releases.xml

Data:

https://discogs-data-dumps.s3-us-west-2.amazonaws.com/data/2023/discogs_20231001_releases.xml.gz

It seems the issue is related to XML parsing in Elixir. I have tried both SweetXml and Saxy, but the whole stream cannot run in O(1) memory and needs more than 2G (I am limiting the VM to 2G so the OOM triggers sooner). SweetXml runs out of memory much faster; Saxy gets further into the file but still hits the 2G threshold. I am looking into manually triggering GC (see the sketch below) and into processing the XML with an external tool instead of these libraries, unless there is something I am not considering.
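
To illustrate the GC idea, this is roughly what I mean. It is only a sketch; stream stands in for the pipeline above, and it can only help if the growth is reclaimable garbage rather than a real accumulator inside the library:

# Force a collection of the streaming process every 10_000 elements.
stream
|> Stream.transform(0, fn element, count ->
  if rem(count, 10_000) == 0, do: :erlang.garbage_collect()
  {[element], count + 1}
end)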

Hey @l1x, yeah, SweetXml certainly has an internal accumulator; I’m not sure about Saxy. Can you show the handler you wrote for Saxy? I’d like to try it locally. I don’t think this is the problem, but I’d also swap in NimbleCSV for CSV, it’s a bit better optimized.
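
For reference, swapping it in would look something like this (a minimal sketch; the TSV module name and the sample rows are placeholders):

Mix.install([:nimble_csv])

# Define a tab-separated dumper at compile time; dump_to_stream/1 emits rows lazily.
NimbleCSV.define(TSV, separator: "\t", escape: "\"")

[["id", "name"], ["1", "Some Release"]]
|> TSV.dump_to_stream()
|> Stream.into(File.stream!("out.csv"))
|> Stream.run()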

XML tools sigh…

I haven’t seen many great ones in the years XML has existed; they are generally plagued by interoperability and security issues.

You may wish to look at NimbleParsec for stream parsing and implement your own XML parser to extract the elements you are interested in. There is a starting point of a “simple” XML parser in the docs (it doesn’t handle attrs but should be easy to extend).

defmodule SimpleXML do
  import NimbleParsec

  tag = ascii_string([?a..?z, ?A..?Z], min: 1)
  text = ascii_string([not: ?<], min: 1)

  opening_tag =
    ignore(string("<"))
    |> concat(tag)
    |> ignore(string(">"))

  closing_tag =
    ignore(string("</"))
    |> concat(tag)
    |> ignore(string(">"))

  defparsec :xml,
            opening_tag
            |> repeat(lookahead_not(string("</")) |> choice([parsec(:xml), text]))
            |> concat(closing_tag)
            |> wrap()
end

SimpleXML.xml("<foo>bar</foo>")
#=> {:ok, [["foo", "bar", "foo"]], "", %{}, {1, 0}, 14}

Thanks @adw632, I am looking into doing the XML parsing outside of SweetXml and Saxy. NimbleParsec is probably the way to go.

I am using the example handler from the Saxy docs:

defmodule Artist do
  @behaviour Saxy.Handler

  def handle_event(:start_document, prolog, state) do
    IO.inspect("Start parsing document")
    {:ok, [{:start_document, prolog} | state]}
  end

  def handle_event(:end_document, _data, state) do
    IO.inspect("Finish parsing document")
    {:ok, [{:end_document} | state]}
  end

  def handle_event(:start_element, {name, attributes}, state) do
    IO.inspect("Start parsing element #{name} with attributes #{inspect(attributes)}")
    {:ok, [{:start_element, name, attributes} | state]}
  end

  def handle_event(:end_element, name, state) do
    IO.inspect("Finish parsing element #{name}")
    {:ok, [{:end_element, name} | state]}
  end

  def handle_event(:characters, chars, state) do
    IO.inspect("Receive characters #{chars}")
    {:ok, [{:characters, chars} | state]}
  end

  def handle_event(:cdata, cdata, state) do
    IO.inspect("Receive CData #{cdata}")
    {:ok, [{:cdata, cdata} | state]}
  end
end

If you’re using the example handler from Saxy then it accumulates the entire doc into a gigantic list; you’ll need to explicitly handle only the elements/text that you care about. I can try and whip up some example code in a bit. Saxy also supports stream parsing, which is probably better for this use case.
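
For reference, stream parsing would be invoked roughly like this (note that with the example handler the state still grows with every event, so the handler itself is what needs to change):

# Feed the file to Saxy chunk by chunk instead of reading it into memory first.
"data/discogs_20231001_releases.xml"
|> File.stream!()
|> Saxy.parse_stream(Artist, [])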

Have you tried Erlang’s :xmerl? I have yet to meet a big XML file that could trip it, but admittedly I have never tried with a 72GB one.

It’s also more difficult to code for it but very doable. Shame that just today is probably going to be my busiest day this month. I’d try helping a day or two later though (or if I manage to do my tasks quicker today).

EDIT: I see that SweetXml actually uses :xmerl under the hood, hmmm, but I’d still try my hand at the raw stuff.
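
For reference, the raw SAX interface looks roughly like this; a sketch that just counts <release> elements, not tested against the Discogs dump (xmerl hands element names to the event fun as charlists):

# The event fun receives each SAX event plus an accumulator, so nothing is
# retained unless you put it into the accumulator yourself.
event_fun = fn
  {:startElement, _uri, ~c"release", _qname, _attributes}, _location, count -> count + 1
  _event, _location, count -> count
end

{:ok, release_count, _rest} =
  :xmerl_sax_parser.file(~c"data/discogs_20231001_releases.xml",
    event_fun: event_fun,
    event_state: 0
  )

IO.inspect(release_count, label: "releases seen")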

Thanks @ducharmemp, I was not aware of that just from looking at the code. The documentation says that you can use the handler for stream processing.

I would love to see how I could skip the accumulation part and just parse each XML document (which means each line) one by one without running out of memory.

Apologies, a busy day caught up with me. I did a bit of poking at the XML structure, and it’s a bit unclear what you’re looking for in there, since I’m not sure the XPaths actually match up with the structure of the release element, so sorry about not having much in the way of example code. But to give a tl;dr on how SAX parsing generally works in Elixir: the idea is that it’s a recursive-ish function that carries along some additional state. A special-cased reduce, if you will.

So

  def handle_event(:start_element, {name, attributes}, state) do
    IO.inspect("Start parsing element #{name} with attributes #{inspect(attributes)}")
    {:ok, [{:start_element, name, attributes} | state]}
  end

In this function specifically, you’re saying “prepend the 3-tuple of start_element, name, and attributes onto the head of the state list” with [{:start_element, name, attributes} | state]. That’s why Saxy runs out of memory. If you want to look at only a specific tag, you’ll have to do some filtering. For example (illustrative only):

  def handle_event(:start_element, {name, attributes}, state) do
    if name == "release" do
      {:ok, %{state | in_release: true}}
    else
      {:ok, state}
    end
  end

state in the above example doesn’t have to be a list; it can be anything at all, just like the accumulator in Enum.reduce.

That said, I tried throwing some solutions at your particular problem, and I think this is one of those situations where I’m going to have to be more vague than I’d like: with a 72GB file, you’re going to have to figure out some engineering to get the data transformed in a reasonable amount of time, or pull in some Rust via Rustler to really let the CPU rip. Even when swapping out Saxy (the fastest XML parser when I tested my declarative XML parsing lib, ducharmemp/saxaboom on GitHub; check the benchmarks) for fast_xml, I was still looking at multiple tens of minutes to basically do no real work. Script below for completeness:

Mix.install([:fast_xml])

defmodule CSVHandler do
  use GenServer

  @impl true
  def init(fname) do
    {:ok, File.open!(fname, [:write])}
  end

  # really rough, I wouldn't recommend productionizing this
  @impl true
  def handle_info({:"$gen_event", {:xmlstreamelement, {:xmlel, tag, _attrs, _children}}}, state) do
    # This only writes the release tag name, as a vague gauge of how far we've come in the file
    IO.puts(state, tag)
    {:noreply, state}
  end

  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

{:ok, handler} = GenServer.start_link(CSVHandler, "out.csv")
stream_parser = :fxml_stream.new(handler)

File.stream!("./discogs_20231001_releases.xml")
|> Enum.reduce(stream_parser, fn chunk, parser -> :fxml_stream.parse(parser, chunk) end)

Also, just to do my due diligence: I’d actually recommend against using the library I posted above if you want to expand this parser to hold more data. Looking at the data overall, it really seems like you need a home-rolled solution, and on top of that my lib doesn’t support streaming out data.

Thanks a lot @ducharmemp! I am trying to understand what you wrote and work out a solution based on that.

I think what I would like to do is this:

  • read the file line by line (this is fine)
  • when I see the opening tag that I am interested in (<release>), start to build a state
  • keep adding to this state until I see the closing tag (</release>)
  • handling the closing tag does two things:
    • outputting the data I am interested in (maybe: id, label_id)
    • removing the state

This would limit the memory usage to one document which is a few hundred kilobytes tops.

I think Saxy might be able to do this with the functions in the handler, but I am not sure exactly how. I am using Saxy from an .exs script, so it is a bit more “interesting” to figure out how exactly to achieve this; a rough sketch of what I have in mind is below.
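
This is untested and assumes the release id is an attribute on <release> (the real fields, like label_id, would come out of the accumulated children), but it shows the build-on-open / emit-and-reset-on-close shape:

defmodule ReleaseHandler do
  @behaviour Saxy.Handler

  # State: the output device plus the release currently being built (nil outside <release>).
  def handle_event(:start_document, _prolog, state), do: {:ok, state}
  def handle_event(:end_document, _data, state), do: {:ok, state}

  # Opening <release>: start a fresh per-release accumulator.
  def handle_event(:start_element, {"release", attributes}, state) do
    {_, id} = List.keyfind(attributes, "id", 0, {"id", ""})
    {:ok, %{state | current: %{id: id, chars: []}}}
  end

  def handle_event(:start_element, _element, state), do: {:ok, state}

  # Only accumulate character data while inside a <release>.
  def handle_event(:characters, chars, %{current: %{} = current} = state) do
    {:ok, %{state | current: %{current | chars: [chars | current.chars]}}}
  end

  def handle_event(:characters, _chars, state), do: {:ok, state}
  def handle_event(:cdata, _cdata, state), do: {:ok, state}

  # Closing </release>: emit one row and drop the accumulator, so memory stays bounded.
  def handle_event(:end_element, "release", %{current: %{} = current} = state) do
    IO.write(state.device, [current.id, ?\n])
    {:ok, %{state | current: nil}}
  end

  def handle_event(:end_element, _name, state), do: {:ok, state}
end

initial = %{device: File.open!("releases.csv", [:write]), current: nil}

{:ok, final_state} =
  "data/discogs_20231001_releases.xml"
  |> File.stream!()
  |> Saxy.parse_stream(ReleaseHandler, initial)

File.close(final_state.device)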

Have you solved this? I might have some free time soon and can try to help you. I have worked with XML files many times and :xmerl has never failed me so far. Admittedly it requires a bit more careful coding, but the end result is very worth it.

@dimitarvp I solved it in Python. I could not solve it with Elixir / Erlang. If you have time we could do a bit of pair programming.

Super busy lately but I love these problems so let me ping you on DM when I get a free hour or two.