I want to stream data from a file as JSON lines to a downstream system (ELK). Ideally, I can use Tesla, whose put/3 function accepts a stream for the body, and write something like this:
```elixir
stream =
  File.stream!("input_file.ext")
  |> Stream.map(&some_transform_function/1)

client = Tesla.client([Tesla.Middleware.JSON])
Tesla.put(client, "http://some_elk_instance:9200/_bulk", stream)
```
and it works because both Tesla.Middleware.JSON and the Tesla adapter in use support streams, so the JSON lines are sent on the wire. Note that they do not support arbitrary enumerables, just streams.
However, I’m working with input files of several gigabytes, and I’m hitting the limits of what the server can handle (it ends up OOM-killed on the ELK side, but there are other limits as well).
What I’d like to do is chunk the initial stream into a list of “smaller” streams, each one covering a part of the original file. The Tesla API doesn’t accept arbitrary enumerables, so Stream.chunk_every/2 doesn’t work directly (this is what I’m doing now, but I then have to rebuild each chunk of JSON lines into one big binary for the request body, which takes time and consumes a lot of memory).
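For reference, here is a minimal sketch of that workaround (the chunk size and the IO.iodata_to_binary/1 step are assumptions standing in for my actual code):

```elixir
File.stream!("input_file.ext")
|> Stream.map(&some_transform_function/1)
# chunk_every/2 yields plain lists, which Tesla won't stream
|> Stream.chunk_every(100_000)
|> Enum.each(fn lines ->
  # collapse each chunk into one big binary before sending it
  body = IO.iodata_to_binary(lines)
  Tesla.put(client, "http://some_elk_instance:9200/_bulk", body)
end)
```

Each chunk has to be fully materialized in memory before the request goes out, which is exactly the cost I’d like to avoid.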
I’d also like to process the file only once. As far as I understand, calling something like
```elixir
[
  Stream.drop(file_stream, 0),
  Stream.drop(file_stream, 100_000),
  Stream.drop(file_stream, 200_000),
  Stream.drop(file_stream, 300_000),
  Stream.drop(file_stream, 400_000),
  # ...
]
|> Enum.map(&Stream.take(&1, 100_000))
|> Enum.map(&Tesla.put(client, "http://some_elk_instance:9200/_bulk", &1))
```
would, if I understand streams correctly, result in reading the file from the start every time one of the streams is consumed (each Tesla.put/3 call runs its stream), and I don’t know the number of lines in advance anyway (finding that out would mean reading the file one more time).
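A quick illustration of that behavior (the file name is just a placeholder): every traversal of a stream built on File.stream!/1 re-opens the file and reads from the beginning.

```elixir
file_stream = File.stream!("input_file.ext")

# first pass: opens the file, skips 100_000 lines, reads the next 100_000
file_stream |> Stream.drop(100_000) |> Stream.take(100_000) |> Enum.count()

# second pass: re-opens the file and scans the first 200_000 lines again
file_stream |> Stream.drop(200_000) |> Stream.take(100_000) |> Enum.count()
```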
I think it makes sense for Tesla not to support arbitrary enumerables because, for example, a map implements the Enumerable protocol, and it wouldn’t make sense to stream the key/value pairs of a map (with a JSON middleware, you just want to encode it to JSON).
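A quick check in iex shows both cases implement the same protocol, so Tesla couldn’t tell them apart by Enumerable alone:

```elixir
iex> Enumerable.impl_for(%{a: 1})
Enumerable.Map
iex> Enumerable.impl_for(File.stream!("input_file.ext"))
Enumerable.File.Stream
```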
So I can’t come up with an elegant solution that doesn’t require loading a chunk of the input file’s lines into memory before sending it through the Tesla API. Can you think of a solution or a better approach?