Stream file text and parse each line information where lines are not always independent

Hi there.

For a matter of studying and practice reasons I have the following problem:

Given I have a log txt file (usually large, so that I’m using File.stream), I need to parse some lines until a pattern is found.

Example txt file:

[<info_1>] [<info_2>]: Some text information
[<info_1>] [<info_4>] : Another text information

At this point, ok, I can parse each line and extracts the information I want separately. In my solution I built a struct where I extract each one of these information and add them to that struct, like

%Info {
   origin: "<info_1>"
   something: "<info_2>"
   message: "Another text information"
},

%Info{
   origin: "<info_1>"
   something: "<info_4>"
   message: "Some text information"
}

But sometimes the log file comes like following:

[<info_1>] [<info_2>]: Some text information
[<info_1>] [<info_4>] : Another text information
but split multiline 
like this
[<info_1>] [<info_5>]: Other formatted information

At first, for those 2 lines I was not able to extract all informations I wanted, because there is no prefix. Then I used Stream.scan/3 where I could accumulate the last parsed line to the next iteration and use that to fill that information, eg., when the iteration reached but split multiline, the previous parsed data would be [<info_1>] [<info_4>] : Another text information and I would have the %Info{} all filled and could used that data to fill the missed fields.
And when the iteration reached the line like this, the previous parsed data would be:

%Info{
   origin: "<info_1>"
   something: "<info_4>"
   message: "but split multiline "
}}

because I filled the fields and it worked.

The final code was something like:

file_name
    |> File.stream!()
    |> Stream.map(&String.trim/1)
    |> Stream.filter(&reject_empty_lines/1)
    |> Stream.scan(%Info{}, &store_info(Parser.parse(&1), &2))

where &1 is the current line end &2 is the previous parsed data %Info{}.

I don’t know if I explained this clearly.

The problem with this solution is that now I wanted to make this run in parallel computations using Flow but I can’t, because of these cases where there are multilines information where I can’t get previous line information (and it is not under my control the way this is formatted).

I forget to mention that I store this data in each iteration.

At the moment, my solution using Flow is saving data with empty fields, but I set this in a separated list under a hash.

    file_name
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(&String.trim/1)
    |> Flow.filter(&reject_empty_lines/1)
    |> Flow.partition()
    |> Flow.reduce(fn -> 0 end, fn line, acc ->
      info = Parser.parse(line)
      origin = info.origin || "unmapped"
      
      #just to show I save this in a key-value 
      store_info(origin, info)
    end)

But I am not able to relate this “unmapped” information with any of those that is “mapped” (with all fields filled).

Could you give some suggestion? I thought about something like “read line and get next line until I find the pattern”, but I don’t think it is possible.

Any tips? eheh

Thank you :slight_smile:

Wouldn’t Stream.chunk_while be a solution to merge multi line logs together?

"""
[1] Test
2
3
[2] Ok
[3] Ok
[4] Test
2
3
"""
|> String.split("\n")
|> Stream.chunk_while(
  [],
  fn element, acc ->
    case element do
      <<"[", rest::binary>> ->
        case acc do
          [] -> {:cont, [element]}
          _ -> {:cont, acc |> Enum.reverse() |> Enum.join("\n"), [element]}
        end

      _ ->
        {:cont, [element | acc]}
    end
  end,
  fn
    [] -> {:cont, []}
    acc -> {:cont, acc |> Enum.reverse() |> Enum.join("\n"), []}
  end
)
|> Enum.into([])
|> IO.inspect()
# [
#   "[1] Test\n2\n3", 
#   "[2] Ok", 
#   "[3] Ok", 
#   "[4] Test\n2\n3\n"
# ]