How do I read available characters?

While working on the ICFP Contest this weekend, I struggled to find a good way to periodically read content not delimited by newlines. Let me show some examples of what I mean.

Here’s a Ruby script that produces messages:

$stdout.sync = true
loop do
  message = rand(1_000_000)
  $stdout.write "<#{message}>"
  sleep rand(3) + 1
end

I can think of multiple ways, using Ruby, to read these messages as they come in. For example, I can use non-blocking I/O:

loop do
  begin
    raw = $stdin.readpartial(1_024)
    puts raw[/\d+/]
  rescue EOFError
    sleep 0.1
  end
end

Or I can read what’s available:

require "io/wait"

loop do
  $stdin.wait_readable
  raw = $stdin.read($stdin.nread)
  puts raw[/\d+/]
end

There are other options too, like using IO::select(). Here’s how the above examples work in practice:

$ ruby producer.rb | ruby read_nonblocking.rb 
319187
122221
30420
…
$ ruby producer.rb | ruby read_ready.rb 
640243
971582
366808
…

I haven’t found a good way to do similar work with Elixir. The best I’ve come up with for the same input is to read character by character:

defmodule MessageReader do
  def read_message(device, buffer \\ "") do
    new_buffer = buffer <> IO.read(device, 1)
    if String.first(new_buffer) == "<" and String.last(new_buffer) == ">" do
      String.slice(new_buffer, 1..-2)
    else
      read_message(device, new_buffer)
    end
  end

  def read_messages(device, handler) do
    read_message(device)
    |> handler.()
    read_messages(device, handler)
  end
end

MessageReader.read_messages(:stdio, &IO.puts/1)

This does work:

$ ruby producer.rb | elixir read_chars.exs  
963609
378034
387827
…

However, that would be pretty inefficient with long messages and I can’t find a way to read ahead. Am I missing a useful trick?

Thanks in advance!

Have you took a look at IO.stream/2?

:stdin
|> IO.stream(1_024)
|> Enum.each(&IO.puts/1)

Should be roughly equivalent to your first ruby version.

Sorry, but I don’t believe it is. I think your code blocks until it can deliver 1,024 bytes, so you don’t receive messages as they come in. I tried to run it to verify my assumptions (after I fixed the :stdin to :stdio bug) and it did seem to be the case.

1 Like

That’s what I expect your ruby code to do as well.

But perhaps you can take a look into the implementation to learn to implement something that behaves as you expect?

That’s not what happens, no. Both of my Ruby examples print the messages as they arrive.

I don’t think that’s helpful in this case. Under the hood they make I/O function calls in C. I’m looking for a means to accomplish similar tasks on the BEAM.

I meant the implementation of my elixir code

Ah, sorry for misunderstanding.

IO.Stream itself seems limited by the issues I found with your code. Internally, it delegates to two functions that read by fixed chunks. This is exactly what I’m trying to avoid.

Good idea though. Thanks.

In general I’m not very keen on messages over stdin or network or any other mean, that are neither delimited nor size constraint.

If for some reason the pipe hickups in your ruby version or the generator does ill flushing, then the consumer might receive <123><12 and then 3>. This can get even worse when there is a network inbetween with indeterministic routing and changing latency.

Any messaging protocol should either mark start and end of a message explicit or provide some kind of “header” which specifies the length. I do prefer the latter.

Anyway, I’d like as well if I could already start to parse the first couple of bytes while waiting for the tail, therefore I totally understand the demand for a function which returns immediately at most N bytes from an io-device.

1 Like

If you really want to play with what is under the hood, the IO message protocol in Erlang does provide a get_until functionality: http://erlang.org/doc/apps/stdlib/io_protocol.html

The Erlang io:read/2 function uses it to parse terms out of the standard input:

read(Io, Prompt) ->
    case request(Io, {get_until,unicode,Prompt,erl_scan,tokens,[1]}) of
	{ok,Toks,_EndLine} ->
	    erl_parse:parse_term(Toks);
%	{error, Reason} when atom(Reason) ->
%	    erlang:error(conv_reason(read, Reason), [Io, Prompt]);
	{error,E,_EndLine} ->
	    {error,E};
	{eof,_EndLine} ->
	    eof;
	Other ->
	    Other
    end.

Dave Thomas has recently asked for a similar feature and we would be glad to expose it in the IO module API.

4 Likes

Of course, you’re right. However, going back to my original message, I was trying to write a program for a protocol not under my control. Also, in their defense, their messages were prefixed with a size header.

1 Like

I realized the contest and that the protocol was out of your control, still I wanted to have this said in the thread as a warning for people that want to create an unbound protocol…

Awesome. Thanks for the info!

Can you share what his specific request was?

https://github.com/elixir-lang/elixir/issues/6424

Although I would prefer to expose a more low-level API closer to what the protocol provides.

Interesting. Dave’s request for reading until a specific character would solve my example in this thread. I could read until encountering a ">".

How would you feel if I created a patch that expands what IO.binread/2 and IO.read/2 accept in line_or_chars? We could allow it to take a function for advanced needs. Optionally, we could easily pass a character or sequence to read to, if desired. Thoughts?

I would personally prefer to provide a low-level function first and then a higher level one. Otherwise folks will have to reach the low-level protocol every time they need something more complex than an expression.

I am also not sure we can support the until approach in binread/2 since it uses a different protocol iirc.

2 Likes

Seems that it was implemented 2 months ago right?