diff --git a/lib/async/io/stream.rb b/lib/async/io/stream.rb index baeebc8..56a35e0 100644 --- a/lib/async/io/stream.rb +++ b/lib/async/io/stream.rb @@ -197,6 +197,10 @@ def connected? @io.connected? end + def readable? + @io.readable? + end + def closed? @io.closed? end @@ -246,6 +250,21 @@ def eof! private + def sysread(size, buffer) + while true + result = @io.read_nonblock(size, buffer, exception: false) + + case result + when :wait_readable + @io.wait_readable + when :wait_writable + @io.wait_writable + else + return result + end + end + end + # Fills the buffer from the underlying stream. def fill_read_buffer(size = @block_size) # We impose a limit because the underlying `read` system call can fail if we request too much data in one go. @@ -257,12 +276,12 @@ def fill_read_buffer(size = @block_size) flush if @read_buffer.empty? - if @io.read_nonblock(size, @read_buffer, exception: false) + if sysread(size, @read_buffer) # Console.logger.debug(self, name: "read") {@read_buffer.inspect} return true end else - if chunk = @io.read_nonblock(size, @input_buffer, exception: false) + if chunk = sysread(size, @input_buffer) @read_buffer << chunk # Console.logger.debug(self, name: "read") {@read_buffer.inspect} diff --git a/spec/async/io/stream_spec.rb b/spec/async/io/stream_spec.rb index 9b5f18c..b19bfdc 100644 --- a/spec/async/io/stream_spec.rb +++ b/spec/async/io/stream_spec.rb @@ -23,6 +23,26 @@ end end + context "native I/O", if: RUBY_VERSION >= "3.1" do + let(:sockets) do + @sockets = ::Socket.pair(::Socket::AF_UNIX, ::Socket::SOCK_STREAM) + end + + after do + @sockets.each(&:close) + end + + let(:io) {sockets.first} + subject {described_class.new(sockets.last)} + + it "can read data" do + io.write("Hello World") + io.close_write + + expect(subject.read).to be == "Hello World" + end + end + context "socket I/O" do let(:sockets) do @sockets = Async::IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)