Streaming by chunks

I want to stream data from a file as JSON lines into a downstream system (ELK). To do that, ideally, I can use Tesla, whose put/4 function accepts a stream for the body, and write something like this:

# lazily read and transform the input file, one line at a time
stream =
  File.stream!("input_file.ext", [:read])
  |> Stream.map(&some_transform_function/1)

# the JSON middleware encodes each stream element, so JSON lines go on the wire
client = Tesla.client([Tesla.Middleware.JSON])

Tesla.put(client, "http://some_elk_instance:9200/_bulk", stream)

and it works because both Tesla.Middleware.JSON and the Tesla adapters I use support streams, so JSON lines are sent on the wire. Note that they do not support arbitrary enumerables, just streams.

However, I’m working with input files of several gigabytes, and I’m reaching the limits of what the server can handle (it ends up with an OOM kill on the ELK side, but there are other limits as well).

What I’d like to do is to split the initial stream into a list of “smaller” streams, each one covering a part of the original file. The Tesla API does not accept arbitrary enumerables, so Stream.chunk_every/2 doesn’t work (this is what I’m doing now, but I then have to reconstruct the JSON-lines request body of each chunk into a big binary, which takes time and consumes a lot of memory).
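For reference, the current workaround looks roughly like this (a sketch only; it assumes some_transform_function/1 returns a map, Jason does the per-line encoding, and the pre-encoded binary goes through a client without the JSON middleware so it isn’t encoded a second time):

# rough sketch of the current workaround; raw_client has no JSON middleware
raw_client = Tesla.client([])

"input_file.ext"
|> File.stream!()
|> Stream.map(&some_transform_function/1)
|> Stream.map(&(Jason.encode!(&1) <> "\n"))
|> Stream.chunk_every(5_000)
|> Enum.each(fn lines ->
  # each chunk is rebuilt into one big binary before sending,
  # which is what takes the time and the memory
  body = Enum.join(lines)
  Tesla.put(raw_client, "http://some_elk_instance:9200/_bulk", body)
end)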

I’d also like to process the file only once. As far as I understand, calling something like

[
  Stream.drop(file_stream, 0),
  Stream.drop(file_stream, 100_000),
  Stream.drop(file_stream, 200_000),
  Stream.drop(file_stream, 300_000),
  Stream.drop(file_stream, 400_000),
  ...
]
|> Enum.map(&Stream.take(&1, 100_000))
|> Enum.map(&Tesla.put(client, "http://some_elk_instance:9200/_bulk", &1))

would, if I understand streams correctly, result in reading the file from the start every time one of the streams is run, and I don’t know the number of lines in advance anyway (finding it out would require reading the file one more time).

I think it makes sense for Tesla not to support arbitrary enumerables because, for example, a map implements the Enumerable protocol and it wouldn’t make sense to stream the key/value pairs of a map (with a JSON middleware, you just want to convert the whole map to JSON).
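For instance, enumerating a map yields key/value tuples, which is not something you would want to send element by element as a request body:

# a map is an Enumerable: enumerating it yields {key, value} tuples
Enum.to_list(%{"field" => "value"})
#=> [{"field", "value"}]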

So I can’t come up with an elegant solution that does not require loading a chunk of the input file’s lines into memory before sending it using the Tesla API. Can you think of a solution or a better method?

Maybe File.open/2 and IO.binstream/2 would let you stream successive chunks from the file?
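Something along these lines, perhaps (just a sketch; the 64 KiB chunk size and send_chunk/1 are placeholders):

# read the opened file as a stream of fixed-size binary chunks;
# send_chunk/1 is a placeholder for whatever performs the upload
{:ok, device} = File.open("input_file.ext", [:read, :binary])

device
|> IO.binstream(64_000)
|> Stream.each(&send_chunk/1)
|> Stream.run()

File.close(device)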

Thanks for your answer! I realize I forgot to say that the input file is a text file whose entries are separated by newlines (JSONL or CSV).

Thus, with IO.binstream/2, reading line by line would give me chunks of one line (while what I want is to send, say, 5,000 lines at once as a stream), and reading fixed-size chunks would, as far as I understand, give me a binary rather than a stream, so it would have to be held in memory before further processing and couldn’t be passed to the Tesla API as-is. Plus, I’d have to handle the cases where a line is cut in the middle of a chunk.
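For example (with made-up contents), a chunk boundary can fall in the middle of an entry, and the two pieces would have to be stitched back together:

# hypothetical chunk contents: the last entry of the first chunk is
# truncated and continues at the beginning of the second one
chunk1 = ~s({"id": 1}\n{"id": 2}\n{"id)
chunk2 = ~s(": 3}\n{"id": 4}\n)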

The more I think about it, the more I’m inclined to think that streams just don’t work like that and there is no easy trick to achieve what I want to do :slight_smile: