Splitting/writing a stream to multiple files based on key in a map?

I have a single file data dump (one JSON object per line) that I need to split into multiple files based on a key in the source file.

I’ve written it in Python: I dynamically open file handles and store them in a map/dict as I process the file. After iterating over the whole file, I iterate over the map/dict of handles and write to/close them.

I’m trying to do this in Elixir and I’m not sure how to go about it. I was thinking of collecting the events in a map, but I can’t see how, since there’s no mutable state to accumulate into while I process the stream. Doing a file append for every line is very inefficient and slow.
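The closest I can get to the map idea is to read everything eagerly and group the lines by event type before writing (a rough sketch, assuming the whole dump fits in memory):

filename
|> File.stream!([:read, :compressed], :line)
|> Enum.group_by(fn line -> line |> Poison.decode!() |> Map.get("eventType") end)
|> Enum.each(fn {event_type, lines} ->
  File.write!(event_type, lines, [:append])
end)

That only touches each output file once, but it buffers the entire dump in memory.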

Here’s my file append code:

stream = File.stream!(filename, [:read, :compressed], :line)

stream
|> Stream.each(fn line ->
  event = Poison.decode!(line)
  # opens, appends to, and closes the output file for every single line
  File.write(Map.get(event, "eventType"), line, [:append])
end)
|> Stream.run()

Maybe you could use Enum.reduce with functions from the IO module?

# open files up front with File.open!(filename, [:append]), or open them
# lazily inside the Enum.reduce below as new event types show up
file_descriptors = %{}

file_descriptors =
  filename
  |> File.stream!([:read, :compressed], :line)
  |> Enum.reduce(file_descriptors, fn line, file_descriptors ->
    event = Poison.decode!(line)
    event_type = Map.get(event, "eventType")

    # open the file for this event type on first use and remember its descriptor
    file_descriptors =
      Map.put_new_lazy(file_descriptors, event_type, fn ->
        File.open!(event_type, [:append])
      end)

    # write the raw line (not the decoded map) to that descriptor
    file_descriptors
    |> Map.get(event_type)
    |> IO.binwrite(line)

    # return the map so any newly opened descriptor carries over to the next line
    file_descriptors
  end)

# then close all the files that are still open
Enum.each(file_descriptors, fn {_event_type, fd} -> File.close(fd) end)
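If you want to keep it lazy instead of forcing the whole stream with Enum.reduce, the same idea should also work with Stream.transform/4: it threads the map of descriptors through as the accumulator and gives you a hook to close them when the stream finishes. A rough, untested sketch:

filename
|> File.stream!([:read, :compressed], :line)
|> Stream.transform(
  fn -> %{} end,
  fn line, file_descriptors ->
    event_type = line |> Poison.decode!() |> Map.get("eventType")

    # open the output file the first time this event type is seen
    file_descriptors =
      Map.put_new_lazy(file_descriptors, event_type, fn ->
        File.open!(event_type, [:append])
      end)

    IO.binwrite(file_descriptors[event_type], line)
    {[], file_descriptors}
  end,
  fn file_descriptors ->
    # close everything once the stream is exhausted
    Enum.each(file_descriptors, fn {_type, fd} -> File.close(fd) end)
  end
)
|> Stream.run()

Either way, the important part is that each output file is opened once and the descriptors ride along in the accumulator, instead of reopening a file for every line.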